Friday, July 20, 2007

Discovering Erlang

Distributed Systems

In my first post, “Toward a Universal Data Description Language,” I mentioned that I thought that “any computer system of interest is distributed.” My post today describes a programming language I discovered only recently that is designed specifically for distributed systems. What is remarkable to me is that it is not even a new language: it has been around for well over a decade, and I would have expected to notice a language designed specifically for such a common problem long before now.

In a distributed system, nodes communicate by sending messages to each other. There are two common communication patterns:

  • Pull. An information receiver (“client” node) pulls from an information source (“server” node) by sending a request to the server and receiving the information in its reply.
  • Push. An information source (“publisher” node) pushes information to an information receiver (“subscriber” node).

If the information source and receiver use identical data formats and communication protocols, then implementing the system is easy: the source and receiver can send messages directly to each other. However, it is quite common for different nodes to have been developed by different organizations (e.g., the inventory system was developed by a company that specializes in MRP systems, while the order entry system was created by an e-commerce company), so the nodes don't speak the same language. This is where a real-time data linking application is required. A real-time data linking application (LINK) serves as a translator between the heterogeneous systems, speaking each node's native language and translating between them in real time. These translations are not always as straightforward as they seem. In the following scenario, the LINK acts as subscriber, client, and publisher in the same transaction:

  1. Node A publishes information X to LINK.
  2. LINK sends request for additional information Y to node B.
  3. Node B replies to LINK with information Y.
  4. LINK combines X & Y into Z then sends Z to node C.
  5. Node C sends acknowledgment to LINK.
  6. LINK sends acknowledgment to node A.
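To make the flow concrete, the six steps above can be sketched in Erlang (the language this post is about) as a single function. This is only a sketch: the module name link_sketch and the RequestY and SendZ callbacks are my own stand-ins for the real node B and node C protocols.

```erlang
-module(link_sketch).
-export([handle/3]).

%% Step 1 has already happened: message X arrived from node A.
%% RequestY and SendZ are placeholder callbacks standing in for the
%% real node B request/reply and node C delivery protocols.
handle(X, RequestY, SendZ) ->
    Y = RequestY(X),   % steps 2-3: request additional information Y from node B
    Z = {X, Y},        % step 4: combine X and Y into Z
    ok = SendZ(Z),     % steps 4-5: send Z to node C and await its acknowledgment
    ack.               % step 6: acknowledge back to node A
```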

The Problem

In this scenario, the LINK withholds its acknowledgment to node A until the last step. This delay blocks node A from sending more information until all processing of the original message X is complete. It need not be so, however, and, in fact, this ordering of events could be disastrous to the system as a whole. Suppose node A publishes information at a rate of 10 messages per second, node B is well able to handle 10 requests per second, and node C can certainly receive 10 messages per second. It would seem that we should not have a problem.

And we should not, but we do: node B takes ten seconds to process each request. Because node B can handle 100 simultaneous connections, it can still keep up with the 10-requests-per-second pace required of it, but only if the LINK keeps multiple requests outstanding at any one time. If the LINK withholds its acknowledgment until step 6, then it will never have multiple requests in flight to node B. Therefore, the LINK must send its acknowledgment around step 2. Once it does this, however, it needs to manage multiple instances of each piece of information and match up each Y with the correct X to form message Z. There may be no guarantee that node B will respond to requests in the order received (some information may be cached and return very quickly, while other requests might require querying some other remote data source), so a simple queue, in which each Y is matched to the first X waiting, will not work either.

The Solution: Erlang

Data linking applications, therefore, usually require support for handling concurrent jobs. If we can spawn a new process for each message X received from node A, then each spawned process need only keep track of one X and one Y. The complexity of our program drops significantly, since the program logic need only describe how to handle a single message X instead of managing all of the X's, the Y's, the connections to node B, and so on.
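In Erlang, that spawning step is a one-liner. In the sketch below, the module name and the Handle callback are my own placeholders for the real per-message logic:

```erlang
-module(per_message).
-export([on_publish/2]).

%% Spawn one lightweight process per incoming message X. The new
%% process runs Handle(X) and needs to track only its own X (and,
%% later, its own Y).
on_publish(X, Handle) ->
    spawn(fun() -> Handle(X) end).
```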

Erlang is a programming language specifically designed to handle concurrency, and my initial dabbling suggests that it is well suited to this sort of problem. It is available for a variety of platforms, including Windows.
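Erlang's selective receive also answers the matching problem from the previous section: tag each request to node B with a unique reference, and pattern-match the reply on that same reference. Below is a minimal sketch; the {request, ...}/{reply, ...} message protocol is my own invention, not node B's real interface.

```erlang
-module(match_sketch).
-export([fetch_y/2]).

%% Ask the process BPid for the Y corresponding to X. Because the
%% receive pattern includes our unique Ref, we pick up our own reply
%% even when replies arrive out of order; replies tagged with other
%% references stay in the mailbox for their own requesters.
fetch_y(BPid, X) ->
    Ref = make_ref(),
    BPid ! {request, self(), Ref, X},
    receive
        {reply, Ref, Y} -> Y
    end.
```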

An Introduction to Erlang

I am admittedly no expert in Erlang, so my introduction will necessarily be “gentle”. It does assume, however, that you have managed to install Erlang and have tried some typical hello-world-type modules. I have little interest in repeating the excellent documentation that is available on-line, which should probably be your first stop.

Suppose node B requires us to assign a unique nine-digit ID to each of our requests. How do we generate such an ID without repeating ourselves? Our first thought might be to utilize the built-in now() function, which returns the elapsed time since 00:00 GMT, January 1, 1970, as a {MegaSecs, Secs, MicroSecs} tuple with microsecond resolution. The documentation for this function guarantees that subsequent calls will return continuously increasing values, so it can be used to generate unique time-stamps. This is precisely what we are looking for!
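Since now() returns a tuple rather than a single number, here is a tiny helper (the module and function names are my own) that flattens it into one microsecond count, which is the value whose digits we discuss below:

```erlang
-module(stamp).
-export([micros/0]).

%% Flatten now()'s {MegaSecs, Secs, MicroSecs} tuple into a single
%% count of microseconds since 00:00 GMT, January 1, 1970.
micros() ->
    {Mega, Secs, Micro} = now(),
    (Mega * 1000000 + Secs) * 1000000 + Micro.
```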

Unfortunately, we are restricted to nine digits, and the flattened microsecond count has well over nine. If we just take the least significant nine digits, then we have a cycle that repeats roughly every seventeen minutes (10^9 microseconds is only 1,000 seconds), so there is no guarantee that the chosen numbers will be unique. We could play the odds, but the odds of a collision increase significantly if our system (like mine) has only millisecond resolution. We might then decide to drop the last three digits and use the least significant nine digits of the millisecond count, but even this repeats roughly every twelve days.

The solution is a sequence: start at 000000000 and increment it by one for each message. At our average rate of 10 transactions per second, the sequence will last us a little over three years. One might argue that this merely puts off the repetition and we run the risk of a Y2K-like problem a few years down the road, but with only nine digits to play with, this is the best we can do even in theory.
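We can sanity-check that lifetime with a small helper (the module name capacity is my own invention): a nine-digit sequence holds 10^9 IDs, and at ten messages per second that is 10^8 seconds, a little over three years.

```erlang
-module(capacity).
-export([years/1]).

%% How many years a nine-digit (10^9-value) sequence lasts when
%% consuming Rate IDs per second.
years(Rate) ->
    Ids = 1000000000,
    Seconds = Ids div Rate,
    Seconds / (60 * 60 * 24 * 365).
```

For example, capacity:years(10) evaluates to roughly 3.17.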

So we decide to write a module sequence to encapsulate our ID-generating logic. If we were writing this module in a more traditional language, we would use a variable to store the last ID generated and increment it by one on each call. Translating this approach into Erlang produces a module that might look something like this:


-module(sequence).
-export([next/0]).

next() ->
    Next_ID = case get(sequence) of
                  undefined -> 0;
                  ID -> ID + 1
              end,
    put(sequence, Next_ID),
    string:right(integer_to_list(Next_ID), 9, $0).
Testing this module in Erlang seems to indicate that it works:

(node1@host1)1> sequence:next().
"000000000"
(node1@host1)2> sequence:next().
"000000001"
(node1@host1)3> sequence:next().
"000000002"

But that is not a very good test. When we utilize the sequence in our LINK implementation, each call to sequence:next() will be made in its own process, with its very own process dictionary. This means that the get() and put() calls in our implementation start from an empty process dictionary every time. To demonstrate, we spawn processes to make the calls to sequence:next():

(node1@host1)4> spawn(fun() -> io:format(sequence:next()) end).
000000000<0.61.0>
(node1@host1)5> spawn(fun() -> io:format(sequence:next()) end).
000000000<0.63.0>
(node1@host1)6> spawn(fun() -> io:format(sequence:next()) end).
000000000<0.65.0>

As expected, our function returned “000000000” every single time. (The numbers between angled brackets, such as “<0.63.0>”, are the return values of the spawn function, which the Erlang shell prints.)

If we continue our traditional programming approach to this problem, we might come up with some work-arounds. For example, we might have the process that receives the message from node A generate the ID and pass it to the process it spawns to handle the message. This, however, leaks details of our implementation of the communication with node B into the code that handles node A communications. If node B changes its interface so as to require a different ID, or multiple ID's, or to no longer even require an ID, our code that handles communications with node A would have to change.

In Erlang, processes are cheap. They are cheaper even than operating system threads, so the Erlang solution is to create a process that generates the ID’s, and use that process to generate all our ID’s. This solution now looks something like this:

-module(sequence).
-export([start/0, next/0, next/1, loop/1]).

start() ->
    register(sequence, spawn(sequence, loop, [0])).

loop(Next_ID) ->
    receive
        {next, Requester_PID, Request_ID} ->
            Requester_PID ! {Request_ID, string:right(integer_to_list(Next_ID), 9, $0)}
    end,
    loop(Next_ID + 1).

get_next(Process) ->
    Request_ID = make_ref(),
    Process ! {next, self(), Request_ID},
    receive
        {Request_ID, ID} -> ID
    end.

next() -> get_next(sequence).

next(Node) -> get_next({sequence, Node}).
Running the same test as before gives us the results we want:

(node1@host1)5> sequence:start().
true
(node1@host1)6> spawn(fun() -> io:format(sequence:next()) end).
000000000<0.67.0>
(node1@host1)7> spawn(fun() -> io:format(sequence:next()) end).
000000001<0.69.0>
(node1@host1)8> spawn(fun() -> io:format(sequence:next()) end).
000000002<0.71.0>

You may wonder what is so special about this program; after all, it is just a counter. The wonderful thing, however, is that we now have a sequence server that can be accessed from anywhere on the network. We can start Erlang on any machine on our network and get the next number in the sequence with one line of code! In our example, we set up our sequence generator on a node called “node1” on a host called “host1”. If we start Erlang on another host, say “host2”, naming our instance “node2” (note that you can have multiple nodes running on the same host), then the following single line, run on host2, fetches the next number in the sequence from host1:

(node2@host2)1> sequence:next(node1@host1).

Erlang appears to be a good language in which to write concurrent, distributed applications, and I shall investigate it further.
