Thursday, November 13, 2008

JMagLev: Terracotta based MagLev for Ruby

In one of my previous posts I wrote about clustering solutions for Ruby. One of them was MagLev, based on Gemstone's Smalltalk virtual machine; the second one was based on Terracotta. The latter solution seems more appealing, since Terracotta is open source and lets you run your own experiments (like this, or this, or even this). Unfortunately, all solutions I have seen so far (including my own) for JVM-based languages other than Java depended on a special library that interfaced Terracotta through com.tc.object.bytecode.ManagerUtil class instances, and were not transparent to the programmer. Until now.
Recently, Fabio Kung took a step further and patched JRuby to behave like MagLev, transparently sharing objects across multiple JRuby runtimes. It seems that MagLev has got a strong competitor before it even managed to hit the market. Have a look at the demo - it looks just as impressive as what Avi Bryant from Gemstone showed at RailsConf 2008.

Thursday, November 6, 2008

Distributed processing with JScheme and Terracotta

In my post about clustering JScheme with Terracotta I showed how to share Java objects through Terracotta using JScheme. But clustering is not only about sharing data. It is also about parallel (and possibly distributed) task processing.

One of the fundamental assumptions of Scheme (and other Lisps) is that code is data. Programs are just collections of evaluable expressions and thus you can represent data as code, and vice versa. This is exactly what you need when you want to distribute code over a system designed primarily to share data.

I prepared a simple system based on JScheme that allows distributed, concurrent code processing over a Terracotta cluster. It uses the concept of independent nodes (borrowed from Erlang). Each node polls a shared list to find a new expression to evaluate. Once it finds a job to be done, it evaluates the expression using a JScheme evaluator (an instance of the jscheme.JScheme class) and writes the result back to the list. The client process which initiated the task reads back the result and returns it.
Since all nodes are independent entities, you can start as many of them as you need and use them concurrently. In most cases, though, the optimal number of nodes equals the number of CPUs (or CPU cores) to be used on each machine connected to Terracotta. So if you have 2 computers with a quad core CPU each and want to use only half of their power, you can start 2 nodes on each of them. If you want to use them to the full, you should use 8 nodes, 4 per machine, and so on. You can start the nodes on a single machine using one JScheme shell or several - it's up to you. For me, a single Scheme REPL seems to be the most convenient option.
Client jobs are started through tc-map. It's a function similar to the standard map, but it takes an additional argument - a list of nodes to use for the job. Unfortunately, the system is not fault tolerant, so if one of the nodes dies while doing its job, the whole processing task hangs. The only way out then is either to restart the dead node or to evaluate the whole tc-map expression again.

OK, enough of the theory, let's do some real work. First you need to download the library. The online folder contains the library itself (jstc.scm), a sample Terracotta configuration (tc-config.xml) and some tests. After you get the library and the configuration file, you should start the Terracotta server (I described the whole procedure in detail previously). If you run the Terracotta server on a remote machine, you should also edit the tc-config.xml file and change the server host to the IP of the Terracotta host. Now you can start JScheme through the Terracotta bootstrap:
java -Xbootclasspath/p:[terracotta boot jar] -Dtc.config=tc-config.xml -Dtc.install-root=[terracotta install dir] -jar jscheme.jar
You can find the boot jar in the lib/dso-boot folder of your Terracotta installation directory. If it isn't there, you can generate it with the Terracotta make-boot-jar.sh script.
Now you can load the library with:
(load "jstc.scm")
and start playing with it. For starters, let's run two nodes:
(start-node "test1")
(start-node "test2")
and define a helper function to generate a list of integers from a specified range:
(define (range min max)
  (let loop ((x max) (l '()))
    (if (< x min) l (loop (- x 1) (cons x l)))))
Now we need to "teach" running evaluators the Fibonacci function:
(tc-load (list "test1" "test2") "(define (fib n) (cond ((= n 0) 0) ((= n 1) 1) (else (+ (fib (- n 1)) (fib (- n 2))))))" )
and we are ready to spread a test job across the nodes:
(tc-map (list "test1" "test2") "fib" (range 1 20))
After a few seconds you should receive the following list:
(1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181 6765)
You can use the time function to compare computation times with the results obtained using a single node or a regular, sequential map:
(time (tc-map (list "test1" "test2") "fib" (range 1 20)))
(time (tc-map (list "test1") "fib" (range 1 20)))
(define (fib n) (cond ((= n 0) 0) ((= n 1) 1) (else (+ (fib (- n 1)) (fib (- n 2))))))
(time (map fib (range 1 20)))
Note that the function passed to map is a regular Scheme expression, while with tc-map it must be a string.

It is quite possible that you will see no gain over sequential processing with this library when using fewer than 3 nodes running on 3 cores. The main reason is that Terracotta introduces some overhead itself. The second one is that nodes poll Terracotta for job lists at 20ms intervals. Those intervals are necessary if you don't want to consume all the CPU power just on polling loops and leave none for the jobs. You can adjust them by changing the value of JSTC_EVAL_DELAY.

I did some tests and I must say that the results surprised me. On my home laptop (Core2 Duo T5450 1.66 GHz, 2 cores, 2GB RAM) the results looked like this:
(time (map fib (range 1 25))) - 17234 msec
(time (tc-map (list "test1") "fib" (range 1 25))) - 29020 msec
(time (tc-map (list "test1" "test2") "fib" (range 1 25))) - 19620 msec
while on three servers (dual Xeon E5430 2.66 GHz, 8 cores, 8GB RAM each):
(time (map fib (range 1 25))) - 12687 msec
(time (tc-map (list "test1") "fib" (range 1 25))) - 22502 msec
but:
(time (tc-map (list "test1" "test2") "fib" (range 1 25))) - 25256 msec
(time (tc-map (list "test1" "test2" "test3") "fib" (range 1 20))) - 22355 msec
when I ran the tests on a single machine, and:
(time (tc-map (list "test1" "test2") "fib" (range 1 25))) - 14216 msec
(time (tc-map (list "test1" "test2" "test3") "fib" (range 1 20))) - 11538 msec
when each node was on a different machine.
On my laptop I got an almost 1.5x speedup by using two Terracotta nodes instead of one, but on a server machine two nodes actually slowed the tasks down. I could get faster job processing only by spreading nodes across different machines. Adding more nodes to the machines seemed to have no impact on the results, so I couldn't get past a 2.5x speedup factor.
Weird, isn't it?

Monday, October 27, 2008

Clustering Kawa with Terracotta

In my recent post I described a method of building a cluster of JScheme instances using the Terracotta object management system. I also prepared a small library for JScheme that makes using Terracotta in JScheme easy - it is LGPL licensed and free to download. Today I tried the same trick with Kawa, which is not only a full-featured Scheme implementation in Java, but also a complete framework for implementing other programming languages on the Java platform. Since I already knew how to cluster JScheme, making Kawa work with Terracotta was a piece of cake.
First you start a Terracotta server. Then you run a Kawa shell with the following command:
java -Xbootclasspath/p:[terracotta boot jar] -Dtc.config=tc-config.xml -Dtc.install-root=[terracotta install dir] -jar kawa.jar
Just as with JScheme, the boot jar is a stub jar file that starts a Terracotta client before the main application, and tc-config is a Terracotta configuration file (see the post about JScheme for details).
Next, you load the Terracotta library:
(load "kwtc.scm")
and define an object (for example an ArrayList) to share across the cluster:
(define ob (create-root "list" (make <java.util.ArrayList>)))
Now you can put values on the list:
(sync-write (lambda () (ob:add 1)) ob)
and read them back synchronously:
(sync-read (lambda () (ob:get 0)) ob)
or simply with:
(ob:get 0)
The library for Kawa can be downloaded from here.
Happy coding!

Sunday, October 26, 2008

Clustering JScheme with Terracotta

In one of my previous posts I mentioned Terracotta, an open source clustering solution for Java. Its main advantage over traditional Java solutions in use (like RMI) is that it works as middleware between a JVM and an application, enabling transparent sharing of Java objects across the network. The only requirement for a programmer is to execute operations on shared objects in a synchronized context, which allows you to build distributed applications, or refactor existing ones, quickly.
Inspired by Jonas Bonér's experiments with JRuby I decided to give it a try with JScheme, an open source Scheme implementation running on the JVM. I chose JScheme over other Scheme implementations (like Kawa or SISC) because of its very simple, clear and elegant interface to Java objects. In fact, since Terracotta operates at the JVM level, you can use it to cluster an application written in any language, as long as it compiles to Java bytecode and provides an interface to communicate with Java objects and classes.
First, you need to download and install Terracotta (the current stable version is 2.7.0). Then, you have to start a server with the start-tc-server.sh script. On Linux, if you encounter any strange errors running any of the scripts in the bin directory of your Terracotta installation, change the header of the problematic script from #!/bin/sh to #!/bin/bash - this should solve the problem. The server manages all the shared objects in a Terracotta cluster and needs to be started before any client applications are run.
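For example, to switch the server script to bash (run from the Terracotta bin directory):
sed -i 's|^#!/bin/sh|#!/bin/bash|' start-tc-server.sh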
Next, you need to prepare a client configuration in the form of an XML file. I used a configuration provided by Jonas and stored it in tc-config.xml:
<?xml version="1.0" encoding="UTF-8"?>
<tc:tc-config xmlns:tc="http://www.terracotta.org/config">
  <servers>
    <server name="localhost"/>
  </servers>
  <clients>
    <logs>%(user.home)/terracotta/jtable/client-logs</logs>
    <statistics>%(user.home)/terracotta/jtable/client-logs</statistics>
  </clients>
  <application>
    <dso>
      <instrumented-classes>
        <include>
          <class-expression>*..*</class-expression>
        </include>
      </instrumented-classes>
    </dso>
  </application>
</tc:tc-config>
Now you can start the JScheme interpreter hosted by Terracotta:
java -Xbootclasspath/p:[terracotta boot jar] -Dtc.config=tc-config.xml -Dtc.install-root=[terracotta install dir] -jar jscheme.jar
The boot jar is a stub jar file that starts a Terracotta client and connects to the server before the main application is started. You should be able to find it in the lib/dso-boot folder of your Terracotta installation directory. If it isn't there, you can generate it with the make-boot-jar.sh script found in the Terracotta bin folder.
Now that the whole working environment has been set up, you can use the com.tc.object.bytecode.ManagerUtil class to create a shared root object. Unfortunately, Jonas's method, which uses the LOCK_TYPE_WRITE static field of the com.tc.object.bytecode.Manager class to perform a write lock during this operation, fails to work. It causes a strange error about a missing com.tc.object.event.DmiManager class, which seems to be a problem even with Jonas's JRuby example itself. A quick solution is to define the locks the way they are defined in the com.tc.object.lockmanager.api.LockLevel class:
(define TC_READ_LOCK 1)
(define TC_WRITE_LOCK 2)
Now let's define a sample object to share. It can be, for example, an ArrayList:
(define l (ArrayList.))
Next you need to create a shared root object named "list" that will hold the l object:
(import "com.tc.object.bytecode.ManagerUtil")
(ManagerUtil.beginLock "list" TC_WRITE_LOCK)
(define ob (ManagerUtil.lookupOrCreateRoot "list" l))
(ManagerUtil.commitLock "list")
Terracotta provides a great debugging tool called the Terracotta Administrator Console to analyze the objects held by the server. Start the console by running the admin.sh script found in the bin folder of the Terracotta installation directory, then connect to localhost and go to the Cluster object browser. You should see an empty ArrayList on the object list.
Now let's add a new value to the shared object:
(ManagerUtil.monitorEnter ob TC_WRITE_LOCK)
(.add ob 1)
(ManagerUtil.monitorExit ob)
Go back to Terracotta Administrator Console, select the shared ArrayList and press F5 to refresh the view. You should see that now it holds a single value: 1.
Now start another Scheme shell instance and try to read the first value on the shared list:
(define TC_READ_LOCK 1)
(define TC_WRITE_LOCK 2)
(import "com.tc.object.bytecode.ManagerUtil")
(ManagerUtil.beginLock "list" TC_WRITE_LOCK)
(define ob (ManagerUtil.lookupOrCreateRoot "list" (ArrayList.)))
(ManagerUtil.commitLock "list")
(.get ob 0)
The return value is 1. Sweet!
I wrote a small library for JScheme that lets you perform the basic operations of creating shared root objects in Terracotta and modifying them with read and write locks. You can download it from this location.
To do the list operations described above, you can simply do:
(load "jstc.scm")
(define ob (create-root "list" (ArrayList.)))
(sync-write (lambda () (.add ob 1)) ob)
in one shell and then read the list value in another shell:
(load "jstc.scm")
(define ob (create-root "list" (ArrayList.)))
(.get ob 0)
Have fun!

Thursday, October 23, 2008

Erlang tips and tricks: records

The most awkward thing I have run into while programming in Erlang is the things called records. I call them "things" since they are neither Erlang primitives nor data structures. In fact, records are not even understood by the Erlang VM itself (try to define a record in the interactive shell). They are a kind of macro (they remind me of the ones used in ANSI C) that the compiler translates into Erlang native data structures.
The biggest problem with records is that you cannot modify them dynamically. For example, suppose you want to create a table in Mnesia, but you don't know yet how many columns your table will have. With records, you have to provide the record declarations for all tables you plan to generate at compilation time, which sometimes is simply impossible.
I did some research on the topic while writing Mnapi, and I must say it isn't easy if you don't know where to start. Finally, I managed to decipher the secrets of Erlang records, mostly by analyzing the Mnesia source code. One more reason for using open source software, by the way :-)
It turns out that records are in fact wrappers for tuples. So a record defined as:
-record(book, {isbn, author, title}).
gets translated internally into:
{book, isbn, author, title}.
Thus, an equivalent of:
mnesia:write(#book{isbn = I, author = A, title = T}).
is:
mnesia:write({book, I, A, T}).
The only advantage of such a crippled structure is that the compiler knows the positions of all elements in the record, so it can always put them in a valid tuple in the correct order and find possible errors instantly at the compilation stage. No matter whether you write:
#book{isbn = I, author = A, title = T}.
#book{author = A, isbn = I, title = T}.
#book{title = T, author = A, isbn = I}.
the #book record will eventually end up in Erlang as:
{book, I, A, T}.
Records try to make accessing data easier. To get the author from the tuple above you would have to use the element function:
X = element(3, {book, I, A, T}).
which can become problematic if you shift fields in the tuple. With a record instance bound to a variable Rec you can do the same simply with:
X = Rec#book.author.
no matter what the field order is. But it's still cumbersome and static.

Fortunately, Erlang allows you to build dictionaries, which can be used to store key-value pairs dynamically. For example:
D = dict:new(),
D1 = dict:store(isbn, 1234, D),
D2 = dict:store(author, myAuthor, D1),
D3 = dict:store(title, myTitle, D2),
Book = dict:store(book, D3, D3),
dict:find(author, Book).
Personally, I don't use records unless I really have to. I use dictionaries instead - they are much more powerful and elegant, and you can evaluate them at runtime.

Mnapi 1.2

I tweaked my Java/PHP API for Mnesia a bit by adding memory caching for the auxiliary table structures used by the API. My tests have shown that it speeds up fetching single rows by about 2% for tables whose data is kept in disc_only_copies. Much less than I expected, I must say. But since the changes are really small and work seamlessly with data created with Mnapi 1.1 and earlier, I decided to leave them in the code. If anyone is interested in the new version, here it is.

Sunday, October 5, 2008

Mnapi 1.1

Today I released a new version of my Mnesia API for Java and PHP. The main change is merging the main code trunk with the Tokyo Cabinet branch and adding the ability to select a storage type when creating a table. You can now choose one of the following:
* ram - for RAM only based storage,
* disc - for disc based storage with RAM copy of the table for improved performance (this is the default),
* disc_only - for disc only based storage (much slower, but uses a lot less memory),
* tokyo - for use with Tokyo Cabinet storage engine.
The API does not support table fragmentation and for now I don't plan to introduce this feature (you can always use tokyo as storage for data exceeding the 2GB limit). But since the library code is licensed under the LGPL, you are free to extend or modify it if you miss any feature.
You can get Mnapi 1.1 here.
Happy coding!

Monday, September 29, 2008

LFE: Lisp Flavoured Erlang

Today Robert Virding released a new version of his Lisp syntax front-end to the Erlang compiler, which he calls LFE (Lisp Flavoured Erlang). I have seen previous versions of his work, and I must say this is the first one that looks really mature to me.

LFE is not an implementation of Lisp or even Scheme. Its syntax reflects some specific constructs and limitations of Erlang, since it compiles programs to Erlang bytecode and runs them directly on the Erlang VM.

Is a new trend for Erlang coming?

Saturday, September 27, 2008

Cilk: C that scales

Do you like C? I do. I used to write software for embedded devices, where resources are precious and each CPU cycle and every byte of RAM counts. I enjoyed it a lot, mostly because it was a real challenge and it reminded me of the good old 8-bit computer times when, if your program was not fast enough, you had to optimize its code or find a smarter solution instead of buying a better CPU, expanding memory, or building a home data center. Even today few people question using C to speed up the most critical parts of software, not to mention some obvious examples like the Linux kernel, written almost entirely in ANSI C. Two years ago NVidia announced its CUDA platform, which allows developers to write software in C that runs concurrently on many GPUs, outperforming applications using standard CPUs.

For those who don't own NVidia hardware, there is an alternative called Cilk. Cilk is a language for multithreaded parallel programming based on ANSI C. It extends standard C with new keywords like cilk (identifying a parallel function), spawn (which spawns a new computation thread) and sync (which gathers job results from all threads started with spawn).

I ran an example Cilk application that computes the Fibonacci number of a given value:
#include <cilk-lib.cilkh>
#include <stdlib.h>
#include <stdio.h>

cilk int fib(int n)
{
     if (n < 2)
          return (n);
     else {
          int x, y;
          x = spawn fib(n - 1);
          y = spawn fib(n - 2);
          sync;
          return (x + y);
     }
}

cilk int main(int argc, char *argv[])
{
     int n, result;

     if (argc != 2) {
          fprintf(stderr, "Usage: fib [] <n>\n");
          Cilk_exit(1);
     }
     n = atoi(argv[1]);
     result = spawn fib(n);
     sync;

     printf("Result: %d\n", result);
     return 0;
}
As you can see, it looks almost like an ordinary C program. You can run it with the --nproc argument to determine the number of CPUs (or CPU cores) that should be used by the application.
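For example, to run the program on 4 cores with input 30 (the binary name is assumed here, since the compilation step is not shown):
./fib --nproc 4 30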

As I expected, the Cilk program turned out to be faster than its Erlang equivalent. The running time went from 5.110972 down to 0.689607 seconds on a dual Xeon E5430 2.66 GHz (8 cores total), compared to 25.635218 and 10.017358 seconds for the Erlang version respectively. But what struck me most about the results was that the Cilk code scaled in an almost linear manner:
Wall-clock running time on 1 processor: 5.110972 s
Wall-clock running time on 2 processors: 2.696646 s
Wall-clock running time on 4 processors: 1.355353 s
Wall-clock running time on 8 processors: 0.689607 s
The Cilk code's performance boost was approximately 7.4x, compared to Erlang's 2.5x. On 8 cores the Cilk code outperformed the Erlang code by more than an order of magnitude. This is something that should command respect from Erlang developers.

Friday, September 26, 2008

Living in a concurrent world

Erlang is a great platform for writing concurrent, distributed, fault-tolerant applications. But it's not the only one. Some of the popular programming languages also have tools that help developers build concurrent, highly scalable web services.

Java

One of the main sources of Java's power is that it has libraries to do almost anything you can imagine. And there are, of course, libraries that offer various solutions to the scaling-out problem, like JMS with JCache or the Scala Actors library.
But the real killer application, in my opinion, is Terracotta. It's an open source clustering solution for Java that works beneath the application level. It uses transactional memory to share objects between applications running on different JVM instances, most notably on different physical machines. The object pool is stored on a master server, which can have multiple passive backups. As a result you get a clustering solution that is transparent to the applications you write (you only need to define the objects you want to share in the Terracotta configuration files). What's more, you can use Terracotta to cluster applications created in languages other than Java but running on the JVM - like JRuby or Scala.

Python

Python programmers can use Disco. It's an open source map-reduce framework written in Erlang that allows you to scale your application easily. Unfortunately, it allows neither sharing data between different application instances (as Terracotta does) nor passing messages (as Erlang itself does).

Ruby

Ruby looks much better than Python in this respect. First, there is JRuby, which you can cluster with Terracotta. Second, there is the soon-to-come MagLev by Gemstone. It's a Ruby implementation running on a Smalltalk virtual machine, and the first public presentation of MagLev at RailsConf 2008 was quite impressive.

PHP

Not much to say here... You cannot expect much from a language created to produce dynamic HTML pages. You can only rely on external tools like memcached to store and retrieve objects, but that is by no means an extension to the language itself.

If you know of any other interesting projects providing transparent (or almost transparent) concurrency and distribution to already existing programming languages, you are welcome to tell me about them!

Scripting Erlang

Today I came across an interesting post about Reia, a new scripting language for the Erlang virtual machine. It seems that some people within the Erlang community have started to realize what .NET developers always knew and Java programmers understood a few years ago: that you have to distinguish between the language and the runtime. People who were tired of Java's limitations or just wanted to introduce their own ideas started to develop things like JRuby, Jython, Groovy, Scala or Clojure, to name just a few. Erlang has already developed a strong position as middleware. Now it's time to make it scriptable.

You can read more about Reia on this blog. It discusses Reia's design principles and goals, as well as some controversial ideas, like introducing classes and objects or multiple variable assignment. It is true that this goes against the stream of Erlang philosophy, but I think that there is nothing inherently stupid about any idea until you can prove it wrong. This is why I don't agree with Joe Armstrong's thesis that Object Oriented programming sucks. There are programming languages that successfully combine the OO and functional paradigms (OCaml, Scala, CLOS), which prove just the opposite. It would be nice to see a modern object system in Erlang, and an ALGOL-based syntax could convince more people to use this excellent platform. Or, at least, possibly alleviate some already existing Erlang pains.

Mnapi for Tokyocabinet

Finally, I found some time to patch Mnapi to work with tcerl. Now you can use it to build your web service with this highly promising Mnesia storage backend.

You can download the new Mnapi here.

Due to the nature of Tokyocabinet, remember to always stop Mnapi via
mnapi:stop().
before closing or killing the Erlang shell, so it can gracefully flush all RAM-buffered data to disk before stopping Mnesia. Yeah, I know I could use the gen_server behaviour to handle it, but I just cannot convince myself of Erlang behaviourism ;-)

Sunday, September 21, 2008

Sky is the limit for Mnesia

Until recently the biggest Mnesia flaw was its storage limit. It is not the fault of the database system, since Mnesia itself can handle data of virtually unlimited size. The main problem lies in an outdated Erlang term storage engine called DETS, which is slow and uses 32-bit offsets (limiting a single file to 2GB). DETS could be a perfect fit for the famous AXD301 ATM switch, but it is certainly not for modern web development. A few times I had to drop Mnesia in favour of HBase to serve data, and use Jinterface to make it communicate with the Erlang code that served the application logic.

But those times are finally over. Thanks to Joel Reymont and the Dukes of Erl, you can now use the Tokyocabinet engine as storage for Mnesia. What is most exciting about this solution is that it is completely transparent to your application - only creating a table looks a bit different:
Table = testtab,
mnesia:create_table(Table,
    [{type, {external, ordered_set, tcbdbtab}},
     {external_copies, [node()]},
     {user_properties, [{deflate, true}, {bucket_array_size, 10000}]}]).
You also need to start tcerl before running Mnesia:
tcerl:start().
and synchronize table data with the disk before closing Erlang, if you don't want to lose some of your data:
Port = mnesia_lib:val({Table, tcbdb_port}),
tcbdbets:sync(Port).
To sync all existing Mnesia tables you can use the following function:
F = fun(T) ->
            case catch mnesia_lib:val({T, tcbdb_port}) of
                {'EXIT', _} ->
                    skip;
                Port ->
                    tcbdbets:sync(Port)
            end
    end,
Tabs = mnesia_lib:val({schema, tables}),
lists:foreach(F, Tabs).
All other common operations, like reading and writing data, transactions, etc., don't need to be changed. You can learn more by looking at the example provided with the library.

I managed to make ejabberd (one of the best Jabber/XMPP servers around) run on Tokyocabinet by changing only a few lines of its code. Now I'm on my way to making Mnapi work with it. Thank you guys!!!

Thursday, August 21, 2008

Clojure is here

Clojure is a dynamic, functional programming language of the Lisp family, targeting the Java Virtual Machine, with excellent support for concurrent programming. Rich Hickey, Clojure's author, has prepared a few impressive presentations about the language, which are definitely worth watching. They are not only about Clojure itself, but also give a deep insight into computer language design and concurrent programming in general.

There have been some attempts to compare Clojure with Erlang, although the two languages address different problem classes. Erlang has a strong telecommunications background - starting with the language name itself. It features its own, highly efficient virtual machine, which allows spawning hundreds of thousands of processes with soft real-time performance. Most of its syntax comes from Prolog, since Erlang initially emerged as its dialect - everyone who has programmed in Prolog before (like me) will feel at home with Erlang. Finally, Erlang supports a programming model based on transparent distribution, which means that you can spawn processes across one or several machines without making any changes to the software. Processes (also called actors) share no common memory or data and communicate purely through message passing.
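A minimal illustration of that model (a sketch only; it assumes a connected node named node2@host):
%% spawn a process on a remote node; only messages cross the boundary
Pid = spawn(node2@host, fun() ->
          receive {From, Msg} -> From ! {self(), {echo, Msg}} end
      end),
Pid ! {self(), hello},
receive {Pid, Reply} -> Reply end.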

Clojure does not provide its own virtual machine with the ability to handle millions of soft real-time processes. Instead, it compiles directly to JVM bytecode, which immediately gives you access to all Java classes and to thousands of libraries written for the Java platform. Clojure is based on Lisp, which is much more popular for programming real-life applications than Prolog. Therefore, despite its (Lots (of Irritating (and Superfluous (Parentheses)))) syntax, Clojure may be faster to learn and adopt for regular software developers than Erlang. As regards concurrency, Clojure does not support distributed programming. Instead, it uses the Software Transactional Memory model. Clojure agents, unlike Erlang actors, use actions instead of message passing to change their state. Actions can be committed inside transactions, which allows you, for example, to read the state of all agents within one transaction and thus obtain a consistent snapshot of the whole system (pretty tough to achieve in a distributed environment).

Erlang has a 20-year success story in the telecommunications industry and is especially suitable for building highly scalable, fault tolerant, distributed application clusters that are expected to work under very heavy load. It is also ready to work on multiprocessor machines out of the box. However, when multi-core CPUs finally became standard in home computers, Erlang did not sweep away other, more anachronistic programming languages, despite its obvious advantages. I think it is because most home applications don't need many of the features Erlang has to offer. Users don't need 99.98% uptime, as they usually turn off their machines overnight. They don't need soft real-time processing, since they can easily tolerate a few seconds' delay in data processing. Finally, they don't need high scalability, as very few of them are geeks who connect their machines into clusters (who needs that anyway, if you have more and more cores in one box?). As Rich Hickey said in his discussion on STM:

"Scalability is not a universal problem - some systems need to handle thousands of simultaneous connections - most don't, some need to leverage hundreds of CPUs in a single application - most don't."

Clojure seems to meet the needs of desktop programmers slightly better than Erlang. It provides easy concurrency and all the qualities of functional programming, allows you to take advantage of the vast set of Java libraries, and lets you run your software without modification on every platform for which the Java runtime exists. Erlang is unbeatable on the server platform, but leaves a lot of space for its desktop counterparts. Clojure is just the first of them. I am sure there will be more.

Thursday, August 14, 2008

MapReduce in Erlang

MapReduce is a framework for parallel computations, designed and developed by Google. It is often referred to not only as a platform, but also, more generally, as an algorithm (or a "programming model", as Google itself calls it). The main idea behind MapReduce is to farm computations on a set of data out to many separate processes (map), and then gather their results (reduce).

Implementing MapReduce in Erlang is unbelievably trivial. Joe Armstrong, Erlang's inventor and main architect, published a sample pmap/2 function, which can be used as a parallel equivalent of the standard Erlang lists:map/2:
pmap(F, L) ->
    S = self(),
    Pids = lists:map(fun(I) ->
                         spawn(fun() -> do_f(S, F, I) end)
                     end, L),
    gather(Pids).

gather([H|T]) ->
    receive
        {H, Ret} -> [Ret|gather(T)]
    end;
gather([]) ->
    [].

do_f(Parent, F, I) ->
    Parent ! {self(), (catch F(I))}.
Joe didn't provide any particular examples of using pmap, so I wrote a test function that computes a sequence of Fibonacci numbers. First, let's start an Erlang shell with the -smp flag, which enables it to use multiple CPUs and CPU cores:
erl -smp
Now define a Fibonacci function:
fib(0) -> 0;
fib(1) -> 1;
fib(N) when N > 0 -> fib(N-1) + fib(N-2).
and generate a test list as a sequence of numbers (for example from 0 to 40):
L = lists:seq(0,40).
To make every element of the list L be processed by the fib/1 function we use lists:map/2:
lists:map(fun(X) -> fib(X) end, L).
To do a parallel computation we need to use pmap/2 instead of lists:map/2:
pmap(fun(X) -> fib(X) end, L).
On my laptop with a Core2 Duo T5450 1.66 GHz, running the parallel version of the fib/1 function reduced the execution time from 53.405534 to 31.354125 seconds (as measured by timer:tc/3). On a dual Xeon E5430 2.66 GHz (8 cores total) the speedup was from 25.635218 to 10.017358 seconds. It is less than Joe Armstrong managed to achieve (he probably used more sophisticated test functions and different hardware than a regular PC), but it clearly shows how easy it is to parallelize an Erlang program - just replace every instance of map/2 with pmap/2 and you're done.
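For reference, a single timer:tc/3 measurement looks like this (assuming fib/1 is compiled into and exported from a module named fib):
{Micros, _Result} = timer:tc(lists, map, [fun(X) -> fib:fib(X) end, L]).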

I prepared a modified version of pmap/2, which spawns processes not only on the local machine, but also on all nodes in a cluster:
pmap(F, L) ->
    S = self(),
    Nodes = [node() | nodes()],
    {Pids, _} = lists:mapfoldl(
        fun(I, Remaining) ->
            %% when all nodes have been used, start again from the full list
            [H|T] = case Remaining of
                        [] -> Nodes;
                        _  -> Remaining
                    end,
            {spawn(H, fun() -> do_f(S, F, I) end), T}
        end, Nodes, L),
    gather(Pids).
Using the modified pmap/2, I got similar results with a cluster of two Erlang shells running on a single Core2 Duo machine as with Joe's function on a single shell started in SMP mode. This version of pmap/2 also allows you to rebalance computations by starting as many Erlang shells as there are CPUs (or CPU cores) on multiple machines, and joining them into one cluster.

Monday, August 11, 2008

Scheme & Termite: Erlang alternative?

Gambit-C is a renowned Scheme interpreter and compiler which can generate fast and portable C code. It features Termite, a library written in Scheme which implements an Erlang-like concurrency model for distributed programming. Let's see how it works.

First, create two nodes on two separate machines. Suppose machine one's IP address is 192.168.1.101 and machine two's is 192.168.1.102. Start a Scheme REPL on the first machine through the tsi script shipped with Termite (it enhances the standard Gambit gsi shell with Termite functions) and init it as node1:
(node-init (make-node "192.168.1.101" 4321))
Now start tsi on the second machine and init it as node2:
(node-init (make-node "192.168.1.102" 4321))
Stay in the node2 shell and define an expression to identify node1:
(define node1 (make-node "192.168.1.101" 4321))
Now you can remotely spawn a process on node1:
(define p (remote-spawn node1
  (lambda ()
    (let loop ()
      (let ((x (?)))
        (! (car x) ((cdr x)))
        (loop))))))
The code above remotely spawns procedure p on node1, which waits for a signal in an infinite loop. When it receives a signal x as a pair of a sender and an expression to evaluate, it evaluates the expression, sends the result back to the sender, and loops. To see it in action, evaluate the following expression in the node2 REPL:
(! p (cons (self) host-name))
This causes the command host-name to be sent to process p on node1. Now you can display the received result with:
(?)
The Erlang equivalent would look like this:
-module(p).

-export([start/0,loop/0]).

start() ->
    register(p, spawn(node1, p, loop, [])).

loop() ->
    receive
        {Pid, F} ->
            Pid ! (catch F()),
            loop()
    end.
You would call it with:
{p, node1} ! {self(), fun() -> N = node(), N end}.
and read its result with:
receive X -> X end.
So far so good. Now let's halt the Termite shell on node1:
,q
And try to invoke procedure p from node2 again:
(! p (cons (self) host-name))
The shell will segfault. Oops...

This is not the only problem with Termite. Gambit-C comes with gsc, a Scheme-to-C compiler, which I used to compile the sample script above. First, I had to go to /usr/local/gambc/current/lib/termite and compile Termite itself:
gsc termite
Then I prepared a sample file remote.scm:
(##include "~~/lib/gambit#.scm")
(##include "~~/lib/termite/termite#.scm")

(node-init (make-node "192.168.1.102" 4321))
(define node1 (make-node "192.168.1.101" 4322))

(define p (remote-spawn node1
  (lambda ()
    (let loop ()
      (let ((x (?)))
        (print (cdr x))
        (! (car x) ((cdr x)))
        (loop))))))

(! p (cons (self) host-name))
(print (?))
(newline)
And compiled it into an executable binary:
gsc -c remote.scm

gsc -link /usr/local/gambc/current/lib/termite/termite.c remote.c

gcc -o remote /usr/local/gambc/current/lib/termite/termite.c remote.c remote_.c \
    -lgambc -lm -ldl -lutil -I/usr/local/gambc/current/include -L/usr/local/gambc/current/lib
Now I started node1 on 192.168.1.101 again:
(node-init (make-node "192.168.1.101" 4321))
and tried to run the remote binary from 192.168.1.102:
./remote
Kaboom! Another segfault... It seems the compiled binary cannot handle message passing correctly, although it works in the tsi interpreter.

I tried to create a slightly more sophisticated Gambit-C/Termite application, but unfortunately I failed to find any documentation on Termite more useful than its initial paper. There are also very few examples of Termite, and the most interesting ones I found come from Dominique Boucher's blog (my test script is based on those examples).

On the other hand, a great advantage of Gambit-C over Erlang is that it can translate Scheme applications to plain C, which can then be compiled into standalone executables using gcc. Thanks to this feature I managed to compile a few Gambit examples with the OpenWRT-SDK and run them on my home Linksys WRT54GL router with OpenWRT firmware - the only thing I had to do was replace the gcc command with mipsel-linux-gcc when compiling the gsc output files.

To sum up, Termite is a very interesting and ambitious project, and its authors deserve applause for their work. Unfortunately, it seems that it is still at an experimental stage and is not ready for production use yet. However, when it is finally done, it can be a very useful Erlang alternative (thanks to the Gambit-C compiler) for programming embedded devices and systems where the Erlang VM does not exist. I think this post is also a good comment on Erlang and Termite.

Thursday, August 7, 2008

Web services need balance

So we already have a nice and fast web service based on PHP and Yaws, or a not-so-fast, but still nice, service powered by Java and Tomcat. We also have a powerful Mnesia storage back-end that we can communicate with through a dedicated API. Now we are ready to scale.

There are a lot of ways to balance a web cluster. You can use one of many software load balancers, configure a front-end web server as a reverse proxy, or even use a dedicated hardware load balancer. But hey, we have Erlang, so why bother with complicated software or spend money on expensive hardware? Writing a simple load balancer in Erlang took me about an hour, including testing, and I am not a very experienced Erlang programmer. This should give you some picture of how functional programming can improve your performance as a developer.

What the balancer actually does is check the system load on all nodes in a cluster and return the name of the least loaded one. The software is GPL licensed and can be downloaded from here. You should compile it with:
erlc balancer.erl
and deploy it to all machines you want to monitor. Now you can check the system load of the current node:
balancer:load().
all of the nodes you are connected to:
balancer:show(nodes()).
all of the nodes, including the current one:
balancer:show([node()|nodes()]).
pick the least loaded one with:
balancer:pick(nodes()).
or with:
balancer:pick([node()|nodes()]).
Due to Erlang's nature, dead nodes are instantly removed from the nodes() list, so they are not queried. Additionally, the balancer filters out all nodes that returned an error, so you always get valid results, with no timeouts, deadlocks, etc. However, the result only says that a machine is up, so if a web server dies for some reason while the machine itself did not crash, the node will still appear on the list (which is quite obvious).
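For illustration only, here is a minimal sketch of how such a module could be structured - this is not the actual balancer.erl, and it uses the length of the Erlang run queue as a simple load metric:
-module(balancer_sketch).
-export([load/0, show/1, pick/1]).

%% number of processes waiting for a scheduler on this node
load() ->
    erlang:statistics(run_queue).

%% query every node, silently dropping the ones that return errors
show(Nodes) ->
    Results = [{N, rpc:call(N, ?MODULE, load, [])} || N <- Nodes],
    [{N, Load} || {N, Load} <- Results, is_integer(Load)].

%% return the {Node, Load} pair with the smallest load
pick(Nodes) ->
    case lists:keysort(2, show(Nodes)) of
        [Least | _] -> Least;
        []          -> {error, no_nodes}
    end.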

Since the balancer is written in Erlang, it seems natural to deploy Yaws as the front-end web server and use it to redirect users to the servers that are least loaded at the moment. Suppose we have each web server running on a different sub-domain (s1.host.domain, s2.host.domain, etc.), and each of them runs a single Erlang node. In this case our index.yaws on http://host.domain/ can look like this:
<html>
<erl>
out(Arg) ->
    {Node, _} = balancer:pick(nodes()),
    [_|[Host|_]] = string:tokens(atom_to_list(Node), "@"),
    Url = string:concat("http://", Host),
    {redirect, Url}.
</erl>
</html>
Connecting to http://host.domain/ will now automatically redirect you to the fastest server. It is your choice whether you want to keep users bound to one back-end server or refactor the links on your web site to make them go through the redirector each time a user clicks on a link. In the latter case, if you serve dynamic content, you can use Mnesia to store user sessions, so users will not get logged out if they suddenly switch from one back-end server to another.
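A Mnesia table for such sessions could be as simple as this (a sketch only - the table and column names are made up):
mnesia:create_table(session,
    [{attributes, [sid, user_data]},
     {ram_copies, [node()|nodes()]}]).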

Mnapi: Mnesia API for Java and PHP

Mnesia is a scalable database system designed to work as a storage system for large, distributed applications. In my previous post I showed you how to set up a Mnesia cluster using the Erlang shell. Unfortunately, Mnesia cannot be easily accessed from languages other than Erlang. If you are a web developer, there is even an MVC framework for you - Erlyweb. But what if you are a mainstream programmer who would like to use the power of Mnesia, but is not willing to learn another language and change your whole way of thinking in order to understand the functional programming philosophy?

This is why I created Mnapi - an API for Java and PHP which provides basic methods to create, fetch, update and delete records in Mnesia. It is licensed under the LGPL, which means that you can get it and use it in any commercial or non-commercial application, and you are only obliged to publish the source code of Mnapi itself if you extend or modify it (the license does not affect an application which makes use of it). You can download Mnapi here.

Please bear in mind that Mnesia does not support creating multiple databases or schemas within one instance, joining tables, etc. - and the API reflects those restrictions. You can learn more about it by reading the documentation attached to the source code and running the example applications, so I will not describe it here in detail. What you need to start using Mnapi is to compile the Erlang driver for Mnesia:
erlc mnapi.erl

then start an Erlang shell:
erl -sname erl -setcookie mysecretcookie

and run Mnapi server from the shell:
mnapi:start().

Now you can use Java or PHP to talk to Mnesia through Mnapi libraries.

Note that to use Mnapi from PHP you first need to install and configure the Erlang extension for PHP. Read the instructions provided with the extension to learn how to configure your PHP installation.

To compile the Java API you need Jinterface. If you cannot find Jinterface in the lib folder of your standard Erlang installation, you can either download the Erlang source distribution or install it as a package from CEAN.

Have fun!

Erlang tips and tricks: Mnesia

Mnesia is one of Erlang's killer applications. It's a distributed, fault tolerant, scalable, soft real-time database system which makes building data clusters really easy. It is not designed as a fully blown relational database and should not be treated as a replacement for one. Although Mnesia supports transactions (in a much more powerful way than traditional databases, as any Erlang function can be a transaction), it does not support SQL or any of its subsets. Mnesia is best suited for distributed, easily scalable data storage which can be shared by a large cluster of servers. The most notable examples are ejabberd - a famous distributed Jabber/XMPP server - and cacherl - a distributed data caching system compatible with the memcached interface.

Setting up a cluster of Mnesia nodes is easy. First you need to set up a cluster of Erlang nodes, as I described in my post Erlang tips and tricks: nodes. Then start Mnesia instances on all of them:
mnesia:start().
Now choose one of the nodes as the initial Mnesia server node. It can be any node, as Mnesia works in a peer-to-peer model and does not use any central management point. Go to the chosen Erlang shell and move the database schema to disc to make it persistent:
mnesia:change_table_copy_type(schema, node(), disc_copies).
You can now create a new table, for example:
mnesia:create_table(test, [{attributes, [key, value]}, {disc_copies, [node()]}]).
which creates a table test with two columns: key and value. The disc_copies attribute means that the table will be held in RAM, but a copy of it will also be stored on disc. If you don't store any persistent data in your table, you can use the ram_copies attribute for better performance. On the other hand, if you want to spare some of your system memory, you can use the disc_only_copies attribute, but at the cost of reduced performance.
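As a side note, reads and writes on such a table are wrapped in transactions, and a Mnesia transaction is just a plain Erlang function, for example:
F = fun() ->
        mnesia:write({test, mykey, myvalue}),
        mnesia:read({test, mykey})
    end,
{atomic, _Result} = mnesia:transaction(F).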
Now let's add a second node to Mnesia:
mnesia:change_config(extra_db_nodes, [node2@host.domain]).
Of course, replace node2@host.domain with the name of the node you want to add (or a list of nodes: [node2@host.domain, node3@host.domain, ...]).
Now switch to the Erlang shell on the node you have just added and move its schema to disc, so Mnesia remembers it is part of the cluster:
mnesia:change_table_copy_type(schema, node(), disc_copies).
You can now create a copy of the table test on this node:
mnesia:add_table_copy(test, node(), disc_copies).
Now you can add a third node, a fourth node, and so on, in the same way as you added the second one. When you have many nodes in the cluster, you can keep tables as disc_copies on just some of the nodes as backup and use ram_copies on the other nodes to improve their performance. It generally makes sense to keep a table on disc on only one of the nodes per machine, as shown in the example below.
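For example, to hold a RAM-only replica of the test table on another node (the node name here is made up):
mnesia:add_table_copy(test, node3@host.domain, ram_copies).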

To remove node@host.domain from the cluster, stop Mnesia on that node:
mnesia:stop().
Now switch to any other node in the cluster and do:
mnesia:del_table_copy(schema, node@host.domain).
Mnesia is strongly fault-tolerant, which means that generally you don't need to worry when one of your nodes crashes. Just restart it and reconnect it to the cluster - the Mnesia node will synchronize itself and fix all broken and out-of-date tables. I really like to imagine Mnesia as a database equivalent of the T-1000, which, even heavily damaged or broken into pieces, reassembles itself into its original form every time.
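In practice, recovery usually amounts to starting Mnesia on the restarted node again and waiting until the tables are synchronized, for example:
mnesia:start(),
mnesia:wait_for_tables([test], 20000).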

Unfortunately Mnesia has its limits. A storage limit seems to be the most troublesome of them.

Tuesday, August 5, 2008

Erlang tips and tricks: interactive shell

In my previous post I described issues you can run into when configuring an Erlang cluster. Now let's move a step further.

One of the most impressive Erlang features is its unique ability to do hot code swapping. It means you can exchange the code of a running system without stopping it! Pretty neat, huh? Let's have a look at a very simple server that just loops and displays its status when asked:
-module(test).
-export([start/0, loop/0]).

start() ->
    register(test, spawn(test, loop, [])).

loop() ->
    receive
        status ->
            io:format("~n Status OK ~n"),
            loop();
        reload ->
            test:loop();
        quit ->
            ok;
        _ ->
            loop()
    end.
The main loop waits for signals from clients and, depending on the signal, does one of the following:
1) On status it displays the text "Status OK" and continues to loop.
2) On reload it hot swaps the server code - the fully qualified call test:loop() makes the process continue in the newest loaded version of the module.
3) On quit it finishes with ok.
4) On any other signal it just loops.
4) On any other signal it just loops.
After saving this code as test.erl and compiling it with:
erlc test.erl
you can start a REPL shell and run the server with:
test:start().
The start() function spawns a new Erlang process and registers it on the currently running node, so you can call it by name instead of by its process ID, like this:
test ! status.
If you have played around with Erlang, you already know that. But how about calling test from another node? When you connect another node to the cluster and try to call it, you will get an error saying that test is not a registered name. The answer is simple, but unfortunately not so easy to find in the Erlang documentation. You need to call it with a tuple (pair) containing both the process name and the node name:
{test, test1@host.domain} ! status.
So now comes the time for hot swapping. You can edit the source file, change "Status OK" to any other text of your choice and recompile - of course without leaving the interactive shell, since our server is supposed to have 100% uptime :-)
Afterwards you switch back to the REPL, do
test ! reload.
then
test ! status.
and you see... no change whatsoever. Why? Because the Erlang shell keeps the old version of the module loaded. If you want to load the modified version of the code you have just compiled into the REPL, you need to tell it to do so with:
l(test).
If you have more nodes in your cluster where the code exists and you want to reload the module on all of them, you should use:
nl(test).
You can also use the latter command to distribute your code across the cluster, without the need to send the compiled test.beam binary to all the nodes through FTP or SCP. Sweet, isn't it?

Monday, August 4, 2008

Erlang tips and tricks: nodes

The title might be a bit exaggerated, but if you have just started your adventure with Erlang I would like to provide you with a couple of hints that can spare you a serious headache.

The basic tool for working with Erlang is its REPL shell, started in terminal mode with the erl command. The name REPL comes from the read-eval-print loop and is a common term for an interactive shell among functional languages. Since Erlang has been designed with distributed programming in mind, you can start as many shells as you like and make them communicate with each other. The basic rule you have to remember is that you can only interface shells that share the same cookie. A cookie is simply a string that acts as a shared password for all nodes in a cluster, whether they run on the same computer or on different machines across the network. You can set it either from the command line when starting the REPL, using the -setcookie parameter, or from the Erlang shell itself:
erlang:set_cookie(node(),mysecretcookie).
You can also edit the .erlang.cookie file in your home directory so you don't have to set it up every time you start the shell. However, this method has some nasty side effects, and that's why it is not recommended. The first one is that all Erlang applications you start from your user account, including all REPLs, will share the same cookie, which is not always the desired behaviour. Secondly, it is very likely that you will forget to edit this file on another machine you move your Erlang application to (or you will not have enough permissions to do it), and your application will not work in any environment other than yours.

So now, when you have started your shells and set up a shared cookie, you may want to check the connectivity between them. But first you need to know how to call them - and here's where the real fun begins. Erlang allows you to call a node (shell instance) with either a short or a long name (the -sname and -name command line parameters, respectively). A short name is a common name like "test", while a long name is a fully qualified domain name like "test@host.domain". Short names are shared across machines in the same domain, while FQDN names can (theoretically) be used across the whole Internet. So you start a short-named REPL with:
/usr/bin/erl -sname test1 -setcookie mysecretcookie
So far so good. Now try another one within the same domain:
/usr/bin/erl -sname test2 -setcookie mysecretcookie
And now you want to ping test2 from test1 to check if everything is OK. So you input the following command in the test1 REPL:
net_adm:ping(test2).
And you see "pang", which means that something went wrong. So you start to tear your hair out until you realize that test1 is not a node name in Erlang! Now go back to your REPL again and look carefully at the prompt. You will probably see something like:
(test1@localhost)1>
Now try to ping test2 again, but this time use the full name as displayed in the REPL prompt:
net_adm:ping(test2@localhost).
And what you should see now is "pong", which means that the nodes can see each other. Note that test2@localhost, although it doesn't seem so, is still a short name, not a fully qualified domain name (it lacks the domain part).

You can always see the list of all other hosts in the cluster by issuing the command:
nodes().
in the REPL. But remember one important thing: new hosts are not seen by Erlang automatically. If you start a new Erlang instance and want it to show up on the nodes() list, you have to ping one of the hosts already existing in the cluster. Information about the new node will be automatically propagated among all other nodes. To avoid this, you can use the .hosts.erlang file in your home directory, whose role is similar to that of .erlang.cookie (including the side effects) - it holds a list of all the nodes that will be automatically informed about every new Erlang instance started from your user account, for example:
'test1@localhost'.
'test2@localhost'.
You need to have one empty line at the end of the file (look here for more information about the syntax).

So here are the basic things you should remember about when building an Erlang cluster:
1) Choose carefully between short and fully qualified domain names, since the former cannot communicate with the latter (to make it simple: you cannot mix short and long named nodes in one Erlang cluster).
2) When using more than one machine in your cluster, make sure all DNS records on all machines are set up properly to avoid communication problems.
3) Use the same cookie for all Erlang instances you want to put in a cluster.
4) When you start a new node always inform the cluster about it by pinging one of the running nodes.
5) Open port 4369 TCP/UDP on your firewall for all clustered machines - it is used by the epmd daemon, which handles inter-node Erlang communication.

PHP Zend Framework on Yaws

Web developers looking for an efficient web server should definitely give a chance to Yaws, a lightweight and very fast open source web server developed in Erlang. For those who do some work in Erlang, this should be recommendation enough, since Erlang has been designed with high performance and scalability in mind. Those who haven't heard of Yaws and Erlang should have a look at this Yaws vs Apache comparison to get the idea.

Unfortunately, Yaws is not as mature as Apache in terms of available functionality. The one thing I particularly missed was easily definable rewrite rules, which would allow me to set up my favourite PHP framework (namely the Zend Framework) to work under the control of Yaws. Browsing through the Yaws Wiki I found a post explaining how to use the rewrite feature in Yaws. I used it as a starting point for my own rewriting module, which allowed me to run ZF on Yaws. I published it on Trapexit as free code under no particular licence, because - as I said - I didn't write it from scratch, but used a public domain work. Here's how it works (I assume you already have Erlang and Yaws installed on your system):

First you need to configure your MVC environment. I suggest following the manual at Akra's DevNotes to create the basic directory structure. Suppose your Yaws document directory is /opt/yaws - you'll have to create the folders /opt/yaws/application, /opt/yaws/library and /opt/yaws/public. Also create a /opt/yaws/log folder for the web server logs. Then create /opt/yaws/ebin and put rewriter.erl there. Compile the rewriter module from the command line with
erlc rewriter.erl
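To give you the idea, here is a minimal sketch of a Yaws arg_rewrite module - this is not the published rewriter.erl, just an illustration of the approach: requests that do not map to an existing file under the docroot get routed to the ZF bootstrap.
-module(rewriter).
-export([arg_rewrite/1]).
-include("yaws_api.hrl").  %% location depends on your Yaws installation

arg_rewrite(Arg) ->
    Req = Arg#arg.req,
    {abs_path, Path} = Req#http_request.path,
    case filelib:is_regular(Arg#arg.docroot ++ Path) of
        %% serve existing files (images, css, ...) directly
        true  -> Arg;
        %% everything else goes through the ZF front controller
        false -> Arg#arg{req = Req#http_request{path = {abs_path, "/index.php"}}}
    end.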
Create the Yaws configuration file yaws.conf and tell the web server to use PHP scripts and the rewrite module:
php_exe_path = /usr/bin/php-cgi
ebin_dir = /opt/yaws/ebin
logdir = /opt/yaws/log

<server localhost>
port = 8080
listen = 0.0.0.0
docroot = /opt/yaws/public
allowed_scripts = yaws php
arg_rewrite_mod = rewriter
</server>
Finally, run your web server:
yaws --conf yaws.conf
and go to http://localhost:8080 to see if it works.

From now on, all client requests to localhost will be redirected through the bootstrap file /opt/yaws/public/index.php. Make sure all paths in your PHP scripts lead to the correct locations.

Tested on Erlang R11B with Yaws 1.68 on Linux Mint 4.0 32-bit, and Yaws 1.73 on Ubuntu Server 8.04 64-bit, with Zend Framework 1.5.

Good luck!