Saturday, 17 May 2014

Java concurrency in practice

Hello friends!

Let's do our best to explore Java concurrency by coding.


1. Overview

In Java there are two basic units of concurrent execution: processes and threads. A process is a self-contained execution environment with its own memory space. All threads created within the same process share that memory space. Processes can communicate with each other through pipes or sockets, on a single machine or across multiple machines. This is called inter-process communication (IPC).
Each process contains at least one thread. Both processes and threads provide an execution environment, but creating a new thread requires fewer resources than creating a new process. Threads within a single process share its resources and can communicate with each other directly.

2. Creating, pausing and interrupting threads

A Java application starts with a single thread called main. The main thread can create new threads. Each thread is represented by an instance of the java.lang.Thread class.
Let's take a brief look at a simplified excerpt of Thread's source code:
public class Thread implements Runnable {

    private char        name[];

    /* What will be run. */
    private Runnable target;

    public Thread() {
        // ...
    }

    public Thread(Runnable target) {
        // ...
    }

    public Thread(String name) {
        // ...
    }

    public Thread(Runnable target, String name) {
        // ...
    }

    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }
}
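This shows that Thread implements Runnable and also accepts a Runnable as its target. When you call start(), a new thread of execution is created and it invokes run(), which delegates to the target if one was given. Because FirstThread (defined below) extends Thread and is therefore itself a Runnable, this is possible: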
FirstThread firstThread = new FirstThread();
Thread thread = new Thread(firstThread);
thread.start();
The run() method of FirstThread will be executed in the new thread.


There are two ways to start a thread:
  1. Create an instance of class Thread and call its start() method.
  2. Submit an instance of Runnable or Callable to an ExecutorService.
Let's start with the simpler first way. It can be done in two ways:
  • Subclass java.lang.Thread and call start() on the instance.
    public class FirstThread extends Thread {
    
     @Override
     public void run() {
      for (int i = 1; i < 4; i++) {
       System.out.println("Method run is running for " + i + " time");
      }
     }
    
     public static void main(String[] args) {
      FirstThread thread = new FirstThread();
      thread.start();
     }
    }
    
  • Implement the java.lang.Runnable interface, create a new Thread instance with our Runnable, and call start() to execute the run() method of our Runnable in the new thread.
    public class FirstRunnable implements Runnable {
    
     @Override
     public void run() {
      for (int i = 1; i < 4; i++) {
       System.out.println("Method run is running for " + i + " time");
      }
     }
    
     public static void main(String[] args) {
      FirstRunnable fRunnable = new FirstRunnable();
    
      Thread thread = new Thread(fRunnable);
      thread.start();
     }
    }
In both cases run() is executed in a new thread once Thread's start() method has been called.
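The second way from the list above, submitting a task to an ExecutorService, can be sketched as follows. This is only a minimal illustration: the class name ExecutorDemo, the helper sumOnPool and the pool size of 2 are assumptions made for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorDemo {

    // Hypothetical helper: sums 1..n on a pool thread and returns the result.
    static int sumOnPool(final int n) {
        // Pool size 2 is an arbitrary choice for this sketch.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> future = pool.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    int sum = 0;
                    for (int i = 1; i <= n; i++) {
                        sum += i;
                    }
                    return sum;
                }
            });
            // get() blocks the calling thread until the task has finished.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Without shutdown() the pool threads would keep the JVM alive.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum computed by pool thread: " + sumOnPool(3));
    }
}
```

Unlike Runnable, a Callable returns a value, which the submitting thread picks up through the Future.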

There are two static methods of the Thread class that put the current thread to sleep:

1. public static native void sleep(long millis) throws InterruptedException;
This method suspends the current thread for the specified time in milliseconds. When the time is over, the thread resumes where it left off.

2. public static void sleep(long millis, int nanos) throws InterruptedException;
This method does the same, but the sleep time can be specified with nanosecond granularity; the actual precision depends on the system timer and scheduler. Both methods throw InterruptedException if the sleeping thread is interrupted.

A thread that calls one of the sleep methods does not release any monitors it holds. To try sleep out we can modify our previous example a little:

public class SleepThread implements Runnable {

 @Override
 public void run() {
  for (int i = 1; i < 4; i++) {
   try {
    Thread.sleep(i * 1000);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   System.out.println("Method run is running for " + i + " time");
  }
 }

 public static void main(String[] args) {
  SleepThread sleepThread = new SleepThread();

  Thread thread = new Thread(sleepThread);
  thread.start();
 }
}

To interrupt a thread, call its public void interrupt() method.
An interrupt is only a request: for it to have an effect, the target thread must either be in (or call) a method that throws InterruptedException, or check its interrupt flag by calling isInterrupted().
Methods that throw InterruptedException, such as sleep(), join() and wait(), are designed to stop waiting and throw InterruptedException as soon as interrupt() is invoked on the thread. To try it out we can reuse our previous demo and change the main method to:

public static void main(String[] args) throws InterruptedException {
  SleepThread sleepThread = new SleepThread();

  Thread thread = new Thread(sleepThread);
  thread.start();
  Thread.sleep(1000);
  thread.interrupt();
 }
This causes the main thread to sleep for 1 second after thread has started. After the sleep, the main thread calls interrupt() on thread. If SleepThread is sleeping at that moment, its sleep() call terminates and throws InterruptedException, which is caught in the catch block, and the for loop continues with the next iteration. The best way to understand this is to code and run this demo in your IDE. Try changing the sleep interval of the main thread.
Note that a thread cannot be started more than once, whether or not it was interrupted. If you call start() a second time, java.lang.IllegalThreadStateException is thrown.
Suppose your thread is doing a very long task while another thread calls interrupt() on it. To be able to stop, your thread should periodically call isInterrupted() or the static interrupted() method to check the interrupt flag. Once the flag is true, your thread can handle the interrupt request. Note that interrupted() also clears the flag, while isInterrupted() leaves it unchanged.
This simple demo shows how it works:

public class InterruptedRunnable implements Runnable {

 @Override
 public void run() {
  while (!Thread.interrupted()) {
   for (int i = 0; i < 100; i++) {
    System.out.println("i=" + i);
   }
  }
  System.out.println("Thread is interrupted!!!");
 }

 public static void main(String[] args) throws InterruptedException {
  InterruptedRunnable interruptedRunnable = new InterruptedRunnable();

  Thread thread = new Thread(interruptedRunnable);
  thread.start();
  Thread.sleep(500);
  thread.interrupt();
 }
}
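The call Thread.sleep(500) in main puts the main thread to sleep for 0.5 second to let the InterruptedRunnable thread do its work. The following call to thread.interrupt() sets the interrupt flag of thread to true. The worker notices the flag only when the while condition is evaluated again, that is, after the inner for loop has finished its current pass.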
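The two ways of checking the interrupt flag behave differently, which is easy to miss. The sketch below lets the current thread interrupt itself and then reads the flag three times. The class name InterruptFlagDemo and the helper readFlags are made up for this illustration.

```java
public class InterruptFlagDemo {

    // Returns three readings of the interrupt flag taken after a self-interrupt.
    static boolean[] readFlags() {
        // Set the interrupt flag of the current thread.
        Thread.currentThread().interrupt();
        // isInterrupted() reports the flag and leaves it unchanged.
        boolean viaIsInterrupted = Thread.currentThread().isInterrupted();
        // The static interrupted() reports the flag and clears it.
        boolean firstInterrupted = Thread.interrupted();
        // The flag was cleared by the previous call.
        boolean secondInterrupted = Thread.interrupted();
        return new boolean[] { viaIsInterrupted, firstInterrupted, secondInterrupted };
    }

    public static void main(String[] args) {
        boolean[] flags = readFlags();
        System.out.println("isInterrupted():      " + flags[0]); // true
        System.out.println("first interrupted():  " + flags[1]); // true
        System.out.println("second interrupted(): " + flags[2]); // false
    }
}
```

This is why a loop such as while (!Thread.interrupted()) sees the request only once: after the check has returned true, the flag is false again.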


3. Joining thread


The Thread class has a join() method:

public final void join() throws InterruptedException { ... }
It allows one thread to pause its execution until another thread has finished. If thread1 has a reference to thread2 and calls thread2.join(), thread1 pauses until thread2 has finished.
Run this demo to try it:

public class JoinThread implements Runnable {

 @Override
 public void run() {
  System.out.println("JoinThread has started!");
  try {
   Thread.sleep(3000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  System.out.println("JoinThread has finished!");
 }

 public static void main(String[] args) throws InterruptedException {
  JoinThread joinThread = new JoinThread();

  Thread thread = new Thread(joinThread);
  thread.start();
  thread.join();
  System.out.println("Main thread has finished!");
 }
}
The call thread.join() makes the main thread suspend its execution until JoinThread is over.
After JoinThread returns from run(), the main thread resumes and prints "Main thread has finished!".
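A common use of join() is to wait for several workers and then collect what they computed. The following is a minimal sketch of that pattern. The names JoinAllDemo, SquareTask and sumOfSquares are assumptions made for the example.

```java
public class JoinAllDemo {

    // Hypothetical worker that stores its result in a plain field.
    static class SquareTask implements Runnable {
        private final int input;
        private int result;

        SquareTask(int input) {
            this.input = input;
        }

        @Override
        public void run() {
            result = input * input;
        }
    }

    // Starts one thread per number 1..count and sums the squares they compute.
    static int sumOfSquares(int count) {
        SquareTask[] tasks = new SquareTask[count];
        Thread[] threads = new Thread[count];
        for (int i = 0; i < count; i++) {
            tasks[i] = new SquareTask(i + 1);
            threads[i] = new Thread(tasks[i]);
            threads[i].start();
        }
        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                threads[i].join();      // wait until worker i has finished
                sum += tasks[i].result; // the result is read only after join() has returned
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("1 + 4 + 9 = " + sumOfSquares(3));
    }
}
```

Reading result before join() has returned would be a race: the worker might not have stored it yet.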

4. Synchronizing threads

Threads within a single process share the same objects and values. This makes two kinds of errors possible:
  1. Thread interference
  2. Memory consistency errors
Thread interference means that operations of two threads acting on the same variable interleave, so that a value written by one thread is overwritten by a value written by another. This can be explained with the help of a Counter class:
class Counter {
    private int c = 0;

    public int increment() {
        c++;
        return c;
    }

    public int decrement() {
        c--;
        return c;
    }
}
In this demo the shared variable is c. Its state can be modified by the increment and decrement methods. Neither c++ nor c-- is a single atomic step: each one reads c, computes a new value and writes it back. Here is what can happen if ThreadA calls increment and ThreadB calls decrement at the same time while c is 0:
  1. ThreadA retrieves the value of c (0).
  2. ThreadB retrieves the value of c (0).
  3. ThreadA increments its retrieved value to 1; c still holds 0.
  4. ThreadB decrements its retrieved value to -1; c still holds 0.
  5. ThreadA stores its result in c; c holds 1.
  6. ThreadB stores its result in c; c holds -1.
As a result c ends up as -1 instead of 0: the increment made by ThreadA is lost.

Memory consistency errors occur when changes made by one thread are not visible to another thread. There is no guarantee that changes made by one thread are visible to another unless the programmer establishes a happens-before relationship. We have already met two actions that establish one:
  1. Everything a thread did before calling start() on a new thread is visible to that new thread.
  2. Everything a thread did before it terminated is visible to a thread that has returned from join() on it.
The third way to avoid memory consistency errors, as well as thread interference, is synchronization.
To avoid problems with the Counter class we can synchronize the methods that act on the shared variable c:
class Counter {
    private int c = 0;

    public synchronized int increment() {
        c++;
        return c;
    }

    public synchronized int decrement() {
        c--;
        return c;
    }
}
This ensures that only one thread at a time can execute increment or decrement on the same Counter object. If ThreadA is executing increment, ThreadB has to wait before it can execute decrement until ThreadA returns from increment.
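To check that the synchronized Counter no longer loses updates, several threads can hammer the same instance and the final value can be compared with the expected total. This is only a sketch: the class name SynchronizedCounterDemo, the helper runIncrements and the added get() method are assumptions made for the example.

```java
public class SynchronizedCounterDemo {

    static class Counter {
        private int c = 0;

        public synchronized int increment() {
            c++;
            return c;
        }

        // Hypothetical accessor, added so the result can be read safely.
        public synchronized int get() {
            return c;
        }
    }

    // Runs threadCount threads, each calling increment() perThread times.
    static int runIncrements(int threadCount, final int perThread) {
        final Counter counter = new Counter();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < perThread; j++) {
                        counter.increment();
                    }
                }
            });
            threads[i].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        // With synchronized methods every increment is counted.
        System.out.println("Final value: " + runIncrements(4, 10000));
    }
}
```

If you remove synchronized from increment() the final value is likely to fall short of 40000, although the exact number differs from run to run.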
To enter a synchronized method a thread must acquire the monitor of the Counter object. While one thread holds the monitor, all other threads have to wait for it to be released before they can enter a synchronized method of that object. Thus only one synchronized method of a given object can be executing at any time. A synchronized method or block also ensures that all changes made inside it are visible to any thread that subsequently acquires the same monitor. When the synchronized method ends, the thread releases the monitor and other threads can acquire it to enter a synchronized method or block. If multiple threads are waiting for the monitor, it is unpredictable which of them acquires it. To see this, run the following demo:
class Model {

 private long counter = 0;

 public synchronized void count() {
  System.out.println(Thread.currentThread().getName() + " entered count()");
  
  for (int i = 0; i < 1000000000; i++)
   counter++;
  System.out.println(Thread.currentThread().getName() + " has counter " + counter);
 }
}

public class MonitorRunnable implements Runnable {

 private Model model = new Model();

 @Override
 public void run() {
  model.count();
 }

 public static void main(String[] args) {
  MonitorRunnable runnable = new MonitorRunnable();

  Thread t1 = new Thread(null, runnable, "Thread_1");
  Thread t2 = new Thread(null, runnable, "Thread_2");
  Thread t3 = new Thread(null, runnable, "Thread_3");
  Thread t4 = new Thread(null, runnable, "Thread_4");

  t1.start();
  t2.start();
  t3.start();
  t4.start();
 }
}
Each thread calls the count() method of the same Model object. While one thread is executing it, all other threads wait for the object's monitor in order to execute the synchronized count() method. The order in which the waiting threads acquire the monitor is unpredictable.
You can try removing the synchronized keyword from count(). As a result, all four threads enter the method and run the for loop simultaneously, which leaves counter with an unpredictable value. In fact, counter is likely to be different on each run of the main method.
Synchronizing the increment and decrement methods is not the only way to prevent thread interference. Java provides classes that support atomic operations on a single variable. They live in the java.util.concurrent.atomic package. To make Counter thread-safe, we can use the AtomicInteger class:
import java.util.concurrent.atomic.AtomicInteger;

class Counter {
 private final AtomicInteger c = new AtomicInteger();

 public int increment() {
  return c.incrementAndGet();
 }

 public int decrement() {
  return c.decrementAndGet();
 }
}
Unlike synchronization, this does not prevent two threads from being inside increment and decrement at the same time. Synchronization makes whole methods or blocks of code execute atomically, one thread at a time. Atomic classes make only individual operations on a single variable atomic, such as get, set, incrementAndGet or compareAndSet.
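When an update depends on the current value ("increment only while below a limit"), a single incrementAndGet() is not enough, but compareAndSet() in a retry loop still avoids locking. The sketch below illustrates this. BoundedCounter, incrementIfBelowLimit and runWithLimit are hypothetical names invented for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounterDemo {

    // Hypothetical counter that never exceeds the given limit.
    static class BoundedCounter {
        private final AtomicInteger c = new AtomicInteger();
        private final int limit;

        BoundedCounter(int limit) {
            this.limit = limit;
        }

        // Increments only while the value is below the limit.
        boolean incrementIfBelowLimit() {
            while (true) {
                int current = c.get();
                if (current >= limit) {
                    return false;
                }
                // Succeeds only if no other thread changed the value in the meantime;
                // otherwise the loop reads the new value and tries again.
                if (c.compareAndSet(current, current + 1)) {
                    return true;
                }
            }
        }

        int get() {
            return c.get();
        }
    }

    // Each of threadCount threads attempts attemptsPerThread increments.
    static int runWithLimit(int limit, int threadCount, final int attemptsPerThread) {
        final BoundedCounter counter = new BoundedCounter(limit);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < attemptsPerThread; j++) {
                        counter.incrementIfBelowLimit();
                    }
                }
            });
            threads[i].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        // 40000 attempts in total, but the counter stops at the limit.
        System.out.println("Final value: " + runWithLimit(1000, 4, 10000));
    }
}
```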

5. Deadlock, starvation and livelock

Liveness is the ability of a concurrent application to make progress in a timely manner. The liveness problems are deadlock, starvation and livelock.

Deadlock describes a situation in which two or more threads are blocked forever, waiting for each other.
This example demonstrates it:
public class Deadlock {
 static class Player {
  private String name;

  public Player(String name) {
   super();
   this.name = name;
  }

  public synchronized void passTo(Player p) {
   System.out.println(this.name + " passes to " + p.name);
   try {
    // to imitate long task
    Thread.sleep(1000);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   p.passBack(this);
  }

  public synchronized void passBack(Player p) {
   System.out.println(this.name + " passes back to " + p.name);
  }
 }

 public static void main(String[] args) {
  final Player ivan = new Player("Ivan");
  final Player petro = new Player("Petro");

  new Thread(new Runnable() {
   public void run() {
    ivan.passTo(petro);
   }
  }).start();
  new Thread(new Runnable() {
   public void run() {
    petro.passTo(ivan);
   }
  }).start();
 }
}
If you run this demo you'll see that both threads start and enter the passTo method almost simultaneously. Each thread holds the monitor of its own player while it sleeps. The first thread holds ivan's monitor and needs petro's monitor to call petro.passBack(). Meanwhile the second thread holds petro's monitor and needs ivan's monitor to call ivan.passBack(). So each thread holds one monitor and waits forever for the monitor held by the other.
How can this deadlock be prevented?
One solution is to use a lock with a timeout with the help of java.util.concurrent.locks.ReentrantLock:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TimeoutLock {
 static class Player {
  private String name;
  private ReentrantLock lock = new ReentrantLock();

  public Player(String name) {
   super();
   this.name = name;
  }

  public void passTo(Player p) {
   lock.lock();
   try {
    System.out.println(this.name + " passes to " + p.name);
    try {
     // to imitate long task
     Thread.sleep(1000);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    p.passBack(this);
   } finally {
    lock.unlock();
   }
  }

  public void passBack(Player p) {
   try {
    if (lock.tryLock(1, TimeUnit.SECONDS))
     System.out.println(this.name + " passes back to " + p.name);
   } catch (InterruptedException e) {
    e.printStackTrace();
   } finally {
    if (lock.isHeldByCurrentThread())
     lock.unlock();
   }
  }
 }

 public static void main(String[] args) {
  final Player ivan = new Player("Ivan");
  final Player petro = new Player("Petro");

  new Thread(new Runnable() {
   public void run() {
    ivan.passTo(petro);
   }
  }).start();
  new Thread(new Runnable() {
   public void run() {
    petro.passTo(ivan);
   }
  }).start();
 }
}
The timeout lock is implemented by ReentrantLock's method:

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException

This method tries to acquire the lock for the specified amount of time. It returns true if the lock was acquired and false otherwise.
There is also a variant that tries to acquire the lock and returns immediately. It returns true if the lock was acquired and false otherwise:
public boolean tryLock() { ... }
We could just as well use this method in passBack instead of the timed variant.

We know that our deadlock happened because of the lock on the passBack method. What has really changed is that a thread now waits at most 1 second to acquire the lock in passBack. If the lock is not acquired within 1 second, the thread skips println and goes straight to the finally block, where it checks with isHeldByCurrentThread() whether it holds the lock and, if so, releases it with unlock(). The finally block runs whether or not the lock was acquired.
The thread that was first to start waiting in passBack fails to acquire the lock after 1 second, so passBack returns. This lets passTo finish and release its own lock in its finally block. The other thread, still waiting in passBack, can now take the released lock and execute println.
For me the output is:
Ivan passes to Petro
Petro passes to Ivan
Ivan passes back to Petro
As we see Petro doesn't pass back to Ivan.
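The two tryLock variants can also be observed in isolation. In the sketch below the main thread holds a ReentrantLock while a helper thread tries to take it, first without waiting and then with a 200 ms timeout. The names TryLockDemo, tryFromOtherThread and demo are assumptions made for the example, and a timeout argument of 0 is used here only as a marker for "call the immediate variant".

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {

    // Tries to acquire the lock from a new thread and reports whether it succeeded.
    // timeoutMillis == 0 selects tryLock(), anything else the timed variant.
    static boolean tryFromOtherThread(final ReentrantLock lock, final long timeoutMillis) {
        final boolean[] acquired = new boolean[1];
        Thread helper = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    acquired[0] = (timeoutMillis == 0)
                            ? lock.tryLock()
                            : lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    if (lock.isHeldByCurrentThread()) {
                        lock.unlock();
                    }
                }
            }
        });
        helper.start();
        try {
            helper.join(); // after join() the write to acquired[0] is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired[0];
    }

    static boolean[] demo() {
        ReentrantLock lock = new ReentrantLock();
        boolean immediateWhileHeld;
        boolean timedWhileHeld;
        lock.lock();
        try {
            immediateWhileHeld = tryFromOtherThread(lock, 0);   // returns at once
            timedWhileHeld = tryFromOtherThread(lock, 200);     // gives up after 200 ms
        } finally {
            lock.unlock();
        }
        boolean afterRelease = tryFromOtherThread(lock, 0);     // the lock is free now
        return new boolean[] { immediateWhileHeld, timedWhileHeld, afterRelease };
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println("tryLock() while held:       " + results[0]); // false
        System.out.println("tryLock(200 ms) while held: " + results[1]); // false
        System.out.println("tryLock() after release:    " + results[2]); // true
    }
}
```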

Another way to prevent this deadlock is to make the synchronized methods static. A static synchronized method is guarded by the class monitor instead of an object monitor. Therefore two threads cannot be inside pass at the same time, even for different Player instances, because there is a single class monitor shared by all instances.
To try synchronization on the class monitor, run this demo:
public class StaticMonitor {
 static class Player {
  private String name;

  public Player(String name) {
   super();
   this.name = name;
  }

  public synchronized static void pass(Player passer, Player receiver) {
   System.out.println(passer.name + " passes to " + receiver.name);
   try {
    // to imitate long task
    Thread.sleep(1000);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   passBack(receiver, passer);
  }

  public synchronized static void passBack(Player passer, Player receiver) {
   System.out.println(passer.name + " passes back to " + receiver.name);
  }
 }

 public static void main(String[] args) {
  final Player ivan = new Player("Ivan");
  final Player petro = new Player("Petro");

  new Thread(new Runnable() {
   public void run() {
    Player.pass(ivan, petro);
   }
  }).start();
  new Thread(new Runnable() {
   public void run() {
    Player.pass(petro, ivan);
   }
  }).start();
 }
}
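Both pass and passBack are guarded by the class monitor, which is the same for the two Player objects. Because monitors are reentrant, the thread inside pass can call passBack without blocking. A possible output is: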
Ivan passes to Petro
Petro passes back to Ivan
Petro passes to Ivan
Ivan passes back to Petro

Starvation means that a thread cannot acquire a lock to enter a synchronized method because other threads keep taking it. It typically occurs when some threads have much lower priority than others. The main causes of starvation are:
  • a synchronized method is frequently executed by threads with higher priority.
  • many threads are suspended in wait() on the same object. Each notify() wakes up a single, arbitrarily chosen thread, so a particular thread may be passed over again and again.
Starvation is easy to reproduce if you explicitly set thread priorities. Run this demo to see how much less often the thread with MIN_PRIORITY gets the lock than the threads with MAX_PRIORITY.
public class StarvationRunnable implements Runnable {

 public synchronized void doLongTask() {
  for (int i = 0; i < 1000000000; i++);
  System.out.println(Thread.currentThread().getName() + " is running with priority "
    + Thread.currentThread().getPriority() + " !");
 }

 @Override
 public void run() {
  for (;;) {
   doLongTask();
  }
 }

 public static void main(String[] args) {
  StarvationRunnable runnable = new StarvationRunnable();

  for (int i = 0; i < 4; i++) {
   Thread thread = new Thread(runnable);
   thread.setPriority(i == 3 ? Thread.MIN_PRIORITY : Thread.MAX_PRIORITY);
   thread.start();
  }
 }
}
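In main, setPriority sets the priority to maximum for all threads except the fourth, whose priority is minimum. The maximum priority is 10 and the minimum is 1. Priorities are only hints to the scheduler and their effect depends on the platform; where they are honored, the output shows that the thread with priority 1 takes the lock and executes doLongTask far less often.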

To address starvation Java provides the so-called fair lock. A fair lock differs from a normal lock in that it hands the lock not to an arbitrary thread but to the longest-waiting one, as the documentation of ReentrantLock points out:
The constructor for this class accepts an optional fairness parameter. When set true, under contention, locks favor granting access to the longest-waiting thread. Otherwise this lock does not guarantee any particular access order.
A fair lock has nothing to do with thread priorities:
Note however, that fairness of locks does not guarantee fairness of thread scheduling. Thus, one of many threads using a fair lock may obtain it multiple times in succession while other active threads are not progressing and not currently holding the lock.
The starvation demo above cannot be fixed with a fair lock, because its problem is thread scheduling by priority, not access to the lock.
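For reference, the fairness parameter quoted above is passed to the ReentrantLock constructor. The sketch below guards a counter with a fair lock. The names FairLockDemo, Model and runThreads are assumptions made for this illustration, and the sketch only shows how such a lock is created and used, not a measurement of fairness.

```java
import java.util.concurrent.locks.ReentrantLock;

public class FairLockDemo {

    static class Model {
        // true requests the fair policy; new ReentrantLock() creates a non-fair lock.
        private final ReentrantLock lock = new ReentrantLock(true);
        private long counter = 0;

        void count() {
            lock.lock();
            try {
                counter++;
            } finally {
                lock.unlock();
            }
        }

        long getCounter() {
            lock.lock();
            try {
                return counter;
            } finally {
                lock.unlock();
            }
        }

        boolean isFair() {
            return lock.isFair();
        }
    }

    // Runs threadCount threads, each calling count() callsPerThread times.
    static long runThreads(int threadCount, final int callsPerThread) {
        final Model model = new Model();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < callsPerThread; j++) {
                        model.count();
                    }
                }
            });
            threads[i].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return model.getCounter();
    }

    public static void main(String[] args) {
        System.out.println("Lock is fair: " + new Model().isFair());
        System.out.println("Counter: " + runThreads(4, 1000));
    }
}
```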

Livelock means that two threads block each other actively rather than lying dormant. In the deadlock we had two threads, each waiting for a lock held by the other. In a livelock the threads do not wait for the lock but keep retrying to acquire it. A simple demo of livelock is:
import java.util.concurrent.locks.ReentrantLock;

public class LiveLock {
 static class Player {
  private ReentrantLock lock = new ReentrantLock();
  private String name;

  public Player(String name) {
   super();
   this.name = name;
  }

  public boolean passTo(Player p) {
   if (lock.tryLock()) {
    System.out.println(this.name + " passes to " + p.name);
    try {
     // to imitate long task
     Thread.sleep(1000);
    } catch (InterruptedException e) {
     e.printStackTrace();
    } finally {
     // lock of this player is released.
     lock.unlock();
    }
    // passBack is locked on p.lock
    return p.passBack(this);
   }
   return false;
  }

  public boolean passBack(Player p) {
   // tryLock returns false immediately if lock cannot be acquired
   if (lock.tryLock()) {
    try {
     System.out.println(this.name + " passes back to " + p.name);
     return true;
    } finally {
     lock.unlock();
    }
   }
   return false;
  }
 }

 public static void main(String[] args) {
  final Player ivan = new Player("Ivan");
  final Player petro = new Player("Petro");

  new Thread(new Runnable() {
   public void run() {
    while (!ivan.passTo(petro)) {
    }
    System.out.println("first thread finished");
   }
  }).start();
  new Thread(new Runnable() {
   public void run() {
    while (!petro.passTo(ivan)) {
    }
    System.out.println("second thread finished");
   }
  }).start();
 }
}
Note that in the deadlock demo two threads waited for each other's monitor at the synchronized passBack method. Here a thread never waits for a lock, because tryLock() returns immediately whether or not the lock was acquired. If the lock was not acquired, the thread calls passTo again in the while loop.

6. Guarded block

A guarded block suspends a thread until a certain condition is satisfied and then resumes it. Guarded blocks are implemented with the wait() and notify() methods of the Object class. Suppose we need to hold a thread in method doIt until some other thread sets the allow variable to true. We could simply write:
public synchronized void doIt() {
callFirstMethod();
 while (!allow);
 callSecondMethod();
}
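The while loop makes the thread spin until another thread changes allow. This loop is wasteful: the thread keeps the lock and burns CPU while doing nothing useful. Worse, because doIt holds the monitor, no other thread can enter a synchronized method of the same object to set allow. A more efficient way is to use the wait() method as follows: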
public synchronized void doIt() {
callFirstMethod();
 while (!allow) {
  try {
       wait();
  } catch (InterruptedException e) {}
 }
 callSecondMethod();
}
By calling wait() the thread suspends itself and releases the monitor (lock). To call wait() or notify() on an object, a thread must own that object's monitor, which it acquires by entering a synchronized method or block on that object; otherwise IllegalMonitorStateException is thrown. Now some other thread can take the monitor, do its job and afterwards call notify() on the same object to wake up a single, arbitrarily chosen thread that is waiting in doIt.
Suppose the other thread calls another method, doSmthElse, of the same class:
public synchronized void doSmthElse() {
 doJobBWhileDoItWaits();
 allow=true;
 notify();
}
Here allow is set to true and then notify() tells the thread waiting in doIt to resume. notify() itself does not release the lock: the waiting thread can reacquire the monitor and return from wait() only after doSmthElse has exited.
It is best practice to always call wait() inside a while loop that tests the condition the thread is waiting for. Don't assume that a notification means the condition has changed.
To wake up all threads that called wait(), another thread should call notifyAll() instead of notify().
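The doIt and doSmthElse fragments above can be combined into a small runnable class. This is a minimal sketch: the names GuardedBlockDemo, Gate, getLog and runDemo are assumptions made for the example, and the calls to callFirstMethod(), callSecondMethod() and doJobBWhileDoItWaits() are replaced by log entries.

```java
public class GuardedBlockDemo {

    static class Gate {
        private boolean allow = false;
        private final StringBuilder log = new StringBuilder();

        public synchronized void doIt() throws InterruptedException {
            log.append("first;");       // stands in for callFirstMethod()
            while (!allow) {
                wait();                 // releases the monitor while waiting
            }
            log.append("second;");      // stands in for callSecondMethod()
        }

        public synchronized void doSmthElse() {
            log.append("jobB;");        // stands in for doJobBWhileDoItWaits()
            allow = true;
            notifyAll();                // the waiter resumes after this method exits
        }

        public synchronized String getLog() {
            return log.toString();
        }
    }

    static String runDemo() {
        final Gate gate = new Gate();
        Thread waiter = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    gate.doIt();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        waiter.start();
        try {
            // Gives the waiter a chance to reach wait(); not needed for correctness,
            // because doIt() re-checks allow in its while loop.
            Thread.sleep(100);
            gate.doSmthElse();
            waiter.join();
        } catch (InterruptedException e) {
            waiter.interrupt();
            Thread.currentThread().interrupt();
        }
        return gate.getLog();
    }

    public static void main(String[] args) {
        // "second;" is always the last entry, whichever thread gets the monitor first.
        System.out.println(runDemo());
    }
}
```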
Now, having a basic understanding of thread interaction, we can create a simple demo that solves the producer-consumer problem:
public class ProducerConsumer {

 static class Bin {
  private Object thing;
  private volatile boolean empty = true;

  public synchronized void put(Object newThing) {
   // guarded block is broken if bin is empty
   while (!empty) {
    // empty is false means that consumer hasn't consumed newThing yet
    try {
     // wait for consumer to take newThing
     wait();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
   this.thing = newThing;
   this.empty = false;
   // wake up consumer to take newThing
   notifyAll();
  }

  public synchronized Object take() {
   // while block is broken if bin is not empty
   while (empty) {
    // empty is true means that producer hasn't put newThing yet
    try {
     // wait for producer to put newThing
     wait();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
   this.empty = true;
   // wake up producer to put newThing
   notifyAll();
   return thing;
  }
 }

 public static void main(String[] args) {
  // instantiate single bin
  final Bin bin = new Bin();

  final Object[] data = { 12, 23.34f, "Wednesday", 1000000000, 10_000_000_000l };
  // create consumer thread
  new Thread(new Runnable() {
   @Override
   public void run() {
    for (int i = 0; i < data.length; i++) {
     Object newThing = bin.take();
     printData("take", newThing);
    }
   }
  }).start();
  // create producer thread
  new Thread(new Runnable() {
   @Override
   public void run() {
    for (int i = 0; i < data.length; i++) {
     Object newThing = data[i];
     bin.put(newThing);
     printData("put ", newThing);
    }
   }
  }).start();
 }

 private static void printData(String prefix, Object obj) {
  System.out.printf("%1$4s= %2$-11s \n", prefix, obj);
 }
}
As you can see in the example, the guarded blocks keep both threads from performing their tasks (put() and take()) in the wrong state, as indicated by the empty flag. If empty is true, the producer can put newThing and the consumer waits; if empty is false, the consumer can take newThing and the producer waits.
Note that the empty flag is declared volatile. This ensures that a write to it by one thread is visible to every later read by another thread, instead of possibly staying in a thread-local cache. In this demo volatile is not strictly required, because empty is accessed only inside synchronized methods, which already guarantee visibility. Reads and writes of volatile variables are atomic for all types, including long and double. However, volatile does not make compound operations such as increment (++) or decrement (--) atomic, so it is not enough for a variable that is updated that way.
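A typical place where volatile alone is sufficient is a stop flag that one thread writes and another only reads. The following is a minimal sketch of that use. The names VolatileFlagDemo, Worker and stopsWithin are assumptions made for the example.

```java
public class VolatileFlagDemo {

    static class Worker implements Runnable {
        // volatile guarantees that the write in stop() becomes visible to the loop in run().
        private volatile boolean running = true;

        @Override
        public void run() {
            long iterations = 0;
            while (running) {
                iterations++; // simulated work
            }
            System.out.println("Worker stopped after " + iterations + " iterations");
        }

        public void stop() {
            running = false; // a single write, no compound operation involved
        }
    }

    // Starts a worker, asks it to stop and reports whether it ended within the timeout.
    static boolean stopsWithin(long timeoutMillis) {
        Worker worker = new Worker();
        Thread thread = new Thread(worker);
        thread.start();
        try {
            Thread.sleep(50);
            worker.stop();
            thread.join(timeoutMillis);
        } catch (InterruptedException e) {
            worker.stop();
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Stopped in time: " + stopsWithin(2000));
    }
}
```

Without volatile the worker would be allowed to keep seeing the old value of running and might loop forever.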


A semaphore is a thread synchronization object that can be used to send signals between threads (as notify() does) or as a lock (exclusive access). It can also be used to limit the number of threads that can hold it at the same time.
A semaphore helps to avoid missed signals. A signal is missed when notify() is called before the other thread calls wait(): the notification is lost and the waiting thread may never wake up. A semaphore remembers a released permit until some thread acquires it.
In Java, semaphores are implemented by the java.util.concurrent.Semaphore class.
The following simple demo shows how a semaphore can be used to limit the number of threads that execute a block of code (opening and holding a connection):
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo implements Runnable {

 static class GoogleConnector {
  private Semaphore semaphore = null;
  private URL url = null;

  public GoogleConnector(int connNumber) {
   try {
    url = new URL("http://www.google.com");
   } catch (MalformedURLException e) {
    e.printStackTrace();
   }
   // connNumber sets number of allowed connections
   semaphore = new Semaphore(connNumber);
  }

  public HttpURLConnection connect() {
   try {
    // threads that try to acquire more connections than connNumber are blocked here by the semaphore
    // until some other thread closes its connection
    semaphore.acquire();
    return (HttpURLConnection) url.openConnection();
   } catch (InterruptedException e) {
    e.printStackTrace();
   } catch (IOException e) {
    e.printStackTrace();
   }
   return null;
  }

  public void close(HttpURLConnection connection) {
   connection.disconnect();
   // decrements number of opened connections by one.
   semaphore.release();
  }
 }

 private GoogleConnector connector = new GoogleConnector(3);

 @Override
 public void run() {
  HttpURLConnection connection = connector.connect();
  System.out.println(Thread.currentThread().getName() + " opened connection!");
  try {
   Thread.sleep(3000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  connector.close(connection);
 }

 public static void main(String[] args) {
  Runnable runnable = new SemaphoreDemo();

  new Thread(runnable).start();
  new Thread(runnable).start();
  new Thread(runnable).start();
  new Thread(runnable).start();
 }
}
Class GoogleConnector instantiates a Semaphore with the number of threads that can acquire and hold it at the same time. That number is passed as the argument of the GoogleConnector constructor. A single instance of GoogleConnector is created with argument 3 in the connector field of SemaphoreDemo. In the main method there are four threads that run the same Runnable.run() method. In run(), each thread calls GoogleConnector.connect() to acquire a permit from the semaphore and open a new HttpURLConnection. Then each thread sleeps for 3 seconds and afterwards calls GoogleConnector.close() to close the connection and release its permit.
The semaphore is created with three permits to enter the block of code it protects, but there are four threads trying to pass through it.
If we run the demo we see that at first only 3 threads open a connection (url.openConnection() in connect()) and fall asleep (Thread.sleep(3000) in run()). The fourth thread waits in semaphore.acquire() until some other thread calls semaphore.release(), which reduces the number of threads in the critical section (at most 3). So the fourth thread waits about 3 seconds for another thread to call semaphore.release(). Only then can it get through semaphore.acquire(), open its connection and fall asleep. You can see the 3-second delay between the println outputs in run().
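The signalling use of a semaphore, where a signal sent too early is not lost, can be sketched like this. The semaphore starts with zero permits, the sender releases one before the receiver even starts waiting, and the receiver still gets it. The names SemaphoreSignalDemo and signalBeforeWait are assumptions made for the example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreSignalDemo {

    // Sends the signal first and waits for it afterwards; returns true if it was received.
    static boolean signalBeforeWait() {
        // Zero permits: acquire() would block until some thread calls release().
        final Semaphore signal = new Semaphore(0);
        Thread sender = new Thread(new Runnable() {
            @Override
            public void run() {
                signal.release(); // send the signal
            }
        });
        sender.start();
        try {
            // After join() the signal has definitely been sent before we start waiting.
            sender.join();
            // With wait()/notify() this notification would have been lost;
            // the semaphore has stored it as a permit.
            return signal.tryAcquire(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Signal received: " + signalBeforeWait());
    }
}
```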

In our producer-consumer demo we could put and take only a single item. What if we need to synchronize put and take for a collection of items?
For this Java comes with the java.util.concurrent.BlockingQueue interface.
A bounded implementation such as ArrayBlockingQueue is instantiated with the maximum number of elements it can hold. BlockingQueue allows one thread to put a new item and another thread to take it, typically in first-in-first-out order. If a thread puts a new item into a queue that is already full, it waits until another thread dequeues (removes and retrieves) an item. If a thread tries to dequeue an item from an empty queue, it waits until another thread enqueues (inserts) a new item.
BlockingQueue has four families of methods: methods that wait until the queue is not full or not empty (put(e) and take()), methods that wait for at most a specified amount of time (offer(e,time,unit) and poll(time,unit)), methods that throw an exception if the operation cannot be performed immediately (add(e), remove(), element()), and methods that return a special value immediately (offer(e), poll(), peek()).
BlockingQueue implementations are thread-safe. ArrayBlockingQueue, for example, guards all of its operations with a single ReentrantLock.
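To make the difference between these method families concrete, here is a small sketch that calls one method from each family on a full and then on an empty queue. The class name BlockingQueueMethodsSketch and the report format are my own; only the BlockingQueue calls come from the JDK.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueMethodsSketch {

    // Builds a report of how each method family reacts to a full and then an empty queue.
    static String describe() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder report = new StringBuilder();
        try {
            queue.put("a"); // capacity is 1, so the queue is now full

            // returns a special value immediately
            report.append("offer on full: ").append(queue.offer("b")).append('\n');
            // waits up to 50 ms for free space, then gives up
            report.append("timed offer on full: ")
                  .append(queue.offer("b", 50, TimeUnit.MILLISECONDS)).append('\n');
            // throws instead of waiting
            try {
                queue.add("b");
            } catch (IllegalStateException e) {
                report.append("add on full: IllegalStateException\n");
            }

            report.append("take: ").append(queue.take()).append('\n'); // queue is now empty
            report.append("poll on empty: ").append(queue.poll()).append('\n');
            report.append("peek on empty: ").append(queue.peek()).append('\n');
            try {
                queue.remove();
            } catch (NoSuchElementException e) {
                report.append("remove on empty: NoSuchElementException\n");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return report.toString();
    }

    public static void main(String[] args) {
        System.out.print(describe());
    }
}
```

A plain put("b") on the full queue, or take() on the empty one, would block this single thread forever, which is why the sketch only uses the blocking variants where they can succeed.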
Let's modify our previous producer-consumer demo to be able to store many items:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {

 static class Bin<E> {
  private BlockingQueue<E> items;

  public Bin(int maxQuantity) {
   items = new ArrayBlockingQueue<>(maxQuantity);
  }

  public void put(E newItem) throws InterruptedException {
   // this method internally uses guarded block to block thread if condition is count == maxQuantity
   items.put(newItem);
  }

  public E take() throws InterruptedException {
   // this method blocks thread if queue is empty.
   // It removes and retrieves first added element
   return items.take();
  }
 }

 public static void main(String[] args) throws InterruptedException {
  // instantiate single bin with maximum number of elements equal to 3
  final Bin<Object> bin = new Bin<>(3);
  // 0xffff - max value char can hold
  final Object[] data = { 12, 23.34f, "Wednesday", 1000_000_000, 10_000_000_000l, 0xffff };
  // create producer thread
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     for (int i = 0; i < data.length; i++) {
      Object newThing = data[i];
      bin.put(newThing);
      printData("put ", newThing);
     }
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }).start();
  // let the blocking queue fill up so that the producer thread blocks on putting the fourth element
  Thread.sleep(3000);
  System.out.printf("%16s \n", "After sleep");
  // create consumer thread
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     for (int i = 0; i < data.length; i++) {
      Object newThing = bin.take();
      printData("take", newThing);
     }
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }).start();
 }

 private static void printData(String prefix, Object obj) {
  System.out.printf("%1$4s= %2$-11s \n", prefix, obj);
 }
}
Behind the scenes, ArrayBlockingQueue uses the same guarded-block idea as our first demo, implemented with a ReentrantLock and Condition.await()/signal() rather than wait() and notify(). Run this demo and you'll see that after 3 items are put into the queue, the producer thread is blocked until the consumer thread is created and dequeues an element. To make the producer block for about 3 seconds, line 46 suspends the main thread before the consumer is created. The output shows this:
put = 12          
put = 23.34       
put = Wednesday   
     After sleep 
put = 1000000000  
take= 12          
put = 10000000000 
take= 23.34       
put = 65535       
take= Wednesday   
take= 1000000000  
take= 10000000000 
take= 65535       
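To see what such a guarded block looks like with a lock and two conditions, here is a simplified sketch of a bounded buffer. It is not the JDK source: BoundedBufferSketch and produceAndSum are hypothetical names, and the real ArrayBlockingQueue stores its items in a circular array rather than in an ArrayDeque.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBufferSketch<E> {
    private final Queue<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    public void put(E item) throws InterruptedException {
        lock.lock();
        try {
            // guarded block: wait while the buffer is full
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.add(item);
            notEmpty.signal(); // wake up one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        try {
            // guarded block: wait while the buffer is empty
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E item = items.remove();
            notFull.signal(); // wake up one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Demo helper: a producer thread puts 1..count, the caller takes and sums them.
    static int produceAndSum(int capacity, int count) {
        BoundedBufferSketch<Integer> buffer = new BoundedBufferSketch<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum = " + produceAndSum(2, 5));
    }
}
```

Using two separate conditions means a producer only ever wakes a consumer and vice versa, which a single wait()/notify() monitor cannot guarantee.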
There are several implementations of the BlockingQueue interface:
  • ArrayBlockingQueue - bounded BlockingQueue backed by an array. You must set the maximum capacity.
  • LinkedBlockingQueue - optionally bounded BlockingQueue backed by a linked list (nodes that reference one another sequentially). If you don't set a maximum capacity, it defaults to Integer.MAX_VALUE.
  • PriorityBlockingQueue - unbounded BlockingQueue that orders its elements like java.util.PriorityQueue, that is, according to their natural ordering or a provided Comparator. Without a Comparator, elements must implement the Comparable interface.
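The ordering behaviour of PriorityBlockingQueue is easy to check in a few lines. This is just an illustrative sketch; the class PriorityOrderSketch and the sample words are made up for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderSketch {

    // Puts all words into a PriorityBlockingQueue and returns them in the order poll() yields them.
    static List<String> drainInPriorityOrder(Comparator<String> order, String... words) {
        // 11 is just an initial capacity; the queue grows as needed
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(11, order);
        for (String word : words) {
            queue.put(word); // never blocks because the queue is unbounded
        }
        List<String> result = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            result.add(next);
        }
        return result;
    }

    public static void main(String[] args) {
        // natural (alphabetical) ordering of String
        System.out.println(drainInPriorityOrder(Comparator.naturalOrder(), "pear", "fig", "banana"));
        // custom ordering: shortest word first
        System.out.println(drainInPriorityOrder(Comparator.comparingInt(String::length), "pear", "fig", "banana"));
    }
}
```

Note that elements come out in priority order, not in insertion order, so this queue is not first-in-first-out.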

7. Executors

Next we look at the high-level concurrency API available since Java 5.
Instead of creating and starting a new thread for a Runnable ourselves, we can hand the Runnable to an executor.
There are three main interfaces for executors in java.util.concurrent package:
  • Executor - interface that supports launching new Runnables
  • ExecutorService - subinterface of Executor that supports launching and terminating Runnables and Callables.
  • ScheduledExecutorService - subinterface of ExecutorService that supports future launching and/or periodic execution.
The Executor interface has a single method, execute(Runnable), that runs the given task at some time in the future. With the simplest kind of executor, one that starts a new thread per task, this:
new Thread(runnable).start();
is equivalent to:
executor.execute(runnable);
However, there are different implementations of Executor. Almost all of them use a thread pool, so a new Thread might not be created; an existing one is reused instead.
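Since Executor only promises that the task will run, where it runs depends entirely on the implementation. The sketch below compares three flavours; ExecutorFlavoursSketch, DIRECT, THREAD_PER_TASK and threadNameUsedBy are hypothetical names introduced for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorFlavoursSketch {

    // Runs the task in the caller's own thread: no new thread at all.
    static final Executor DIRECT = command -> command.run();

    // Starts a fresh thread for every task.
    static final Executor THREAD_PER_TASK = command -> new Thread(command).start();

    // Submits a task to the executor and returns the name of the thread that ran it.
    static String threadNameUsedBy(Executor executor) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(); // wait until the task has actually run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:          " + Thread.currentThread().getName());
        System.out.println("direct:          " + threadNameUsedBy(DIRECT));
        System.out.println("thread per task: " + threadNameUsedBy(THREAD_PER_TASK));

        // a pool with one worker runs both tasks on the same, reused thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        System.out.println("pool, 1st task:  " + threadNameUsedBy(pool));
        System.out.println("pool, 2nd task:  " + threadNameUsedBy(pool));
        pool.shutdown();
    }
}
```

The calling code is identical in all three cases, which is the point of the interface: the threading policy can be swapped without touching the tasks.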
Let's now go through the most commonly used thread pools. The Executors class has static factory methods to create them.
A fixed thread pool holds as many threads as specified by the client. If more tasks are submitted than there are threads, the extra tasks wait in a queue until a thread becomes free. If a thread terminates because of a failure during execution, it is replaced with a new thread when one is needed.
Simple demo:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedThreadPoolRunnable implements Runnable {
 @Override
 public void run() {
  try {
   Thread.sleep(2000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  System.out.println(Thread.currentThread().getName() + " is finishing!");
 }

 public static void main(String[] args) throws InterruptedException {
  ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
  FixedThreadPoolRunnable ftpr = new FixedThreadPoolRunnable();
  
  for (int i = 0; i < 6; i++) {
   fixedThreadPool.execute(ftpr);
  }
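  // shutdown() has not been requested yet, so this simply blocks main thread for the full 4.1 sec timeout
  fixedThreadPool.awaitTermination(4100, TimeUnit.MILLISECONDS);
  // stops accepting new tasks; worker threads exit once the submitted tasks have finished
  fixedThreadPool.shutdown();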
  System.out.println("shutdown is called");
 }
}
In the main method at line 17, a fixed thread pool is created to hold 3 threads. At line 18 a single Runnable ftpr is instantiated. At line 20 the for loop submits the same task 6 times to be executed by the 3 threads. At first the 3 threads execute the first 3 tasks and afterwards the same threads execute the next 3 tasks. You can see in the output that there is a 2 second delay between the first and the second group of three printlns from line 13. This is because tasks 4, 5 and 6 start only after tasks 1, 2 and 3 have finished, and then sleep for 2 seconds at line 9. While the 6 tasks are running, we block the main thread for 4.1 seconds (enough to finish the tasks) at line 24. Because shutdown() has not been called at that point, awaitTermination() simply waits for the whole timeout. At line 26 we call shutdown() so that the pool stops accepting tasks and its threads can exit. If shutdown() were not invoked, the 3 threads in the pool would stay alive and keep the JVM from exiting.
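The demo calls awaitTermination() before shutdown(), so it always waits for the full timeout. A more common order is to call shutdown() first and then awaitTermination(), which returns as soon as the last task completes. Below is a hedged sketch of that order; it also uses Callable and Future to collect results. PoolShutdownSketch and sumOfSquares are hypothetical names and the 5 second timeout is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PoolShutdownSketch {

    // Computes 1*1 + 2*2 + ... + tasks*tasks, one Callable per term, on a fixed thread pool.
    static int sumOfSquares(int tasks, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Integer>> results = new ArrayList<>();
        for (int i = 1; i <= tasks; i++) {
            final int n = i;
            // the lambda returns a value, so it is submitted as a Callable<Integer>
            results.add(pool.submit(() -> n * n));
        }
        // no new tasks are accepted from now on; already submitted tasks still run
        pool.shutdown();
        try {
            // returns true as soon as all tasks are done, false if the timeout elapses first
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            int sum = 0;
            for (Future<Integer> result : results) {
                sum += result.get(); // does not block here: every task has completed
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println("sum of squares = " + sumOfSquares(6, 3));
    }
}
```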

A cached thread pool creates new threads as needed. If there is a free thread, it is reused. If there is no free thread, a new one is created and added to the pool. If a thread is not used for 60 seconds, it is removed from the pool.
Simple demo:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CachedThreadPoolRunnable implements Runnable {

 @Override
 public void run() {
  try {
   Thread.sleep(2000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  System.out.println(Thread.currentThread().getName() + " is finishing!");
 }

 public static void main(String[] args) throws InterruptedException {
  ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
  CachedThreadPoolRunnable ctpb = new CachedThreadPoolRunnable();

  // cached thread pool creates as many threads as needed to execute provided tasks
  for (int i = 0; i < 3; i++) {
   cachedThreadPool.execute(ctpb);
  }
  // block main thread for 2.1 sec, long enough for the first 3 tasks to finish
  cachedThreadPool.awaitTermination(2100, TimeUnit.MILLISECONDS);
  // if there are free threads they are reused
  for (int i = 0; i < 2; i++) {
   cachedThreadPool.execute(ctpb);
  }
 }
}
At line 22 in the for loop, the same task is submitted three times in a row to the cached thread pool, so 3 new threads are created. At line 26 we block the main thread until the submitted tasks have finished. At line 28 in the for loop we submit the same task twice to show that previously created threads are reused. Note that the JVM exits even though we never call shutdown(), because a thread that is not used for 60 seconds is removed from the pool. You can see in the console that the JVM terminates about 60 seconds after the last printlns.
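The "new thread when no free thread exists" rule can be measured directly. The sketch below builds a ThreadPoolExecutor by hand with zero core threads and a SynchronousQueue, which is the kind of configuration a cached pool uses; the 200 ms keep-alive is my own assumption to keep the demo short, and CachedPoolSketch and threadsCreatedFor are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolSketch {

    // Submits tasks that all have to run at the same time and
    // returns how many threads the pool had to create for them.
    static int threadsCreatedFor(int concurrentTasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,            // no core threads, no upper limit
                200L, TimeUnit.MILLISECONDS,     // idle threads are removed after 200 ms
                new SynchronousQueue<Runnable>());
        CountDownLatch allStarted = new CountDownLatch(concurrentTasks);
        for (int i = 0; i < concurrentTasks; i++) {
            pool.execute(() -> {
                allStarted.countDown();
                try {
                    // stay busy until every task has started, so no thread is free for reuse
                    allStarted.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            allStarted.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int created = pool.getLargestPoolSize();
        pool.shutdown();
        return created;
    }

    public static void main(String[] args) {
        System.out.println("threads created for 3 concurrent tasks: " + threadsCreatedFor(3));
    }
}
```

Because every task is still busy when the next one arrives, the pool cannot reuse a thread and has to create one per task.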

A scheduled thread pool has a method to execute a Runnable periodically, with an initial delay and a fixed delay between the finish of one execution and the start of the next. Simple demo:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolRunnable implements Runnable {

 @Override
 public void run() {
  System.out.println(Thread.currentThread().getName() + " starts");
  try {
   Thread.sleep(2000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  System.out.println(Thread.currentThread().getName() + " finishes");
 }

 public static void main(String[] args) throws InterruptedException {
  ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
  ScheduledThreadPoolRunnable stpr = new ScheduledThreadPoolRunnable();

  // initial delay before execution is 1 sec. Delay between finish of previous and start of next executions is 2
  // sec
  scheduledThreadPool.scheduleWithFixedDelay(stpr, 1, 2, TimeUnit.SECONDS);
  scheduledThreadPool.awaitTermination(20, TimeUnit.SECONDS);
  scheduledThreadPool.shutdown();
  System.out.println("shutdown is called");
 }
}
At line 19, a scheduled thread pool is created with a core size of 3 threads. At line 20 we instantiate our task stpr. At line 24 the scheduleWithFixedDelay method is called: the first argument is the Runnable to be executed periodically, the second is the initial delay, the third is the delay between the finish of one execution and the start of the next, and the fourth is the time unit for both delays.
At line 25 we block the main thread for 20 seconds to watch task stpr running periodically. Finally at line 26 the thread pool is shut down so that the JVM can terminate. If an execution of the task throws an exception, subsequent executions are suppressed.
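The last point is easy to miss, because the exception is not printed anywhere. A small sketch, with the hypothetical names ScheduledFailureSketch and runsBeforeFailure, shows that a periodic task stops being rescheduled after it throws, and that the exception surfaces through the returned ScheduledFuture.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledFailureSketch {

    // Schedules a periodic task that throws on its failOnRun-th execution
    // and returns how many times the task was executed in total.
    static int runsBeforeFailure(int failOnRun) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> handle = scheduler.scheduleWithFixedDelay(() -> {
            if (runs.incrementAndGet() == failOnRun) {
                throw new IllegalStateException("simulated failure");
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            // a periodic task never completes normally, so get() only returns by throwing
            handle.get();
        } catch (ExecutionException expected) {
            // the task threw: the scheduler will not run it again
            System.out.println("task failed with: " + expected.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("executions: " + runsBeforeFailure(3));
    }
}
```

If the task must keep running despite errors, one option is to catch exceptions inside run() itself.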

The fork/join framework is an implementation of ExecutorService that lets you take advantage of multiple processors. It is designed for work that can be broken into pieces that are executed in parallel. When a worker thread runs out of its own tasks, it takes (steals) pending tasks from the queues of other workers that are still busy. This is called the work-stealing algorithm and it is implemented by the thread pool class java.util.concurrent.ForkJoinPool. The pool accepts a ForkJoinTask object, which can break itself into smaller pieces and submit them for execution recursively. A simple demo makes this clearer:
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

class ArraySorter extends RecursiveAction {

 private Object[] array = null;

 public ArraySorter(Object[] array) {
  super();
  this.array = array;
 }

 private void sortOneRowArray() {
  System.out.println(Thread.currentThread().getName() + " sorts " + Arrays.deepToString(array));
  Arrays.sort(array);
 }

 private int getDimensionCount() {
  int count = 0;
  Class<?> arrayClass = array.getClass();
  while (arrayClass.isArray()) {
   count++;
   arrayClass = arrayClass.getComponentType();
  }
  return count;
 }

 @Override
 protected void compute() {
  if (array != null && array.length > 0)
   if (getDimensionCount() > 1) {
    // the array can be 2 or 3 or more dimensional
    ArraySorter[] rows = new ArraySorter[array.length];
    for (int i = 0; i < array.length; i++) {
     // take one row(array[i]) from 2d array and sort it in parallel
     rows[i] = new ArraySorter((Object[]) array[i]);
    }
    // this method calls compute of each ArraySorter in parallel. If one thread finished sooner and there
    // is new task(ArraySorter instance) available, it performs compute on it
    invokeAll(rows);
   } else {
    // the array is one-dimensional
    sortOneRowArray();
   }
 }
}

public class ForkJoinDemo {

 static private void fillTable(Object[][] table) {
  Random random = new Random();
  for (int i = 0; i < table.length; i++) {
   for (int j = 0; j < table[i].length; j++) {
    table[i][j] = random.nextInt(1000);
   }
  }
 }

 public static void main(String... args) throws InterruptedException {
  Object[][] table = new Object[10][5];
  // fill table with random ints
  fillTable(table);
  ArraySorter arraySorter = new ArraySorter(table);

  ForkJoinPool pool = new ForkJoinPool();
  // sort table's rows in parallel by fork/join framework
  pool.execute(arraySorter);
  pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
 }
}
An instance of ForkJoinTask called arraySorter is created at line 66. At line 68 an instance of ForkJoinPool is created. It maintains the worker threads that perform the work-stealing algorithm. The task is passed for execution at line 70. At line 71 the main thread is blocked for up to 1 second to let the ForkJoinPool finish the tasks.
ArraySorter extends RecursiveAction, which extends the ForkJoinTask class. RecursiveAction has one abstract method called compute(). This method is called by the worker threads of ForkJoinPool. In compute() at line 33 we skip null or empty arrays. At line 34 we check whether the array has one dimension or more. If it is one-dimensional, it is sorted directly (line 46) by sortOneRowArray(). If the array is multi-dimensional, every row is wrapped into an ArraySorter at line 39. At line 43 the ArraySorters are submitted to be executed in parallel so that each row can be sorted by a different thread. By calling invokeAll(rows) we tell ForkJoinPool to execute the provided ForkJoinTasks by calling their compute() methods, potentially in parallel, and to wait until all of them complete. Then, recursively, each row is checked for its dimension depth: if it has one dimension it is sorted by sortOneRowArray(), otherwise it is split into rows which are again sent to be sorted by compute().
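When the subtasks have to return a value, RecursiveTask can be used instead of RecursiveAction. The following sketch sums an int array by splitting it in halves; RangeSum, its THRESHOLD of 1000 elements and the sum helper are assumptions made for this example, not part of the demo above. It uses pool.invoke(), which blocks until the result is ready, so no awaitTermination() call is needed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSum extends RecursiveTask<Long> {
    // below this size a range is summed directly instead of being split further
    private static final int THRESHOLD = 1_000;

    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    RangeSum(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        RangeSum left = new RangeSum(data, from, mid);
        RangeSum right = new RangeSum(data, mid, to);
        left.fork();                      // schedule the left half; an idle worker may steal it
        long rightSum = right.compute();  // meanwhile compute the right half in this thread
        return left.join() + rightSum;    // wait for the left half and combine
    }

    // Sums the array on a fresh ForkJoinPool and shuts the pool down afterwards.
    static long sum(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSum(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println("sum = " + sum(data));
    }
}
```

Forking one half and computing the other in the current thread keeps the worker busy instead of leaving it to wait on two forked subtasks.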
