Thursday, May 2, 2013

Implement Lock using java primitives

Decided to check how fast I could write a working lock using Java primitives (a popular interview question). Uncovered a few interesting facts:
1. I don't even remember how wait()/notify() works any more! I am so used to the java.util.concurrent goodness... It took 30 minutes to refresh the concept.
2. I needed about 10 minutes to think through the simplest design.
3. Writing the code took 30 minutes. The result:

package bin.test;

import java.util.concurrent.ConcurrentLinkedQueue;

public class Lock
{
    // These flags are read and written by different threads, hence volatile.
    public volatile boolean shutdown = false;
    public volatile boolean started = false;

    // One Waiter per lock() call: the thread that wants the lock and the private
    // object it waits on until the dealer grants it ownership.
    public static class Waiter
    {
        public Thread what;
        public Object itsLock;
        public boolean granted = false; // guarded by itsLock; protects against a missed notification
        public Waiter(Thread what, Object myLock)
        {
            this.what = what;
            this.itsLock = myLock;
        }
    }

    // FIFO queue of threads waiting for the lock.
    public ConcurrentLinkedQueue<Waiter> waiters = new ConcurrentLinkedQueue<Waiter>();
    // The thread currently holding the lock, or null when the lock is free.
    public volatile Thread owner;
   
    public Lock()
    {
        // A single daemon thread hands the lock over to waiters, one at a time.
        Thread t = new Thread(new Dealer(), "Lock processor");
        t.setDaemon(true);
        t.start();
    }
   
    // The dealer polls the queue and hands the lock to the next waiter whenever the lock is free.
    public class Dealer implements Runnable
    {

        @Override
        public void run()
        {
            started = true;
            while(!shutdown)
            {
                if(waiters.isEmpty())
                {
                    // Nobody is waiting for the lock: idle for a while.
                    try {
                        System.out.println("1");
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                else
                {
                    if(owner != null)
                    {
                        // The lock is still held: check again after a short sleep.
                        try {
                            System.out.println("2");
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                       
                    }
                    else
                    {
                        // The lock is free: grant it to the next waiter in FIFO order.
                        System.out.println("3");
                        Waiter current = waiters.poll();
                        owner = current.what;
                        synchronized (current.itsLock) {
                            current.granted = true; // set the guard before notifying; see lock()
                            current.itsLock.notifyAll();
                        }
                    }
                }
                   
            }
           
        }
       
    }
    public void lock()
    {
        Object newLock = new Object();
        Waiter me = new Waiter(Thread.currentThread(), newLock);
        waiters.add(me);
        synchronized (newLock) {
            // The guard protects against a lost wake-up (the dealer may notify before
            // this thread reaches wait()) and against spurious wake-ups.
            while (!me.granted) {
                try {
                    System.out.println(4+" for "+Thread.currentThread().getName());
                    newLock.wait();
                } catch (InterruptedException e) {
                    System.out.println(5);
                    e.printStackTrace();
                }
            }
        }
    }
   
    public void unlock()
    {
        // Releasing the lock just clears the owner; the dealer hands it to the next waiter.
        System.out.println("7");
        owner = null;
    }
}

package bin.test;

public class LockTest {

    // Each LockHolder acquires the lock, holds it for 2 seconds and releases it.
    public static class LockHolder implements Runnable
    {

        public Lock l;
        public LockHolder(Lock l)
        {
            this.l = l;
        }
        @Override
        public void run()
        {
            System.out.println("h1 at:"+System.currentTimeMillis()+" for "+Thread.currentThread().getName());
            l.lock();
            System.out.println("h2 at:"+System.currentTimeMillis()+" for "+Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                System.out.println("h3 at:"+System.currentTimeMillis()+" for "+Thread.currentThread().getName());
                e.printStackTrace();
            }
            System.out.println("h4 at:"+System.currentTimeMillis()+" for "+Thread.currentThread().getName());

            l.unlock();
            System.out.println("h5 at:"+System.currentTimeMillis()+" for "+Thread.currentThread().getName());

        }
       
    }
    /**
     * @param args
     */
    public static void main(String[] args)
    {
        Lock lock = new Lock();
        // Wait until the dealer thread is up before starting the competing holders.
        while(!lock.started)
        {
            try {
                System.out.println("h10 at:"+System.currentTimeMillis()+" for "+Thread.currentThread().getName());
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("h11 at:"+System.currentTimeMillis()+" for "+Thread.currentThread().getName());

        Thread t1 = new Thread(new LockHolder(lock), "holder 1");
        Thread t2 = new Thread(new LockHolder(lock), "holder 2");
        t1.setDaemon(true);
        t2.setDaemon(true);
        t1.start();
        t2.start();
        System.out.println("h6 at:"+System.currentTimeMillis()+" for "+Thread.currentThread().getName());
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        lock.shutdown = true;
        System.out.println("h7 at:"+System.currentTimeMillis()+" for "+Thread.currentThread().getName());

    }

}

----
1
h10 at:1367557213231 for main
h11 at:1367557213731 for main
1
h6 at:1367557213731 for main
h1 at:1367557213731 for holder 1
h1 at:1367557213731 for holder 2
4 for holder 1
4 for holder 2
3
h2 at:1367557214231 for holder 1
2
2
2
2
2
h4 at:1367557216233 for holder 1
7
h5 at:1367557216233 for holder 1
3
1
h2 at:1367557216734 for holder 2
1
1
1
1
h4 at:1367557218736 for holder 2
7
h5 at:1367557218736 for holder 2
1
1
1
1
1
1
1
1
1
h7 at:1367557223731 for main

Saturday, September 3, 2011

Data that travels, again.

It's really quite surprising how many kinds of systems you can build with traveling data and what range of tasks can be accomplished that way. To me it is particularly surprising because the paradigm rarely comes to mind naturally. After all, the most straightforward way of building a system is to store the data somewhere in your system and then run multiple activities on it. Unfortunately, this approach usually creates tight coupling between data and activities, and even between the activities themselves, or at least their execution sequence.
When the data moves, activities can be totally decoupled from each other and even from the original piece of data that started the whole processing sequence. Imagine event processing where event1 triggers activity1 and activity2. The activities then generate events of the next generation, with data enriched or transformed, and these events trigger their own activities or simply get ignored. Such systems are very flexible: it's easy to add a new branch or a processor. If you use locality information to deploy event processors involved in the same chain close to each other, the processing can be really fast. Obviously, events should be relatively small, but if you combine event processing with a data storage solution you can have access to a much larger data object.
Events usually include several key pieces of metadata to facilitate routing and monitoring: event type, creation time, TTL, creator, etc.
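Purely as an illustration (nothing below comes from a real system; Event, EventProcessor and Dispatcher are names made up for this sketch), a minimal event carrying that metadata plus a type-based dispatcher might look roughly like this:

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustration only: an event carrying the routing metadata mentioned above.
class Event
{
    public final String type;       // event type, used for routing
    public final long creationTime = System.currentTimeMillis();
    public final long ttlMillis;    // time-to-live
    public final String creator;    // who produced the event
    public final Object payload;    // kept small; large objects live in a separate data store

    public Event(String type, long ttlMillis, String creator, Object payload)
    {
        this.type = type;
        this.ttlMillis = ttlMillis;
        this.creator = creator;
        this.payload = payload;
    }
}

interface EventProcessor
{
    // A processor may emit zero or more next-generation events (never null here).
    List<Event> process(Event e);
}

class Dispatcher
{
    private final ConcurrentHashMap<String, List<EventProcessor>> byType =
            new ConcurrentHashMap<String, List<EventProcessor>>();

    public void subscribe(String type, EventProcessor p)
    {
        byType.putIfAbsent(type, new CopyOnWriteArrayList<EventProcessor>());
        byType.get(type).add(p);
    }

    public void publish(Event e)
    {
        // Expired events are dropped; events nobody subscribed to are simply ignored.
        if (System.currentTimeMillis() - e.creationTime > e.ttlMillis) return;
        List<EventProcessor> processors = byType.get(e.type);
        if (processors == null) return;
        for (EventProcessor p : processors)
            for (Event next : p.process(e))
                publish(next); // next-generation events trigger their own activities
    }
}

In a real deployment the dispatcher would, of course, hand events over the network to processors placed for locality rather than calling them in-process.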
On the other side of data traveling is map-reduce, which requires copying rather large volumes of data. I find the fact that one of the most actively "growing" branches on the map-reduce tree is HBase highly suggestive of that paradigm's viability. Basically, it seems that moving large volumes of data is fine if there is no way you can avoid it; otherwise, pretty much anything else might be a better solution.

Thursday, September 1, 2011

Dead? Very Dead?

This is one of the most complex areas in distributed software. How do you define "dead", and what do you do when a node begins to behave strangely? How do you detect brown-outs and network partitioning? What do you do when these things hit? The list of questions goes on and on.
Traditionally, the simplest solutions work best. Set a timeout on communications between nodes writing the same key segment; when the timeout expires, the node that has access to a quorum device lives on while the other one commits suicide (or goes into read-only mode). Of course, you need a good quorum device - usually a lock service based on Paxos is a good provider of that sort of thing. Moving to read-only mode (and coming back from the dead remembering the previous state of the data on the node) sounds like the better option. Unfortunately, people often forget that the next step is an update from the active node. That update puts extra load on the active node and can (and in many implementations does) push the active node over the edge if it was loaded heavily enough.
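A rough sketch of that decision logic, with the quorum device hidden behind a made-up QuorumService interface (this is only an illustration of the idea, not a recipe):

// Illustration only; QuorumService stands in for something like a Paxos-based lock service.
interface QuorumService
{
    // Try to take (or keep) the lease for a key segment; returns false if we lost it.
    boolean tryHoldLease(String keySegment, String nodeId);
}

class ReplicaGuard
{
    private final QuorumService quorum;
    private final String keySegment;
    private final String nodeId;
    private final long peerTimeoutMillis;
    private volatile long lastHeardFromPeer = System.currentTimeMillis();
    private volatile boolean readOnly = false;

    public ReplicaGuard(QuorumService quorum, String keySegment, String nodeId, long peerTimeoutMillis)
    {
        this.quorum = quorum;
        this.keySegment = keySegment;
        this.nodeId = nodeId;
        this.peerTimeoutMillis = peerTimeoutMillis;
    }

    public void onPeerHeartbeat()
    {
        lastHeardFromPeer = System.currentTimeMillis();
    }

    // Called periodically. Once the peer times out, only the node that still holds the
    // quorum lease keeps serving writes; the other one drops into read-only mode.
    public void check()
    {
        if (System.currentTimeMillis() - lastHeardFromPeer < peerTimeoutMillis) return;
        readOnly = !quorum.tryHoldLease(keySegment, nodeId);
    }

    public boolean isReadOnly()
    {
        return readOnly;
    }
}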
A node that never comes back from the dead (e.g. gets its drive formatted while returning to service) is simpler, but you might lose data that way AND you still need to solve the problem of adding a node to the affected key segment. I personally favor a resurrection attempt because network partitioning happens much more often than a true death.

Saturday, August 27, 2011

Routing and some more consistency

The easiest way to achieve simultaneous writing to X nodes is to copy the input stream as it comes in, not waiting for it to complete. So a writer connects to one of the write-enabled nodes for a given key and starts writing, and the node copies the stream to the other write-enabled nodes. It follows that each write-enabled node should have a way to pick X-1 other nodes for writing. The component picking the nodes must have at least minimal intelligence not to choose a node in the same rack or, better yet, to follow user-defined availability zone rules.
A storage node also needs to have persistent connections already established to the rest of the nodes used for writing the key space on this node - that is, if write latency is of any importance. And of course the protocol must provide a way to establish consistency in case of any errors. For instance, what happens if the original write node dies after sending the data object to some but not all of the other data nodes?
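Here is a minimal sketch of that fan-out write path, assuming the connections to the other write-enabled nodes are already open (FanOutWriter and all parameter names are invented for the example):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class FanOutWriter
{
    // Copy the incoming stream to local storage and to the X-1 replica connections
    // chunk by chunk, without waiting for the whole object to arrive.
    public static void write(InputStream in, OutputStream local, OutputStream[] replicas) throws IOException
    {
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1)
        {
            local.write(buf, 0, n);
            for (OutputStream replica : replicas)
            {
                // A real implementation has to decide here what to do when one replica fails mid-stream.
                replica.write(buf, 0, n);
            }
        }
        local.flush();
        for (OutputStream replica : replicas)
        {
            replica.flush();
        }
    }
}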
There are also questions about the structure of a data node's key space and the key space distribution: do nodes come in sets of X, where each node in the set is identical (key-space-wise) to the rest of the set's nodes, or is the key space on a node made of smaller units, where a copy of such a unit can be distributed independently of the rest of the units on the node? The DHT (distributed hash table) and DHT key-ring literature has examples of both approaches. I personally prefer full backups (identical copies) because they are easy to create and manage.
Another question is the state of a node coming back from the dead and the definition of "dead". See that in the next post.
On a side note: it is difficult to find time to write while vacationing in Hawaii!

Wednesday, August 24, 2011

Consistency and user experience

Consistency comes into the picture with reliability and write speed requirements. What probability of eventually losing the data can you live with? How fast should a write request return? If the answers are "0" and 10 ms, you are in the eventual consistency world already. The basic idea of eventual consistency is that when you write a data item, you write synchronously to X machines only (minimum 1). The write returns success if that operation fully succeeds and failure otherwise. An asynchronous process copies the data to Y other storage machines (preferably located in another place). The X/Y ratio depends on the degree of paranoia, while the absolute values of X and Y (especially Y) depend upon your read requirements (e.g. such use-cases as edge caching; write in one region, read anywhere; etc.).
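Sketched in code, purely for illustration (Replica and EventuallyConsistentStore are made-up names, and error handling is omitted), the write path looks roughly like this:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: Replica stands in for a connection to one storage machine.
interface Replica
{
    boolean put(String key, byte[] value);
}

class EventuallyConsistentStore
{
    private final List<Replica> syncReplicas;   // the X machines written synchronously
    private final List<Replica> asyncReplicas;  // the Y machines copied in the background
    private final ExecutorService copier = Executors.newSingleThreadExecutor();

    public EventuallyConsistentStore(List<Replica> syncReplicas, List<Replica> asyncReplicas)
    {
        this.syncReplicas = syncReplicas;
        this.asyncReplicas = asyncReplicas;
    }

    // Success only if all X synchronous writes succeed; the Y copies happen later.
    public boolean write(final String key, final byte[] value)
    {
        for (Replica r : syncReplicas)
        {
            if (!r.put(key, value)) return false;
        }
        copier.submit(new Runnable()
        {
            public void run()
            {
                for (Replica r : asyncReplicas) r.put(key, value);
            }
        });
        return true;
    }
}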
Adding more X nodes slows writes down and makes them less reliable, which is why X is usually set at 2. It turns out people really do not understand why the data written by their own application does not show up on the next read request. An even worse problem is a second write landing on top of an obsolete version of the data. There are various ways of dealing with this problem. One of them is to expose data version information to the application and let it make some sort of intelligent decision, but that does not help the application developers all that much, and the paradigm confuses them.
The only solution that seems to work is to provide consistent reads for the process that wrote the data and let everyone else read whatever version is available (they do not know what the current version is one way or another). This is known as read-after-write consistency. Most modern services achieve it by having some session stickiness at the request routing level: subsequent requests from a writer hit one of the X nodes used in the write request.
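A toy sketch of that routing rule (SessionRouter and its methods are invented for the example; a real router would also expire the stickiness after a while):

import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: pins a writer's session to the node that took its write.
class SessionRouter
{
    private final List<String> replicas;   // all nodes serving the key
    private final ConcurrentHashMap<String, String> stickyNode =
            new ConcurrentHashMap<String, String>();   // session id -> node used for the write
    private final Random random = new Random();

    public SessionRouter(List<String> replicas)
    {
        this.replicas = replicas;
    }

    public String routeWrite(String sessionId)
    {
        String node = replicas.get(random.nextInt(replicas.size()));
        stickyNode.put(sessionId, node); // remember the node so the writer reads its own writes
        return node;
    }

    public String routeRead(String sessionId)
    {
        String node = stickyNode.get(sessionId);
        // The writer goes back to "its" node; everyone else can read any replica.
        return node != null ? node : replicas.get(random.nextInt(replicas.size()));
    }
}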


Monday, August 22, 2011

Data that travels

A separate kind, or for the lack of a better word, family of distributed systems includes those where data travels from node to node instead of being stored in one place. The range here covers anything from event processing systems (including workflow systems) to map-reduce clusters. The current state of the data travels from step to step, being enriched or otherwise transformed by every processing step it goes through. It's really difficult to run queries or even know precisely how much data you have in your system at any given time. It's also difficult to predict how much time a given piece of data spends inside. Usually you collect statistics on the system behavior, and that allows you to make fairly good guesses. There are also some special tricks that help you run queries and get better estimates. People use these systems when there is more interest in the processing results than in data storage.
Consistency
Any kind of data storage and processing system can be eventually or immediately consistent. I use "immediately consistent" to describe read-after-write consistency for any thread (the thread that wrote the data and any other). "Eventually consistent" means that even the thread which wrote the data can get a previous version of the data on a subsequent read. There is a really useful in-between behavior where only the thread (client, etc.) which wrote the data is guaranteed to read the new version immediately afterward, while any other thread (client) can read the old or the new version - however its luck works out.


Wednesday, August 17, 2011

Partitioned Data, Importance of Numbers

With this partitioning schema you must avoid cross-machine writes and reads. Under any sort of load, cross-machine writes will fail in a variety of really exciting and difficult-to-untangle ways. Multi-phase commit is not going to help you there: it doesn't work under any real-world load. The transactions just take too long, which shows up as humongous resource consumption, wildly different latencies and frequent failures. Network partitioning decreases your availability and adds failures, too. Multi-machine reads are also rather flaky for the same reasons. You also need to think about combining the results from several nodes, pagination and more. In short - just do not allow cross-machine access if you are doing horizontal data partitioning.
If you do need cross-machine updates or queries, you need to implement a workflow. Any financial system is a shining example of such an interaction pattern. Clearly, a response in such a system is asynchronous.
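A hedged sketch of such a workflow for a transfer between accounts living on different shards (every name here is invented; a real system would also need durable queues and idempotent steps):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: each step touches a single shard, a queue decouples the steps
// and the caller gets the result asynchronously through a callback.
class TransferWorkflow
{
    public interface Shard
    {
        boolean debit(String account, long amount);
        void credit(String account, long amount);
    }

    public interface Callback
    {
        void done(boolean success);
    }

    public static class Transfer
    {
        public final String from;
        public final String to;
        public final long amount;
        public final Callback callback;
        public Transfer(String from, String to, long amount, Callback callback)
        {
            this.from = from;
            this.to = to;
            this.amount = amount;
            this.callback = callback;
        }
    }

    private final BlockingQueue<Transfer> pending = new LinkedBlockingQueue<Transfer>();
    private final Shard sourceShard;
    private final Shard targetShard;

    public TransferWorkflow(Shard sourceShard, Shard targetShard)
    {
        this.sourceShard = sourceShard;
        this.targetShard = targetShard;
    }

    // Step 0: accept the request and return immediately; the response comes later.
    public void submit(Transfer t)
    {
        pending.add(t);
    }

    // Step 1 runs against the source shard, step 2 against the target shard;
    // there is no cross-machine transaction anywhere.
    public void processOne() throws InterruptedException
    {
        Transfer t = pending.take();
        boolean debited = sourceShard.debit(t.from, t.amount);
        if (debited) targetShard.credit(t.to, t.amount);
        t.callback.done(debited);
    }
}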
The importance of Numbers
I had meetings with my mentees, and the common issue with them was the lack of any numbers they could put on their requirements. Working on any system, distributed or not, you need to start by having a grasp of some cornerstone numbers:
  1. Expected latency. Clearly, a system with a synchronous 15 ms response time must have a very different architecture from a system with a 2 second response time. You need to know both tp50 and tp99.9.
  2. Expected availability. If it's different for reads and writes, or per resource, you need to know all the numbers. Also, what are the criteria for availability (e.g. if ping works but nothing else does - is the service available?)
  3. Expected redundancy.
  4. The total amount of money your business owners are ready to spend on the hardware. This is one of the things that greatly inspires your creativity, especially when you realize that the number is several times less than what you think it ought to be.