Thread using « ExecutorService, Executors, Callable » and « ForkJoinPool, RecursiveTask »

Java example


This example shows how to manage a pool of threads in two ways: with ExecutorService, Executors and Callable, and with ForkJoinPool and RecursiveTask.

Get the source code of the example from the following GitHub URL: https://github.com/javaspringexamples/ThreadHandlePool.git

Or download it as a .zip file: https://github.com/javaspringexamples/ThreadHandlePool/archive/master.zip

package javaspringexamples.ThreadExamples;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

/**
 * Shows how to run work on a fixed-size thread pool using
 * {@link ExecutorService} and {@link Callable}.
 * <p>
 * Each instance maps every element of an {@code int} array to its remainder
 * modulo 2 and returns the results as a list, in input order.
 * 
 * @author mounir.sahrani@gmail.com
 *
 */
public class ListInitializerCallable implements Callable<List<Integer>> {

	/** Input values; read by {@link #call()} and not modified by this class. */
	int[] data;

	/**
	 * Creates a task over the given values.
	 * 
	 * @param data
	 *            the values to process; the array is stored as-is (no copy)
	 */
	public ListInitializerCallable(int[] data) {
		this.data = data;
	}

	/**
	 * Computes {@code value % 2} for every element of the input array.
	 * 
	 * @return a new mutable list with one entry per input element, in input
	 *         order (negative odd inputs yield {@code -1}, as Java's {@code %}
	 *         keeps the sign of the dividend)
	 */
	@Override
	public List<Integer> call() throws Exception {
		// The result has exactly one entry per input element, so presize it.
		List<Integer> result = new ArrayList<>(data.length);
		for (int value : data) {
			result.add(value % 2);
		}
		return result;
	}

	/**
	 * Splits the range {@code [0, 10000)} in two halves, processes each half on
	 * its own pool thread, then prints the size and content of the merged
	 * result.
	 * 
	 * @param args
	 *            unused
	 * @throws InterruptedException
	 *             if the main thread is interrupted while waiting for a result
	 * @throws ExecutionException
	 *             if one of the submitted tasks threw an exception
	 */
	public static void main(String[] args) throws InterruptedException, ExecutionException {

		int length = 10000;
		int half = length / 2;

		ListInitializerCallable firstHalf = new ListInitializerCallable(IntStream.range(0, half).toArray());
		ListInitializerCallable secondHalf = new ListInitializerCallable(IntStream.range(half, length).toArray());

		// Fixing the pool to 2 threads, one per half.
		ExecutorService es = Executors.newFixedThreadPool(2);
		try {
			Future<List<Integer>> f1 = es.submit(firstHalf);
			Future<List<Integer>> f2 = es.submit(secondHalf);

			// get() blocks until the corresponding task has finished.
			List<Integer> merged = f1.get();
			// Appending the second half keeps the overall input order.
			merged.addAll(f2.get());

			System.out.println(merged.size());
			System.out.println(merged);
		} finally {
			// Always shut down: the pool's worker threads are non-daemon, so
			// skipping this when get() throws would keep the JVM alive.
			es.shutdown();
		}
	}

}
package javaspringexamples.ThreadExamples;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

/**
 * Shows how to split work recursively on a {@link ForkJoinPool} by extending
 * {@link RecursiveTask}.
 * <p>
 * A task covers the index range {@code [begin, end)} of an {@code int} array
 * and maps every element in that range to its remainder modulo 2. Large ranges
 * are split in two sub-tasks; the result is always in input order.
 * 
 * @author mounir.sahrani@gmail.com
 *
 */
public class ListInitializerRecursiveTask extends RecursiveTask<List<Integer>> {

	/**
	 * A range is processed sequentially when half of its length is at most
	 * this value; otherwise it is split in two.
	 */
	private static final int SEQUENTIAL_HALF_LENGTH = 500;

	private final int[] data;
	private final int begin;
	private final int end;

	/**
	 * Creates a task over a slice of the given array.
	 * 
	 * @param data
	 *            the values to process; the array is stored as-is (no copy)
	 * @param _begin
	 *            index of the first element to process (inclusive)
	 * @param _end
	 *            index after the last element to process (exclusive)
	 */
	public ListInitializerRecursiveTask(int[] data, int _begin, int _end) {
		this.data = data;
		begin = _begin;
		end = _end;
	}

	/**
	 * Computes {@code data[i] % 2} for every index in {@code [begin, end)},
	 * splitting the range recursively while it is large.
	 * 
	 * @return a new mutable list with one entry per index of the range, in
	 *         ascending index order
	 */
	@Override
	protected List<Integer> compute() {
		int half = (end - begin) / 2;

		if (half <= SEQUENTIAL_HALF_LENGTH) {
			// Small enough: process sequentially. Math.max guards against an
			// empty/inverted range, for which the loop simply does not run.
			List<Integer> result = new ArrayList<>(Math.max(end - begin, 0));
			for (int i = begin; i < end; i++) {
				result.add(data[i] % 2);
			}
			return result;
		}

		int split = begin + half;
		ListInitializerRecursiveTask left = new ListInitializerRecursiveTask(data, begin, split);
		ListInitializerRecursiveTask right = new ListInitializerRecursiveTask(data, split, end);

		// Run the right half asynchronously and the left half in this thread.
		right.fork();
		List<Integer> result = left.compute();
		// Append right AFTER left so the merged list follows the input order.
		result.addAll(right.join());
		return result;
	}

	/**
	 * Processes the range {@code [0, 10000)} on a {@link ForkJoinPool} and
	 * prints the size and content of the result.
	 * 
	 * @param args
	 *            unused
	 * @throws InterruptedException
	 *             kept in the signature for compatibility; nothing called here
	 *             throws it
	 * @throws ExecutionException
	 *             kept in the signature for compatibility; nothing called here
	 *             throws it
	 */
	public static void main(String[] args) throws InterruptedException, ExecutionException {

		int length = 10000;
		ListInitializerRecursiveTask task = new ListInitializerRecursiveTask(IntStream.range(0, length).toArray(), 0,
				length);

		// Initiating the ForkJoinPool
		ForkJoinPool fjp = new ForkJoinPool();
		try {
			// invoke() blocks until the whole task tree has completed.
			List<Integer> l = fjp.invoke(task);

			System.out.println(l.size());
			System.out.println(l);
		} finally {
			// Release the pool's worker threads once the work is done.
			fjp.shutdown();
		}
	}
}