Posts Tagged ‘Concurrency’

Series About Java Concurrency – Pt. 6

February 15, 2013

This is the solution to my previous post, so make sure that you read it before you continue. For your convenience, here is the program we are going to discuss once again:

import java.util.concurrent.atomic.AtomicInteger;

public class ThreadNoMore extends Thread
{
    private static final int N_THREADS = 16;
    private static final AtomicInteger threadsStarted = new AtomicInteger();
    
    private final long ctorThreadId = Thread.currentThread().getId();
    
    @Override
    public synchronized void start()
    {
        if(ctorThreadId != Thread.currentThread().getId())
            threadsStarted.incrementAndGet();
    }
    
    public static void main(String[] args) throws InterruptedException
    {
        Thread[] threads = new Thread[N_THREADS];
        for(int i = 0; i != N_THREADS; ++i)
            threads[i] = new ThreadNoMore();
        
        for(Thread th : threads)
            th.start();
        
        for(Thread th : threads)
            th.join();
        
        System.out.println("threadsStarted: " + threadsStarted);
    }
}

As already pointed out by Ortwin Glück, the program always prints

threadsStarted: 0

because ThreadNoMore.start() is always executed in the thread that invoked it. And if you think it over, this should not surprise you at all, as methods are always executed in the thread that invoked them. This even applies to Thread.run(), which is the method we should have overridden instead; but unlike Thread.start(), Thread.run() is normally invoked by the Java Virtual Machine from within the newly created thread, as the Javadocs for Thread.start() tell us:

Causes this thread to begin execution; the Java Virtual Machine calls the run method of this thread.

So apart from not doing what we might have intended, carelessly overriding Thread.start() (that is, without calling super.start() so that the JVM can do its magic) leaves us with a crippled class that no longer has anything to do with a thread at all. Considering these facts, it should not take you by surprise that

import java.util.concurrent.atomic.AtomicInteger;

public class Thrap extends Thread
{
    private static final int N_THREADS = 42;
    
    private static final AtomicInteger 
        started = new AtomicInteger(),
        ran = new AtomicInteger();
    
    @Override
    public synchronized void start()
    {
        started.incrementAndGet();
    }
    
    @Override
    public void run()
    {
        ran.incrementAndGet();
    }
    
    public static void main(String[] args) throws InterruptedException
    {
        Thread[] threads = new Thread[N_THREADS];
        for(int i = 0; i != N_THREADS; ++i)
            threads[i] = new Thrap();
        
        for(Thread thread : threads)
            thread.start();
        
        for(Thread thread : threads)
            thread.join();
        
        System.out.println("started: " + started);
        System.out.println("ran: " + ran);
    }
}

results in

started: 42
ran: 0

being written to your terminal. As you can see clearly, Thrap.run() is not executed at all, neither from a newly created thread nor from the main thread. To fix the code above, you have to call super.start() in Thrap.start() like so:

    @Override
    public synchronized void start()
    {
        super.start();
        started.incrementAndGet();
    }

After this modification you get

started: 42
ran: 42

as expected.
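To make it visible which thread executes which method, here is a minimal sketch. The class name WhoRunsWhat and its two fields are made up for this illustration and are not part of the puzzle; the sketch simply records the id of the executing thread in start() and in run().

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class, not part of the original puzzle.
class WhoRunsWhat extends Thread
{
    // Ids of the threads that executed start() and run(); -1 means "not yet".
    final AtomicLong startCaller = new AtomicLong(-1);
    final AtomicLong runCaller = new AtomicLong(-1);

    @Override
    public synchronized void start()
    {
        // Executed by whoever calls start(), like any other method.
        startCaller.set(Thread.currentThread().getId());
        super.start(); // Without this call, run() is never executed.
    }

    @Override
    public void run()
    {
        // Executed by the newly created thread.
        runCaller.set(Thread.currentThread().getId());
    }

    public static void main(String[] args) throws InterruptedException
    {
        WhoRunsWhat thread = new WhoRunsWhat();
        thread.start();
        thread.join();

        long mainId = Thread.currentThread().getId();
        System.out.println("start() ran in the calling thread: "
            + (thread.startCaller.get() == mainId));
        System.out.println("run() ran in the new thread: "
            + (thread.runCaller.get() == thread.getId()));
    }
}
```

Both lines should report true: start() runs in the thread that called it, and run() runs in the thread that super.start() created.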

So what can we learn from all of this? First of all, there are two things to remember about the Java threads API:

Equally important are the consequences for API design: interfaces should be easy to use correctly and hard to use incorrectly. Unfortunately, the Thread class violates this principle, as it is quite easy to misuse, as we have just seen. More generally, requiring clients to call the super version of a method they are overriding is considered to be an anti-pattern for this very reason.
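One common way to avoid the "call super" requirement is to make the framework method final and let clients override a separate hook instead. The following is only a sketch of that idea; HookSketch, Worker, HelloWorker and doWork() are hypothetical names chosen for this example and have nothing to do with the real Thread class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example of a hook method replacing a "call super" contract.
class HookSketch
{
    static abstract class Worker
    {
        final List<String> log = new ArrayList<String>();

        // final: clients cannot skip the mandatory steps by accident.
        public final void start()
        {
            log.add("framework: set up");
            doWork(); // The only part clients are allowed to customize.
            log.add("framework: tear down");
        }

        // Clients override this hook; no super call is needed.
        protected abstract void doWork();
    }

    static class HelloWorker extends Worker
    {
        @Override
        protected void doWork()
        {
            log.add("client: hello");
        }
    }

    public static void main(String[] args)
    {
        HelloWorker worker = new HelloWorker();
        worker.start();
        System.out.println(worker.log);
    }
}
```

With this design there is simply no way to override start(), so the mistake from the puzzle cannot be made. For threads in particular, passing a Runnable to the Thread constructor instead of subclassing Thread has a similar effect.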

Categories: Programming Tags: , ,

Series About Java Concurrency – Pt. 5

February 13, 2013 1 comment

After quite some time, here is a concurrency-related puzzle once again. Take a look at the following program and try to predict its output:

package com.wordpress.mlangc.concurrent;

import java.util.concurrent.atomic.AtomicInteger;

public class ThreadNoMore extends Thread
{
    private static final int N_THREADS = 16;
    private static final AtomicInteger threadsStarted = new AtomicInteger();
    
    private final long ctorThreadId = Thread.currentThread().getId();
    
    @Override
    public synchronized void start()
    {
        if(ctorThreadId != Thread.currentThread().getId())
            threadsStarted.incrementAndGet();
    }
    
    public static void main(String[] args) throws InterruptedException
    {
        Thread[] threads = new Thread[N_THREADS];
        for(int i = 0; i != N_THREADS; ++i)
            threads[i] = new ThreadNoMore();
        
        for(Thread th : threads)
            th.start();
        
        for(Thread th : threads)
            th.join();
        
        System.out.println("threadsStarted: " + threadsStarted);
    }
}

The solution, together with a detailed explanation will be available soon.

Series About Java Concurrency – Pt. 4

May 19, 2011 6 comments

This post is the solution to Series About Java Concurrency – Pt. 3, so be sure to read my last post before this one.

So, what does the code from Series About Java Concurrency – Pt. 3 do? As it turns out, there isn’t a single answer: if you are using a recent Oracle VM, the behavior of the program seems to depend on whether your VM runs in server or client mode. In client mode, the program will most likely stop after roughly 2.5s, as one would naively expect. In server mode, however, the program just loops forever: the update to stop in line 24 never becomes visible to the main thread. This isn’t a bug, but perfectly legal behavior according to the Java Memory Model, which allows optimizing

while(!stop)

into

while(true)

Synchronization is not just about mutual exclusion; it is also needed to make the writes of one thread visible to other threads, that is, to avoid reading stale data. This may sound weird, but it allows the VM as well as the CPU to apply powerful optimizations (for example out-of-order execution) that would otherwise be impossible or very hard to implement.
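Spelled out a little more precisely, the transformation in question reads the flag once and then never looks at it again. The sketch below shows both shapes side by side; HoistingSketch and its method names are invented for this illustration, and whether a particular JIT compiler actually performs this rewrite is an assumption that depends on the VM and its settings.

```java
// Hypothetical illustration of a read being hoisted out of a loop.
class HoistingSketch
{
    private static boolean stop = false; // deliberately NOT volatile

    // The loop as written in the source code.
    static void spinAsWritten()
    {
        while(!stop)
            ; // Re-reads stop on every iteration, at least on paper.
    }

    // What a compiler may legally turn it into, as no other thread is
    // required to be taken into account: stop is read exactly once.
    static void spinAsHoisted()
    {
        if(!stop)
            while(true)
                ; // Never looks at stop again.
    }

    public static void main(String[] args)
    {
        // Set the flag first, so that both variants return immediately.
        stop = true;
        spinAsWritten();
        spinAsHoisted();
        System.out.println("Both variants returned.");
    }
}
```

In a single thread the two methods behave identically, which is exactly why the rewrite is allowed. They only differ when another thread changes stop while the loop is running, and without synchronization the Java Memory Model does not promise that the loop will notice.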

One way to fix our shiny little program is by using the synchronized keyword as demonstrated below:

package com.wordpress.mlangc.concurrent;

public class PleaseWait
{
    private static final long MILLIS_TO_WAIT = 2500;
    private static boolean stop = false;
    
    private static synchronized boolean isStop()
    {
        return stop;
    }
    
    private static synchronized void setStop(boolean stop)
    {
        PleaseWait.stop = stop;
    }
    
    public static void main(String[] args)
    {
        Thread timer = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    Thread.sleep(MILLIS_TO_WAIT);
                }
                catch(InterruptedException e)
                {
                    // We are already exiting the thread.
                }
                finally
                {
                    setStop(true);
                    System.out.println("Stop requested.");
                }
            }
        });
        
        timer.start();
        
        long start = System.currentTimeMillis();
        while(!isStop())
            ; // <-- Do nothing.
        
        long stoppedAfter = System.currentTimeMillis() - start;
        System.out.printf("Stopped after %dms.\n", stoppedAfter);
    }
}

It is important to note that both the setter and the getter are synchronized using the same lock. Synchronizing only the setter method is not enough!

While the code above works reasonably well, we can still do better, because we don’t need mutual exclusion but just want to ensure that the main thread sees what the timer thread does. This can be accomplished quite easily by declaring stop to be volatile. The volatile keyword makes sure that the main thread always sees the most recent value of stop without any additional uses of synchronized. Following this advice, we end up with something like this:

package com.wordpress.mlangc.concurrent;

public class PleaseWait
{
    private static final long MILLIS_TO_WAIT = 2500;
    private static volatile boolean stop = false;
    
    public static void main(String[] args)
    {
        Thread timer = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    Thread.sleep(MILLIS_TO_WAIT);
                }
                catch(InterruptedException e)
                {
                    // We are already exiting the thread.
                }
                finally
                {
                    stop = true;
                    System.out.println("Stop requested.");
                }
            }
        });
        
        timer.start();
        
        long start = System.currentTimeMillis();
        while(!stop)
            ; // <-- Do nothing.
        
        long stoppedAfter = System.currentTimeMillis() - start;
        System.out.printf("Stopped after %dms.\n", stoppedAfter);
    }
}

The only difference from the broken version in Series About Java Concurrency – Pt. 3 is the volatile keyword in line 6.
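A third variant in the same spirit uses java.util.concurrent.atomic.AtomicBoolean, whose get() and set() have the memory effects of reading and writing a volatile variable. This is just a sketch: the class name PleaseWaitAtomic, the helper waitForStop() and the shortened waiting time are assumptions made for this example.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical variant of PleaseWait based on AtomicBoolean.
class PleaseWaitAtomic
{
    private static final long MILLIS_TO_WAIT = 100; // Shortened for the sketch.
    private static final AtomicBoolean stop = new AtomicBoolean(false);

    // Spins until the timer thread requests a stop; returns the elapsed millis.
    static long waitForStop()
    {
        stop.set(false);

        Thread timer = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    Thread.sleep(MILLIS_TO_WAIT);
                }
                catch(InterruptedException e)
                {
                    // We are already exiting the thread.
                }
                finally
                {
                    stop.set(true); // Behaves like a volatile write.
                }
            }
        });

        timer.start();

        long start = System.currentTimeMillis();
        while(!stop.get())
            ; // <-- Do nothing; get() behaves like a volatile read.

        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args)
    {
        System.out.printf("Stopped after %dms.\n", waitForStop());
    }
}
```

For a simple flag like this one, a volatile field is all that is needed; AtomicBoolean mainly pays off once compound operations such as compareAndSet() come into play.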

Last but not least, I want to state clearly that this article is not about the proper way to implement task cancellation, which is a nontrivial topic of its own. If parts of this post are new to you, I strongly suggest that you grab yourself a copy of the excellent book Java Concurrency in Practice and read at least the first part, called Fundamentals, thoroughly. By doing so you are almost certainly saving yourself from nasty surprises or frustrating debugging sessions in the future.

Series About Java Concurrency – Pt. 3

May 10, 2011 8 comments

As it seems that even experienced Java programmers might have serious misconceptions regarding the Java memory model, I’ve decided to publish a concurrency-related puzzle once again. Consider the following simple program and try to predict its output:

package com.wordpress.mlangc.concurrent;

public class PleaseWait
{
    private static final long MILLIS_TO_WAIT = 2500;
    private static boolean stop = false;
    
    public static void main(String[] args)
    {
        Thread timer = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    Thread.sleep(MILLIS_TO_WAIT);
                }
                catch(InterruptedException e)
                {
                    // We are already exiting the thread.
                }
                finally
                {
                    stop = true;
                    System.out.println("Stop requested.");
                }
            }
        });
        
        timer.start();
        
        long start = System.currentTimeMillis();
        while(!stop)
            ; // <-- Do nothing.
        
        long stoppedAfter = System.currentTimeMillis() - start;
        System.out.printf("Stopped after %dms.\n", stoppedAfter);
    }
}

The solution, together with explanations will be published in the near future.

Propagating exceptions between different threads in C++0x

March 29, 2010

Yesterday I finally decided to make use of all available C++0x features that are supported by gcc-4.4 in Untropy (a very specialized tool for a limited audience that is not of interest here). What’s interesting about this is that what finally made the important difference for me is not one of the prominent features like Rvalue References, Auto Typed Variables or Initializer Lists. Instead, it’s a seemingly unspectacular type called std::exception_ptr and a few related functions. This rather minimal but very useful API has been designed to allow transporting exceptions between threads. Here is how this can be done with boost::thread (includes and namespaces are omitted for clarity – you can grab the full source here):

    /**
     * A callable implementation that allows exception propagation between 
     * threads for boost::thread.
     */
    class Callable
    {
        public:

        /**
         * Default copy constructor.
         *
         * @attention
         *  Should not be invoked from different threads with the same objects.
         */
        Callable(const Callable&) = default;

        /**
         * Default assignment operator.
         *
         * @attention
         *  Should not be invoked from different threads with the same objects.
         */
        Callable& operator=(const Callable&) = default;

        /**
         * To be called by boost::thread.
         */
        void operator()() const;

        /**
         * Rethrows pending exceptions.
         */
        void rethrowIfPending() const;

        /**
         * Returns true iff an exception is pending.
         */
        bool isExceptionPending() const;

        virtual ~Callable() = default;

        protected:

        Callable();

        /**
         * Does the actual work.
         *
         * @attention
         *  Exceptions thrown from this method will be caught in operator()(),
         *  and can be rethrown by rethrowIfPending() const.
         */
        virtual void doWork() const = 0;

        private:
        
        struct ExceptionContainer
        {
            std::exception_ptr exceptionPtr;
        };

        std::tr1::shared_ptr<boost::recursive_mutex> m_exceptionMutex;
        std::tr1::shared_ptr<ExceptionContainer> m_exception;
    };

Note that the only reason we explicitly declared a copy constructor and an assignment operator is for the comments above them. Luckily, the C++0x feature Defaulted and Deleted Functions allows us to omit some boilerplate code here. But let’s move to the more interesting and less documented private part of the declaration above: because boost::thread invokes a copy of our Callable, we cannot store pending exceptions in the class directly if we want them to be visible from outside. To solve this problem, we actually store them in a tr1::shared_ptr to a helper struct. Last but not least, we use a boost::recursive_mutex to guard the stored exception. Here is the implementation (the full source is here):

    void Callable::operator()() const
    {
       try
       {
           doWork();
       }
       catch(...)
       {
           boost::recursive_mutex::scoped_lock lock(*m_exceptionMutex);
           m_exception->exceptionPtr = current_exception();
       }
    }

    void Callable::rethrowIfPending() const
    {
        boost::recursive_mutex::scoped_lock lock(*m_exceptionMutex);
        if(isExceptionPending())
            rethrow_exception(m_exception->exceptionPtr);
    }

    bool Callable::isExceptionPending() const
    {
        boost::recursive_mutex::scoped_lock lock(*m_exceptionMutex);
        return (exception_ptr() != m_exception->exceptionPtr);
    }

    Callable::Callable()
        : m_exceptionMutex(new boost::recursive_mutex()),
          m_exception(new ExceptionContainer)
    {

    }

Using the C++0x features std::current_exception() in line 10 and std::rethrow_exception() in line 18, that’s all that needs to be done. The careful reader might notice that Callable isn’t entirely thread-safe itself: copying or assigning Callable instances that share internal state from within multiple threads will most likely result in undefined behavior, because tr1::shared_ptr is not thread-safe. In theory, C++0x contains APIs for accessing shared_ptr instances atomically from within multiple threads, but they are not available with gcc-4.4. Of course, I could have implemented a thread-safe reference counter in the Callable class directly, but that doesn’t seem worth the effort, as manipulating identical Callable instances from multiple threads is an error anyway. Last but not least: here is a unit test for the code above that also illustrates how it is meant to be used.

Series about Java Concurrency – Pt. 2

December 6, 2009 2 comments

This post is a follow up to Series about Java Concurrency – Pt. 1, so be sure to read it before this one.

But now let us talk about the puzzle, shall we? Just by reading the source code, it seems very reasonable that the program should just print out “[0, 1, 2, 3]”, as EntityManager is obviously thread-safe and is lazily initialized in a synchronized block. However, if you run this program multiple times, you will most likely see it printing something different, like “[0]”. In fact, all one can say for sure about this program is that it prints out a set that contains at least one and at most four elements from {0, 1, 2, 3}. Everything else depends on the mood of the JVM, the operating system, and the underlying hardware.

So what’s wrong? The problem is that if multiple threads access a shared resource, this resource has to be safely published unless it is immutable. One way to do so is through a synchronized block where both writes and reads are guarded by the same object. The program from Pt. 1 fails to do so here

            synchronized(this)

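because this is a different object in every thread: each Action locks on itself, so several threads can see entityManager == null at the same time and each create their own EntityManager, losing whatever was persisted into the instances that get overwritten. Indeed, replacing the code above with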

            synchronized(AllTheSameButDifferent.class)

makes our shiny little program print “[0, 1, 2, 3]” reliably. Still, locking on AllTheSameButDifferent.class might not be the best idea, as another, completely unrelated thread may acquire it too. If this thread happens to perform some lengthy computations while holding the lock, our program would be slowed down needlessly. Locking on EntityManager.class seems to be a better approach (the entityManager field itself cannot serve as the lock, because it is still null when the first thread arrives), but it’s still not ideal, as future versions of EntityManager (imagine for the moment that we are dealing with a library class) might use the same lock internally, which brings us down to basically the same reasoning as before. Thus I would rather recommend using

            synchronized(Action.class)

or even

            synchronized(entityManagerLock)

where entityManagerLock is defined in Action like this

private static final Object entityManagerLock = new Object();
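Put together, lazy initialization guarded by a dedicated private lock could look roughly like the sketch below. To keep it short, a plain synchronized set stands in for EntityManager; the class name LockedLazyInit and the helper runOnce() are made up for this example and do not appear in the puzzle.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, condensed version of the puzzle using a private lock object.
class LockedLazyInit
{
    // Nobody outside this class can ever acquire this lock.
    private static final Object entityManagerLock = new Object();

    // Stand-in for the EntityManager; guarded by entityManagerLock.
    private static Set<Integer> entityManager;

    private static Set<Integer> getEntityManager()
    {
        // All threads use the SAME lock, so at most one instance is created
        // and every thread is guaranteed to see it fully initialized.
        synchronized(entityManagerLock)
        {
            if(entityManager == null)
                entityManager = Collections.synchronizedSet(new TreeSet<Integer>());
            return entityManager;
        }
    }

    // Starts four threads that each add one value; returns a snapshot.
    static Set<Integer> runOnce()
    {
        Thread[] threads = new Thread[4];
        for(int i = 0; i != threads.length; ++i)
        {
            final int value = i;
            threads[i] = new Thread(new Runnable()
            {
                public void run()
                {
                    getEntityManager().add(value);
                }
            });
        }

        for(Thread thread : threads)
            thread.start();

        try
        {
            for(Thread thread : threads)
                thread.join();
        }
        catch(InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        return new TreeSet<Integer>(getEntityManager());
    }

    public static void main(String[] args)
    {
        System.out.println(runOnce());
    }
}
```

Because the lock object is private and serves no other purpose, neither unrelated code nor a future version of a library class can interfere with it.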

Still, while all this reasoning about choosing the appropriate object for locking might very well be the right thing to do in another example, our problem has a solution that is much more elegant, as it makes the JVM do everything for us. It relies on the fact that classes are not initialized before they are used, and that class initialization is performed in a thread-safe manner automagically. You can find more about this in EffJava2 Item 71 as well as in JaCoP1. Here is what it looks like:

package at.lnet.puzzle;

import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

public class AlwaysTheSameAndLazilyInitialized
{
    private static class EntityManager
    {
        private final Set<Integer> data = Collections.synchronizedSet(new TreeSet<Integer>());
        
        public void persist(final int value)
        {
            data.add(value);
        }
        
        public Set<Integer> getData()
        {
            return Collections.unmodifiableSet(data);
        }
    }
    
    private static class Holder
    {
        private static final EntityManager entityManager = new EntityManager();
    }
    
    private static EntityManager getEntityManager()
    {
        return Holder.entityManager;
    }
    
    private static class Action implements Runnable
    {
        private final int data;
        
        public Action(final int data)
        {
            this.data = data;
        }
        
        @Override
        public void run()
        {
            getEntityManager().persist(data);
        }
    }
    
    public static void main(final String[] args) throws InterruptedException
    {
        final int NTHREADS = 4;
        Thread[] threads = new Thread[NTHREADS];
        
        for(int i = 0; i != NTHREADS; ++i)
            threads[i] = new Thread(new Action(i));
        for(int i = 0; i != NTHREADS; ++i)
            threads[i].start();
        for(int i = 0; i != NTHREADS; ++i)
            threads[i].join();
        
        System.out.println(getEntityManager().getData());
    }
}

Note that this version is only 3 lines longer than the original puzzle, which is exactly the length of the convenience method

private static EntityManager getEntityManager()
{
    return Holder.entityManager;
}

that we could have omitted in theory. If you don’t believe that entityManager is in fact still lazily initialized, step through the program with a debugger.
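If you prefer not to use a debugger, a small experiment can make the laziness visible as well. The following sketch records the order of events in a list; HolderLazinessSketch, its events list and the create() helper are hypothetical names used only for this demonstration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo showing when a holder class gets initialized.
class HolderLazinessSketch
{
    // Records what happened, in order.
    static final List<String> events = new ArrayList<String>();

    private static class Holder
    {
        // Runs as part of Holder's class initialization, not earlier.
        static final Object instance = create();

        private static Object create()
        {
            events.add("Holder initialized");
            return new Object();
        }
    }

    static Object getInstance()
    {
        return Holder.instance; // First use triggers the initialization.
    }

    public static void main(String[] args)
    {
        events.add("main started");
        events.add("before first getInstance()");
        getInstance();
        events.add("after first getInstance()");
        getInstance(); // Holder is already initialized; nothing is recorded.
        System.out.println(events);
    }
}
```

The entry "Holder initialized" shows up only after "before first getInstance()", and only once, which is the behavior the holder idiom relies on.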

Series about Java Concurrency – Pt. 1

November 27, 2009 4 comments

As this is one of the things I’m currently occupied with, I’ve decided to start my blog with a series about Java concurrency. It’s going to consist of thread-related puzzles and their solutions. This approach is heavily inspired by the very interesting book Java Puzzlers, which I strongly recommend to everyone who is doing serious Java programming. The solutions, together with detailed explanations, will usually appear a few days after the puzzles. You are strongly encouraged to analyze the source code carefully and predict its behavior without using a compiler. Only then should you run the program to see if your assumptions were true. Also note that because we are talking about threads, the results you get may vary depending on your hardware, operating system, JVM and system load.

So, what does the following program print (execute it at least 10 times)? Can you explain what is happening? If you think that you know what is going on, feel free to post it!

package at.lnet.puzzle;

import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

public class AllTheSameButDifferent
{
    private static EntityManager entityManager;
    
    private static class EntityManager
    {
        private final Set<Integer> data = Collections.synchronizedSet(new TreeSet<Integer>());
        
        public void persist(final int value)
        {
            data.add(value);
        }
        
        public Set<Integer> getData()
        {
            return Collections.unmodifiableSet(data);
        }
    }
    
    private static class Action implements Runnable
    {
        private final int data;
        
        public Action(final int data)
        {
            this.data = data;
        }
        
        @Override
        public void run()
        {
            synchronized(this)
            {
                if(entityManager == null)
                    entityManager = new EntityManager();
            }
            entityManager.persist(data);
        }
    }
    
    public static void main(final String[] args) throws InterruptedException
    {
        final int NTHREADS = 4;
        Thread[] threads = new Thread[NTHREADS];
        
        for(int i = 0; i != NTHREADS; ++i)
            threads[i] = new Thread(new Action(i));
        for(int i = 0; i != NTHREADS; ++i)
            threads[i].start();
        for(int i = 0; i != NTHREADS; ++i)
            threads[i].join();
        
        System.out.println(entityManager.getData());
    }
}