This example shows how to manage a pool of threads in two ways: with ExecutorService, Executors and Callable, and with ForkJoinPool and RecursiveTask.
Get the source code of this example from the following GitHub URL: https://github.com/javaspringexamples/ThreadHandlePool.git
Or download it as a .zip file: https://github.com/javaspringexamples/ThreadHandlePool/archive/master.zip
package javaspringexamples.ThreadExamples; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.IntStream; /** * This example represents how to create a pool of threads using ExecutorService * and Callable interfaces. * * @author mounir.sahrani@gmail.com * */ public class ListInitializerCallable implements Callable<List<Integer>> { int[] data; public ListInitializerCallable(int[] data) { this.data = data; } /** * Method call() to override for Callable objects. */ @Override public List<Integer> call() throws Exception { ArrayList<Integer> l = new ArrayList<>(); for (int i = 0; i < data.length; i++) { l.add(data[i] % 2); } return l; } /** * The main method. * * @param args * @throws InterruptedException * @throws ExecutionException */ public static void main(String[] args) throws InterruptedException, ExecutionException { int lenght = 10000; ListInitializerCallable li1 = new ListInitializerCallable(IntStream.range(0, lenght / 2).toArray()); ListInitializerCallable li2 = new ListInitializerCallable(IntStream.range((lenght / 2), lenght).toArray()); // Fixing the pool to 2 threads ExecutorService es = Executors.newFixedThreadPool(2); Future<List<Integer>> f1 = es.submit(li1); Future<List<Integer>> f2 = es.submit(li2); // Getting the list returned of the 1st thread. List<Integer> l1 = f1.get(); // Getting the list returned of the 2nd thread. List<Integer> l2 = f2.get(); // Merging the two lists. l1.addAll(l2); System.out.println(l1.size()); System.out.println(l1); // Shutting down the ExecutorService es.shutdown(); } }
package javaspringexamples.ThreadExamples; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; import java.util.stream.IntStream; /** * This example represents how to create a pool of threads using ForkJoinPool * and extending the RecursiveTask class. * * @author mounir.sahrani@gmail.com * */ public class ListInitializerRecursiveTask extends RecursiveTask<List<Integer>> { private int[] data; private int begin; private int end; public ListInitializerRecursiveTask(int[] data, int _begin, int _end) { this.data = data; begin = _begin; end = _end; } /** * Method compute() to override for RecursiveTask objects, we initiate the * list recursively. */ @Override protected List<Integer> compute() { List<Integer> l = new ArrayList<>(); int middle = (end - begin) / 2; if (middle > 500) { ListInitializerRecursiveTask li1 = new ListInitializerRecursiveTask(data, begin, (begin + middle)); li1.fork(); ListInitializerRecursiveTask li2 = new ListInitializerRecursiveTask(data, (begin + middle), end); l = li2.compute(); l.addAll(li1.join()); } else { for (int i = begin; i < end; i++) { l.add(data[i] % 2); } } return l; } /** * The main method. * * @param args * @throws InterruptedException * @throws ExecutionException */ public static void main(String[] args) throws InterruptedException, ExecutionException { int lenght = 10000; ListInitializerRecursiveTask li1 = new ListInitializerRecursiveTask(IntStream.range(0, lenght).toArray(), 0, lenght); // Initiating the ForkJoinPool ForkJoinPool fjp = new ForkJoinPool(); // Getting the list initiated recursively List<Integer> l = fjp.invoke(li1); System.out.println(l.size()); System.out.println(l); } }