This example shows how to manage a pool of threads using ExecutorService, Executors and Callable, as well as ForkJoinPool and RecursiveTask.
Get the sources of the example from the following GitHub URL: https://github.com/javaspringexamples/ThreadHandlePool.git
Or download a .zip file: https://github.com/javaspringexamples/ThreadHandlePool/archive/master.zip
package javaspringexamples.ThreadExamples;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
/**
 * Demonstrates how to run {@link Callable} tasks on a fixed-size thread pool
 * created through {@link Executors} and driven by an {@link ExecutorService}.
 *
 * <p>
 * Each task maps every element of its input array to {@code element % 2} and
 * returns the results as a list, in the same order as the input.
 *
 * @author mounir.sahrani@gmail.com
 *
 */
public class ListInitializerCallable implements Callable<List<Integer>> {

    /** Input values processed by {@link #call()}; never modified by this class. */
    int[] data;

    /**
     * Creates a task over the given array.
     *
     * @param data
     *            the values to process; the array is stored by reference, not
     *            copied
     */
    public ListInitializerCallable(int[] data) {
        this.data = data;
    }

    /**
     * Computes {@code value % 2} for every element of the input array.
     *
     * <p>
     * Java's remainder keeps the sign of the dividend, so a negative odd value
     * yields {@code -1}.
     *
     * @return a new mutable list with one entry per input element, in input
     *         order
     */
    @Override
    public List<Integer> call() throws Exception {
        // Presize: the final size is known up front.
        List<Integer> result = new ArrayList<>(data.length);
        for (int value : data) {
            result.add(value % 2);
        }
        return result;
    }

    /**
     * Splits the range {@code [0, 10000)} into two halves, processes each half
     * on its own pool thread, merges the two lists and prints the merged size
     * and content.
     *
     * @param args
     *            unused
     * @throws InterruptedException
     *             if the main thread is interrupted while waiting for a result
     * @throws ExecutionException
     *             if one of the tasks fails
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int length = 10000;
        int half = length / 2;
        ListInitializerCallable firstHalf = new ListInitializerCallable(IntStream.range(0, half).toArray());
        ListInitializerCallable secondHalf = new ListInitializerCallable(IntStream.range(half, length).toArray());

        // Fixing the pool to 2 threads
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            Future<List<Integer>> f1 = es.submit(firstHalf);
            Future<List<Integer>> f2 = es.submit(secondHalf);

            // Blocks until the first task is done, then appends the second result.
            List<Integer> merged = f1.get();
            merged.addAll(f2.get());

            System.out.println(merged.size());
            System.out.println(merged);
        } finally {
            // Always release the (non-daemon) pool threads, even when get()
            // throws; otherwise they would keep the JVM alive.
            es.shutdown();
        }
    }
}
package javaspringexamples.ThreadExamples;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;
/**
 * Demonstrates how to process an array on a {@link ForkJoinPool} by extending
 * {@link RecursiveTask}.
 *
 * <p>
 * A task covers the index range {@code [begin, end)} of the input array and
 * maps each element to {@code element % 2}. Large ranges are split in two and
 * processed recursively; the result list follows the order of the input.
 *
 * @author mounir.sahrani@gmail.com
 *
 */
public class ListInitializerRecursiveTask extends RecursiveTask<List<Integer>> {

    private static final long serialVersionUID = 1L;

    /**
     * A range is processed sequentially when half of its length does not
     * exceed this value.
     */
    private static final int SEQUENTIAL_HALF_THRESHOLD = 500;

    private final int[] data;
    private final int begin;
    private final int end;

    /**
     * Creates a task over {@code data[_begin .. _end)}.
     *
     * @param data
     *            the values to process; stored by reference, not copied
     * @param _begin
     *            first index to process (inclusive)
     * @param _end
     *            last index to process (exclusive)
     */
    public ListInitializerRecursiveTask(int[] data, int _begin, int _end) {
        this.data = data;
        begin = _begin;
        end = _end;
    }

    /**
     * Computes {@code data[i] % 2} for every index of this task's range.
     *
     * <p>
     * When the range is large, the left half is forked, the right half is
     * computed in the current thread, and the two partial results are
     * concatenated left-then-right so that the output order matches the input
     * order.
     *
     * @return a new mutable list with one entry per index in
     *         {@code [begin, end)}, in index order
     */
    @Override
    protected List<Integer> compute() {
        int half = (end - begin) / 2;

        if (half <= SEQUENTIAL_HALF_THRESHOLD) {
            // Small (or empty/inverted) range: process sequentially.
            List<Integer> result = new ArrayList<>(Math.max(end - begin, 0));
            for (int i = begin; i < end; i++) {
                result.add(data[i] % 2);
            }
            return result;
        }

        int middle = begin + half;
        ListInitializerRecursiveTask left = new ListInitializerRecursiveTask(data, begin, middle);
        left.fork();
        ListInitializerRecursiveTask right = new ListInitializerRecursiveTask(data, middle, end);

        // Compute the right half here while the forked left half runs elsewhere.
        List<Integer> rightResult = right.compute();

        // Left part first, then right part, to preserve the input order.
        List<Integer> result = left.join();
        result.addAll(rightResult);
        return result;
    }

    /**
     * Processes the range {@code [0, 10000)} on a {@link ForkJoinPool} and
     * prints the size and content of the resulting list.
     *
     * @param args
     *            unused
     * @throws InterruptedException
     *             declared for compatibility; not thrown by this implementation
     * @throws ExecutionException
     *             declared for compatibility; not thrown by this implementation
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int length = 10000;
        ListInitializerRecursiveTask task = new ListInitializerRecursiveTask(IntStream.range(0, length).toArray(), 0,
                length);

        // Initiating the ForkJoinPool
        ForkJoinPool fjp = new ForkJoinPool();
        try {
            // Getting the list initiated recursively
            List<Integer> result = fjp.invoke(task);
            System.out.println(result.size());
            System.out.println(result);
        } finally {
            // Release the worker threads of this privately created pool.
            fjp.shutdown();
        }
    }
}
