This tutorial will discuss components of java.util.concurrent package like Java Semaphore, Executor Framework, ExecutorService to implement Concurrency in Java:
From our previous Java tutorials, we know that the Java platform supports concurrent programming from the ground up. The basic unit of concurrency is a thread and we have discussed threads and multithreading in Java in detail.
From Java 5 onwards, a package called ‘java.util.concurrent’ was added to the Java platform. This package contains a set of classes and libraries that makes it easier for the programmer to develop concurrent (multi-threaded) applications. Using this package, we don’t have to write complex classes as we have ready implementations of most of the concurrent concepts.
=> Check ALL Java Tutorials Here.
In this tutorial, we will discuss the various components of the java.util.concurrent package concerning concurrency and multithreading in Java.
Table of Contents:
java.util.concurrent Package
Enlisted below are the various components of java.util.concurrent package concerning concurrency and multithreading in Java. Let’s explore each component in detail with the help of simple programming examples. Some of the components we will
discuss are:
- Executor framework
- ExecutorService
- ThreadPool
- Callable
- Locks- ReentrantLock
- Semaphore
- ForkJoinPool
Executor Framework In Java
The Executor Framework in Java was released with the JDK 5 release. The Executor Framework (java.util.concurrent.Executor) is a framework that consists of components that help us to efficiently handle multiple threads.
Using the Executor Framework, we can run objects that are Runnable by reusing the already existing threads. We need not create new threads every time when we need to run objects.
The Executor API separates or de-couples the execution of a task from the actual task using an Executor. An executor is centered on the Executor interface and has sub-interfaces i.e. ExecutorService and the class ThreadPoolExecutor.
Thus using Executor, we just have to create Runnable objects and then send them to the executor which executes them.
Some of the best practices to be followed while using the Executor framework are,
- We should cross-check and plan a code to review the top lists so that we can detect deadlock as well as livelock in the code.
- Java code should always be executed against static analysis tools. Examples of static analysis tools are FindBugs and PMD.
- We should not only catch exceptions but also the errors in multi-threaded programs.
Now let’s discuss the components of Executor Framework in Java.
Executor
The executor can be defined as an interface used to represent an object that executes the tasks provided to it. Whether the task is to be run on current or new thread depends on the point from where the invocation was initiated which further depends on the implementation.
So using Executor, we can de-couple the tasks from the actual task and then run them asynchronously.
However, the execution of the task using Executor need not be asynchronous. Executors can also invoke the task instantly using invoking thread.
Given below is an example piece of code to create Executor instance:
public class Invoker implements Executor { @Override public void execute (Runnable r_interface) { r_interface.run(); } }
Once the invoker is created, as shown above, we can use it to execute the task as follows.
public void execute () { Executor executor = new Invoker (); executor.execute ( () -> { //perform task }); }
Note that if the task is not accepted by the Executor, then it throws RejectedExecutionException.
ExecutorService
An ExecutorService (java.util.concurrent.ExecutorService) schedules the submitted tasks as per the availability of threads and also maintains a memory queue. The ExecutorService acts as a complete solution for the Asynchronous processing of tasks.
To use ExecutorService in code, we create a Runnable class. The ExecutorService maintains a thread pool and also assigns the tasks to the threads. Tasks can also queue up in case the thread is not available.
Given below is a simple example of ExecutorService.
import java.util.concurrent.*; public class Main { public static void main(String[] args) { //create ExecutorService instance with 10 threads ExecutorService executor_Service = Executors.newFixedThreadPool(10); //assign the service to Runnable instance executor_Service.execute(new Runnable() { @Override public void run() { //print the message System.out.println("Simple Example of ExecutorService!!!"); } }); //shutdown executorService executor_Service.shutdown(); } }
Output
In the above program, we create a simple ExecutorService instance with a thread pool consisting of 10 threads. It is then assigned to the Runnable instance and executed to print the above message. After printing the message, the ExecutorService is shut down.
Thread Pool
A Thread pool in Java is a group of worker threads that can be reused many times and assigned jobs.
A Thread pool contains a group of fixed size threads. Each thread is pulled out from the thread pool and assigned a task by the service provider. Once the assigned job is completed, the thread is given to the thread pool again.
Thread pool is advantageous as we do not have to create a new thread every time the task is available, thereby the performance is enhanced. It is used in real-time applications that use Servlet and JSP where thread pools are used to process requests.
In multi-threaded applications, the Thread Pool saves resources and helps to contain the parallelism within predefined limits.
The below Java program demonstrates the Thread pool in Java.
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class WorkerThreadClass implements Runnable { private String message; //thread class constructor public WorkerThreadClass(String s){ this.message=s; } //run method for thread public void run() { System.out.println(" Start: "+message); processmessage(); //sleep between start and end System.out.println(" End: "+ message); } //processmessage method => sleeps the thread for 2 sec private void processmessage() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } public class Main { public static void main(String[] args) { //create a ExecutorService instance ExecutorService executor = Executors.newFixedThreadPool(5);//creating a pool of 5 threads //create thread instances and execute them for (int i = 0; i < 5; i++) { Runnable workerThrd = new WorkerThreadClass("Thread_" + i); executor.execute(workerThrd);//calling execute method of ExecutorService } //shutdown ExecutorService executor.shutdown(); while (!executor.isTerminated()) { } System.out.println("Finished all threads"); } }
Output
In the above programs, there is a thread pool of 5 threads that are created using the “newFixedThreadPool” method. Then the threads are created and added to the pool and assigned to the ExecutorService for execution.
Callable In Java
We already know that we can create threads using two approaches. One approach is by extending the Thread class while the second approach is by implementing a Runnable interface.
However, threads created using the Runnable interface lack one feature i.e. it does not return a result when the thread is terminated or run () completes execution. This is where the Callable interface comes into the picture.
Using a Callable interface we define a task so that it returns a result. It may also throw an exception. The Callable interface is a part of the java.util.concurrent package.
The Callable interface provides a call () method that is on the similar lines as run () method provided by the Runnable interface with the only difference that the call () method returns a value and throws checked exception.
The call () method of Callable interface has the following prototype.
public Object call () throws Exception;
Since the call () method returns an Object, the main thread must be aware of this.
Hence the return value should be stored in another object known to the main thread. This purpose is served by using a “Future” object. A Future object is an object that holds the result returned by a thread. Or in other words, it will hold the result when Callable returns.
Callable encapsulates a task that should run on another thread. A Future object stores the result returned from a different thread.
A Callable interface cannot be used to create a thread. We need Runnable to create a thread. Then to store the result a Future object is required. Java provides a concrete type named “FutureTask” that combines the functionality by implementing both Runnable and Future.
We create a FutureTask by providing a constructor with Callable. This FutureTask object is then given to the constructor of the Thread class to create a Thread object.
Given below is a Java program that demonstrates the Callable interface and Future object. We also use the FutureTask object in this program.
As already mentioned, in the program we create a class that implements a Callable interface with an overridden call () method. In the main method, we create 10 FutureTask objects. Each object constructor has a Callable class object as its argument. Then the FutureTask object is associated with a thread instance.
Hence indirectly we create a thread using a Callable interface object.
import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; //create a class implementing Callable interface class CallableDemo implements Callable { //define call () method public Object call() throws Exception { Random generator = new Random(); Integer randomNumber = generator.nextInt(10); Thread.sleep(randomNumber * 1000); return randomNumber; } } public class Main { public static void main(String[] args) throws Exception { // Array of FutureTask objects FutureTask[] randomNumberTasks = new FutureTask[10]; for (int i = 0; i < 10; i++) { Callable callable = new CallableDemo(); // Create the FutureTask with Callable class randomNumberTasks[i] = new FutureTask(callable); // create thread with FutureTask Thread t = new Thread(randomNumberTasks[i]); //start the thread t.start(); } System.out.println("The contents of FutureTask objects:"); for (int i = 0; i < 10; i++) { // get() contents of FutureTask System.out.print(randomNumberTasks[i].get() + " "); } } }
Output
As shown in the above program, the call () method of Callable which is overridden in the class implementing Callable generates random numbers. Once the thread is started, it displays these random numbers.
Also, we use FutureTask objects in the main function. As it implements the Future interface, we need not store the results in the Thread objects. Similarly, we can cancel the task, check if it’s running or complete, and also get the result using the FutureTask object.
ReentrantLock In Java
We have discussed thread synchronization using the synchronized keyword in detail in our last tutorial. The use of the synchronized word for thread synchronization is the basic method and is somewhat rigid.
Using the synchronized keyword, a thread can lock only once. Also, after one thread exits the synchronized block, the next thread takes the lock. There is no waiting queue. These issues may cause starvation of some other thread as it may not get to access the resources for a long time.
To address these issues, we need a flexible method of synchronizing the threads. The “Reentrant Locks” is this method in Java that provides synchronization with far greater flexibility.
The class “ReentrantLock” implements Reentrant locks and is a part of the package “import java.util.concurrent.locks”. ReentrantLock class provides the method synchronization to access shared resources. The classes also have the lock and unlock methods for locking/unlocking resources when accessed by threads.
One peculiar feature of ReentrantLock is that the thread can lock the shared resource more than once using ReentrantLock. It provides hold count which is set to one when the thread locks the resource.
The thread can re-enter and access the resource before unlocking. Every time the thread accesses the resource using the Reentrant lock, the hold count is incremented by one. For every unlock, the hold count is decremented by one.
When the hold count reaches 0, the shared resource is unlocked.
The ReentrantLock class also provides a fairness parameter which is a boolean value that can be passed with the constructor of the lock. When the fairness parameter is set to true, then whenever one thread releases the lock, the lock is passed to the longest waiting thread. This prevents starvation.
The Reentrant locks can be used as follows:
<access specifier> return_type method_name() { reentrantlock.lock(); try { //Do some work } catch(Exception e) { e.printStackTrace(); } finally { reentrantlock.unlock(); } }
Note that the unlock statement for ReentrantLock is always in the finally block. This guarantees that the lock is released even if an exception is thrown.
Let’s implement a Java program to understand ReentrantLock.
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; //thread class that implements Runnable interface class ThreadClass implements Runnable { String task_name; //define ReentrantLock object ReentrantLock thrd_lck; //ThreadClass constructor initialized lock and task name public ThreadClass(ReentrantLock r_lock, String t_name) { thrd_lck = r_lock; task_name = t_name; } //thread run () method public void run() { boolean bool_val = false; while (!bool_val) { //check for Outer Lock boolean tryLock_val = thrd_lck.tryLock(); // if lock is free, do the following if(tryLock_val) { try { for(int i=0;i<=6;i++) { if(i>=2) { thrd_lck.lock(); Thread thread_one = new Thread(); System.out.println("Thread Created....."); if(i==3) { thread_one.setName("Maint Thread2"); System.out.println("Thread Created....."); } } if(i==4) thrd_lck.unlock(); break; } System.out.println("ReentrantLock=>Is locked after sleep(1500) : " + thrd_lck.isLocked()); System.out.println("Work done for task : " + task_name ); bool_val = true; } catch(Exception e) { e.printStackTrace(); } } } } } public class Main { public static void main(String[] args) { //define ReentrantLock lock object and service pool ReentrantLock reentrant_lock = new ReentrantLock(); ExecutorService pool = Executors.newFixedThreadPool(2); //create thread instance and pass lock and task name Runnable worker_thread = new ThreadClass(reentrant_lock, "ThreadJob"); //execute the thread in exec pool pool.execute(worker_thread); //shut down the pool pool.shutdown(); } }
Output
In the above program, we have created a thread and used ReentrantLock for it. Using ReentrantLock the shared resource can be accessed.
Semaphore In Java
The next method of thread synchronization is by using Semaphore. Using this construct called semaphore, access to a shared resource is controlled through a counter. Signals are sent between the threads so that we can guard the critical section and also avoid missed signals.
A semaphore can be defined as a variable that is used to manage concurrent processes by synchronizing these processes. Semaphores are also used to synchronize access to the shared resource and thereby avoid a race condition. The permission given to a thread for accessing the shared resource by semaphore is also called a permit.
Depending on what functions they perform, semaphores can be divided into two types:
#1) Binary Semaphore: A binary semaphore is used to synchronize concurrent processes and implement mutual exclusion. A binary semaphore assumes only two values i.e. 0 and 1.
#2) Counting Semaphore: The counting semaphore has a value that indicates the number of processes that can enter the critical section. At any point, the value indicates the maximum number of processes that enter the critical section.
So how does a Semaphore work?
The working of a Semaphore can be summarized in the following steps:
- If semaphore count > 0, it means that the thread has a permit to access critical section, and then the count is decremented.
- Otherwise, the thread is blocked until the permit is acquired.
- When the thread is done with accessing the shared resource, the permit is released and semaphore count is incremented so that another thread can repeat the above steps and acquire the permit.
The above steps of the working of semaphores can be summarized in the below flowchart.
In Java, we need not implement our semaphore but it provides a Semaphore class that implements the semaphore functionality. The Semaphore class is a part of the java.util.concurrent package.
The Semaphore class provides the following constructors using which we can create semaphore object:
Semaphore (int num_value) Semaphore (int num_value, boolean how)
Here,
num_value => initial value of the permit count that determines the number of threads that can access the shared resource.
how => sets the order in which the threads will be granted permits (how = true). If how = false, then no such order is followed.
Now we will implement a Java program that will demonstrate the Semaphore that is used to manage the shared resource access and prevent the race condition.
import java.util.concurrent.*; //class for shared resource class SharedRes { static int count = 0; } class ThreadClass extends Thread { Semaphore sem; String threadName; public ThreadClass(Semaphore sem, String threadName) { super(threadName); this.sem = sem; this.threadName = threadName; } @Override public void run() { // Thread T1 processing if(this.getName().equals("T1")) { System.out.println("Start: " + threadName); try { System.out.println(threadName + " :waiting for a permit."); // acquire the permit sem.acquire(); System.out.println(threadName + ":Acquired permit"); // access shared resource for(int i=0; i < 5; i++) { SharedRes.count++; System.out.println(threadName + ": " + SharedRes.count); Thread.sleep(10); } } catch (InterruptedException exc) { System.out.println(exc); } // Release the permit. System.out.println(threadName + ":Released the permit"); sem.release(); } // Thread T2 processing else { System.out.println("Start: " + threadName); try { System.out.println(threadName + ":waiting for a permit."); // acquire the lock sem.acquire(); System.out.println(threadName + ":Acquired permit"); // process the shared resource for(int i=0; i < 5; i++) { SharedRes.count--; System.out.println(threadName + ": " + SharedRes.count); Thread.sleep(10); } } catch (InterruptedException exc) { System.out.println(exc); } // Release the permit. System.out.println(threadName + ":Released the permit."); sem.release(); } } } public class Main { public static void main(String args[]) throws InterruptedException { //create Semaphore=> #permits = 1 Semaphore sem = new Semaphore(1); // Create thread instances T1 & T2 //T1=> Increments the count; T2=> Decrements the count ThreadClass thread1 = new ThreadClass(sem, "T1"); ThreadClass thread2 = new ThreadClass(sem, "T2"); // start T1 & T2 thread1.start(); thread2.start(); // Wait T1 & T2 thread1.join(); thread2.join(); System.out.println("count: " + SharedRes.count); // display final count. } }
Output
This program declared a class for the shared resource. It also declares a thread class in which we have a semaphore variable that is initialized in the class constructor.
In the overridden run () method of the Thread class, processing of thread instance is done in which thread acquires the permit, accesses a shared resource, and then releases the permit.
In the main method, we declared two thread instances. Both the threads are then started and then they wait using the join method. Finally, the count is displayed i.e. 0 indicating that both the threads have finished with the shared resource.
Fork And Join In Java
The fork/join framework was first introduced in Java 7. This framework consists of tools that can speed up parallel processing. It uses all the available processor cores in the system and completes the task. The fork/join framework uses the divide and conquer approach.
The basic idea behind Fork/Join framework is that the first framework “Forks”, i.e. recursively breaks the task into smaller individual subtasks until the tasks are atomic so that they can be executed asynchronously.
After doing this, the tasks are “joined” i.e. all subtasks are joined recursively into a single task or return value.
The fork/join framework has a pool of threads known as “ForkJoinPool”. This pool manages the “ForkJoinWorkerThread” type of worker threads thereby providing effective parallel processing.
ForkJoinPool manages the worker threads and also helps us get information regarding the thread pool performance and state. The ForkJoinPool is an implementation of the “ExecutorService” we discussed above.
Unlike worker threads, the ForkJoinPool does not create a separate thread for each subtask. Each thread in the ForkJoinPool maintains its deque (double-ended queue) to store tasks.
The deque acts as thread’s workload balancing and does this with the help of a “work-stealing algorithm” that is described below.
Work Stealing Algorithm
We can define the work-stealing algorithm in simple words as “if a thread is free, ‘steal’ the work from busy threads”.
A worker thread will always get the tasks from its deque. When all the tasks in the deque are exhausted and the deque is empty, the worker thread will take a task from the tail of another deque or from the ‘global entry queue’.
This way the possibility of threads competing for tasks is minimized and also the number of times the thread has to scout for work is reduced. This is because the thread has already got the biggest chunk of available work and has finished it.
So how can we use the ForkJoinPool in a program?
The general definition of ForkJoinPool is as follows:
public class ForkJoinPool extends AbstractExecutorService
The class ForkJoinPool is a part of the “java.util.concurrent” package.
In Java 8, we create an instance of the ForkJoinPool using its static method “common-pool ()” that provides a reference to the common pool or the default thread pool.
ForkJoinPool commonPool = ForkJoinPool.commonPool ();
In Java 7, we create a ForkJoinPool instance and assign it to the field of utility class as shown below.
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);
The above definition indicates that the pool has a parallelism level of 2 such that the pool will use 2 processor cores.
To access the above pool, we can give the following statement.
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
The base type for ForkJoinPool tasks is “ForkJoinTask”. We should extend one of its subclasses i.e. For void tasks, the RecursiveAction and for tasks returning a value, the RecursiveTask<V>. Both the extended classes provide an abstract method compute () in which we define the task’s logic.
Given below is an example to demonstrate the ForkJoinPool.
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; //class declaration for ForkJoinPool tasks class FJPoolTask extends RecursiveAction { private long Load = 0; public FJPoolTask(long Load) { this.Load = Load; } @Override protected void compute() { //if threshold is reached, break tasks into smaller tasks List&amp;lt;FJPoolTask&amp;gt; subtasks = new ArrayList&amp;lt;FJPoolTask&amp;gt;(); subtasks.addAll(createSubtasks()); for(RecursiveAction subtask : subtasks){ subtask.fork(); } } //create subtasks private List&amp;lt;FJPoolTask&amp;gt; createSubtasks() { List&amp;lt;FJPoolTask&amp;gt; sub_tasks =new ArrayList&amp;lt;FJPoolTask&amp;gt;(); FJPoolTask sub_task1 = new FJPoolTask(this.Load / 2); FJPoolTask sub_task2 = new FJPoolTask(this.Load / 2); FJPoolTask sub_task3 = new FJPoolTask(this.Load / 2); sub_tasks.add(sub_task1); sub_tasks.add(sub_task2); sub_tasks.add(sub_task3); return sub_tasks; } } public class Main { public static void main(final String[] arguments) throws InterruptedException { //get count of available processors int proc = Runtime.getRuntime().availableProcessors(); System.out.println("Processors available:" +proc); //declare forkJoinPool ForkJoinPool Pool = ForkJoinPool.commonPool(); System.out.println(" Active Threads (Before invoke):" +Pool.getActiveThreadCount()); //Declare ForkJoinPool task object FJPoolTask t = new FJPoolTask(400); //submit the tasks to the pool Pool.invoke(t); System.out.println(" Active Threads (after invoke):" +Pool.getActiveThreadCount()); System.out.println("Common Pool Size :" +Pool.getPoolSize()); } }
Output
In the above program, we find the number of active threads in the system before and after calling the “invoke ()” method. The invoke () method is used to submit the tasks to the pool. We also find the number of available processor cores in the system.
Frequently Asked Questions
Q #1) What is Java Util Concurrent?
Answer: The package “java.util.concurrent” is a set of classes and interfaces provided by Java to facilitate the development of concurrent (multi-threaded) applications. Using this package we can directly use the interface and classes as well as APIs without having to write our classes.
Q #2) Which of the following are concurrent implementations present in the java.util. concurrent package?
Answer: At a high level, the java.util.concurrent package contains utilities like Executors, Synchronizers, Queues, Timings, and Concurrent Collections.
Q #3) What is Future Java?
Answer: A Future object (java.util.concurrent.Future) is used to store the result returned by a thread when the Callable interface is implemented.
Q #4) What is thread-safe in Java?
Answer: A thread-safe code or class in Java is a code or class that can be shared in a multi-threaded or concurrent environment without any problems and produces expected results.
Q #5) What is the synchronized collection in Java?
Answer: A synchronized collection is a thread-safe collection. The method synchronized collection () of java.util.Collections class returns a synchronized (thread-safe) collection.
Conclusion
With this tutorial, we have completed the topic of multi-threading and concurrency in Java. We have discussed multithreading in detail in our previous tutorials. Here, we discussed the concurrency and the implementation related to concurrency and multithreading that are a part of the java.util.concurrent package.
We discussed two more synchronization methods, semaphores, and ReentrantLock. We also discussed the ForkJoinPool that is used to execute the tasks by dividing them into simpler tasks and then ultimately joining the result.
The java.util.concurrent package also supports the Executor framework and executors that help us to execute threads. We also discussed thread pool implementation that consists of reusable threads that are returned to the pool when the execution is finished.
We discussed another interface similar to Runnable which also helps us return a result from the thread and the Future object used to store the thread result obtained.
=> Watch Out The Simple Java Training Series Here.