In a previous post, we explored Vert.x clustering with a toy key/value registry: we set up a cluster piece by piece, configured it and experimented its behaviour upon node failures. Vert.x actually offers, ways more than just what we’ve discussed in that post. Vert.x proposes an actor-like development model which allows to get most out of servers resources while making the application distributed and resilient. In this post we’ll explore that aspects and as an example, we’ll implement an alarm application.

Introduction

Distributed systems

Distribution is often related to notions like concurrency, high availability or horizontal scalability. Basically, a system is distributed if its processes run concurrently, that is independently one of each other, with some rendez-vous to exchange data. Because they are independent, these processes can run sequentially, interleaved, or in parallel at exactly same time, and thus, they can run on the same core of one computer, on different cores of the same computer, or on different computers. This last use case is very interesting because if the system has the proper architecture, adding new computers into the system should, without too much pain, increase computing power and thus allowing horizontal scalability and protecting the system against failure of one (or some more) computer.

So how do we practically design such a system? There are several ways, but one answer is the actor model introduced by Carl Hewitt in early 70’s. We’ll not get in depth into the definition of actors here, but basically, an actor is an object (do not strictly think in terms of OOP here) able

  • to send messages to other actors (and this is its only way to communicate)
  • to execute some code independently of other actors, including creation of other actors, upon reception of certain type of message and
  • to keep its internal state during its lifetime.

So, to answer the question we actually don’t want to implement all this machinery. What we want to do is to use a dedicated language or a set of libraries that provide all this infrastructure and that lets us concentrate on implementing the business logic.

One of the features of Vert.x is that it includes in its core the ability to define actor-like objects, the so called verticles. Verticles are scheduled by an event loop managed directly by Vert.x. As creating verticles is a cheap operation (compared to OS processes or Java threads) and as Vert.x itself is mainly asynchronous, this software infrastructure optimizes usage of all of hardware resources of modern multi-core servers. In addition, as verticles communicate one with each other across a bus which can spread across a cluster by exchanging messages, an application can sit on top of several Vert.x nodes that themselves can execute on several servers. This theoretically should insure high availability and horizontal scaling.

You’ll never forget to pay taxes anymore

It’s always easier to introduce new concepts with a close-to-real-life application that we can experiment with. Let’s develop a system that would allow to define alarms. Those alarms will be triggered at a given date and time and will print a predefined message. To keep it simple, the alarm application will have a bare HTTP API without any kind of security nor user interface other than curl.

We will first use it under normal conditions with a couple of Vert.x nodes. Then we’ll see what happens if the nodes start to fail. Eventually, we’ll overload the system and, again, see what happens and how we can cope with such situations.

Tooling and code iterations

As in the previous post, we’ll just use Maven and a text editor. The skeleton of the code has been generated with vep. Each code iteration will be siting on its own git branch, the skeleton being on master-00. The next ones will be master-01, master-02 and so on until master-final.

So, let’s start with the empty project:

$ git clone https://gitlab.com/mszmurlo/vx-alarm.git
## or git clone git@gitlab.com:mszmurlo/vx-alarm.git
$ cd vx-alarm
$ git co master-00

To make sure we have everything ready before going any further, let’s compile the skeleton with mvn package. The executable is target/vx-alarm-0.1-fat.jar that we can execute with java -jar target/vx-alarm-0.1-fat.jar and that can be queried from the command line with curl http://localhost:8080/hello. If everything goes as expected, we should get the reply Sending back parameter 'hello'.

The basic application

To begin with, we will first build a basic version of our system without any clustering or any fancy thing alike. We’ll go pretty fast on most aspects in this part as it is mostly the same as the registry application from the previous installment. We’ll spend some more time on verticles and the event bus as they are at the heart of our discussion.

Functional requirements

We’ll build an app that lets users set up alarms defined by a date, a time and a message. When time comes, alarms will be triggered and the message will be displayed in the logs. Alarms can be canceled and thus they must have some unique identifier. So, basically, we’ll define the set HTTP API that will receive a year, a month, a day, an hour, a minute and the message; it will “program” the alarm and will return a unique identifier so that the alarm can be manipulated in the future. We’ll also define the del API that will take an identifier as its parameter and will cancel the alarm with that identifier.

Here is the formal list of the API that we will implement along with their parameters:

  • /set/<year>/<month>/<day>/<hour>/<minute>/<msg>: defines the alarm with the message <msg> that will be scheduled for display on <year>/<month>/<day>/<hour>/<minute>, unless that time is already in the past. It will return a unique identifier, id.
  • /del/<id>: cancels the alarm identified by the identifier <id>, if it exists.

A typical session would be as follows:

$ date
samedi 18 avril 2020, 12:29:01 (UTC+0300)
$ curl http://localhost:8081/set/2020/04/18/09/30/WakeupWakeupWakeup
Ok. id: 41bdf04a-a5ad-4f08-837a-3bc1737f758c
$ curl http://localhost:8081/del/41bdf04a-a5ad-4f08-837a-3bc1737f758c
Ok: Alarm 41bdf04a-a5ad-4f08-837a-3bc1737f758c deleted successfully

Here we create an alarm to be fired at 09:30:00-GMT, that is 12:30:00-GMT+3 my local time. The systems returns Ok to tell the alarm had been created and an identifier in the form of an UUID. Then, We delete that alarm by using that identifier.

Now, if we create an alarm with curl http://localhost:8081/set/2020/04/18/09/33/WakeupWakeupWakeup and wait until it is scheduled, well see the following logs in the server console:

12:32:26.936 INFO  ...AlarmVerticle - Alarm 'WakeupWakeupWakeup' to be triggered in 34s 
12:32:26.936 INFO  ...Main - Alarm created successfully
12:33:00.970 INFO  ...AlarmVerticle - ALAAARM: WakeupWakeupWakeup

The logs reported above do not correspond exactly to the logs you’ll see on the screen: along this post I will strip logs down to make them easier to read.

Dummy skeleton

Let’s first prepare everything. We need a HTTP server with routes corresponding to the APIs. For the moment we’ll just implement dummy request handlers so that our system will be able to respond. We will also need later to start several Vert.x nodes on the same machine during the tests so each node needs to listen on its own port which will be provided as the command line parameter.

Below is shown the startup(...) method from Main.java that gets called after command line parsing (please refer to the previous post for more details):

  private void startup(int port) {
    VertxOptions opts = new VertxOptions()
      // add some options with setXXX() methods
      ;
    vertx = Vertx.vertx(opts);

    log.info("Starting HTTP server on port {}.", port);

    HttpServer server = vertx.createHttpServer();  // Create the server
    Router router = Router.router(vertx);          // Define the router
    router.get("/set/:y/:mo/:d/:h/:mn/:m").handler(this::setHandler); 
    router.get("/del/:id").handler(this::delHandler);
    // Any other method or URL will be handeled by 'defHandler' as an error
    router.route().handler(this::defHandler);

    server
      .requestHandler(router::accept)
      .listen(port, ar -> {
          if(ar.succeeded()) {
            log.info("Server running on port {}", port);
          }
          else {
            log.error("Could not start server on port {}", port);
            System.exit(-1);
          }
        });
  }

First, we start Vert.x. For the moment it’s a standalone instance but we’ll transform it into a clustered node later. Then, we define the routes that correspond to our APIs. And, eventually, we start the HTTP server.

For the moment the request handlers are just dummy methods that print Ok on the client side or Error if the HTTP method is not a GET or if the URL doesn’t match what we have specified in the router. All will have the same basic structure. Below is the example of the setHandler that will handle all /set/... requests:

  private void setHandler(RoutingContext rc) {
    String year = rc.request().getParam("y");
    String month = rc.request().getParam("mo");
    String day = rc.request().getParam("d");
    String hour = rc.request().getParam("h");
    String minute = rc.request().getParam("mn");
    String message = rc.request().getParam("m");
    log.info("Request /set/{}/{}/{}/{}/{} -> {}", year, month, day, hour, minute, message);
    sendReply(rc, 200, "Ok");
  }

First, we extract the parameters from the URI and then we do something with those parameters, here a simple log action. Eventually, we send back a response to the client.

Start the server in one terminal with java -jar target/vx-alarm-0.1-fat.jar 8080 and send some requests from another one. The session should look like this:

Client terminal

$ curl http://localhost:8080/set/2020/05/1/16/00/hello
Ok
$ curl http://localhost:8080/set/2020/05/1/16/00
Err                                                ## The message is missing
$ curl http://localhost:8080/del/123
Ok                                                 ## del with a dummy id '123'
$ curl -X POST http://localhost:8080/del/123
Err                                                ## Only GET are accepted
$ curl -X GET http://localhost:8080/del/123
Ok

Server terminal

07:59:53.544 INFO  ...Main - Request /set/2020/05/1/16/00 -> hello
07:59:58.338 WARN  ...Main - Request not allowed: 'GET:http://localhost:8080/set/2020/05/1/16/00'
08:00:09.969 INFO  ...Main - Request /del/123
08:00:17.976 WARN  ...Main - Request not allowed: 'POST:http://localhost:8080/del/123'
08:00:31.593 INFO  ...Main - Request /del/123

The application doesn’t do anything interesting so far but we have a working skeleton. The source for this step is on branch master-01.

Verticles and the event bus

Now that we have our skeleton, let’s see how to handle our alarms. If you remember from the functional requirements, we need to be able to define an alarm, possibly also be able to disable it and, otherwise, let it be triggered at the right moment. We will implement alarms as verticles, one of the core components of Vert.x.

Verticles

Verticles can be seen as kinds of lightweight processes that are isolated one of another. In that aspect they are different from Java threads which share parts of the memory. They are lightweight in memory and very cheap to create because they are managed directly by Vert.x itself. Verticles are deployed in a Vert.x application in order to handle some workload, either long lived tasks, like a user session or short lived, for some temporary processing. The life cycle of a verticle is very simple:

public class MyVerticle extends AbstractVerticle {

  // called when the verticle is deployed
  public void start() {
    // Initialise members
    // Subscribe handlers to receive messages from the event bus (see below)
  }

  // called when the verticle is undeployed
  // only necessary if some ressources need to be freed
  public void stop() {
    // Cleanup resources 
  }

}

Being independent one of each other, the only way for verticles to communicate with the outside world, other verticles or other parts of the system is to exchange messages on the event bus, another core Vert.x component.

Messages and the event bus

The event bus is called the nervous system of Vert.x in the documentation and can be seen as a communication hub that can spread across many Vert.x nodes and sometimes not even only Vert.x nodes as for example web clients. Messages are sent on an address; addresses can be any string. Messages are received by asynchronous handlers that have been registered on that specific address. Many handlers can be registered on one single address and one handler can be registered on many addresses thus allowing one-to-many and one-to-one communications. Messages can be either sent on the event bus with the send() method or published with publish(). send() sends a message to just one of the listening handlers. The one that will receive the message is selected by Vert.x. It is possible for the recipient to reply by sending back another message with reply() or the send back an error with fail(). publish() on the other hand sends the message all handlers, as a broadcast.

Messages can be of any primitive type and they can also be (java) Strings and buffers, a Vert.x generic data holder. Vert.x documentation however recommends to format messages as JSON objects. Vert.x provides for this purpose a very simple and complete API, the JsonObject class.

Verticle code pattern

To sum up what was said previously with some code, below is a typical structure of a verticle that waits for some action to be done. Notice that this is my own way of structuring the code and it is not any kind of official recommendation.

public class MyVerticle extends AbstractVerticle {        // (1) see below

  private String state = null;
  
  @Override
  public void start(Future<Void> sf) {                    // (2)
     state = ...;                                         // (3)
     vertx
       .eventBus()
       .consumer("my-address", this::onMessage);          // (4)
     sf.complete();
  }
  
  @Override
  public void stop(Future<Void> sf) {                     // (5)
    sf.complete();
  }
  
  private void onMessage(Message<JsonObject> message) {   // (6)

    String cmd = message.body().getString("cmd");         // (6.1)
    if (cmd) == null) {                                   
      log.error("onMessage:error:No 'cmd' in message");
      message.fail(-1, "No cmd");
      return;
    }
    
    switch (cmd) {                                        // (6.2)
      
    case "cmd_1":
      if(process(cmd) == true) {
        message.reply(new JsonObject()                    // (6.3)
                     .put("reply", 
                     "processed(cmd_1) -> 42") 
                     );
      }
      else {
        message.fail(-1, "Error processing 'cmd_1'");
      }
      break;
	  
    // case "some other command": ...
      
    default:
      log.error("onMessage() : unknown command '{}'", cmd);
      message.fail(-1, "unknown command");
    } // switch(cmd)
  }
}
  1. Any verticle must extend the AbstractVerticle class.
  2. Then we need to define the behaviour of our verticle upon start and stop by overriding the life cycle methods start() and stop(). Both have synchronous and asynchronous variants but unless what is done is straightforward, that is you are sure it will not take time, the asynchronous variants are preferred to make sure event loop will not get blocked.
  3. Initialize a hypothetical state member declared above.
  4. Here we subscribe to the messages that will be send on the address my-address. Upon reception of the message, the handler onMessage() will be called with the message as parameter (see point 6.)
  5. stop() is the counter part of start(). It gets called when the verticle is undeployed. It can be omitted if there is nothing to clean.
  6. The onMessage() method handles the messages the handler receives. The naming is mine and its design is a matter of personal taste. I prefer my messages to all have the same global structure: they are JSON objects with at least one mandatory field, cmd which represent the command to execute. There might be other fields that are the parameters to the command.
    1. The first thing is to make sure the command is not null. If the command is null we invoke message.fail() which sends a reply to the sender that is an error so that the sender can handle it.
    2. The last part is to switch over the content of cmd to find what to do.
    3. If every thing went fine in the processing of the command, the message is replied and the reply is also a JSON object. Otherwise, the message is replied with an error.

Sum up

To summarize, a verticle is an execution unit that is managed by the core of Vert.x. It can communicate with other components (and of course other verticles) by exchanging messages on an event bus. Exchanging messages insures isolation so that the developer is guaranteed that there is no memory shared between execution threads. This drastically reduces concurrency related bug threat as no synchronization, locks, semaphores nor whatever else mechanism is needed. Being lightweight, verticles can be created on demand for short lived workloads or for long lived tasks. As we’ll see later, the event bus can spread across several Vert.x nodes and this in turn helps to distribute the load of the application across multiple Vert.x nodes but also this reduces the impact of failure of one (or more) node.

That looks great so let’s see how all this fits together for our alarm application.

Implementing the application

Description

We will implement alarms as long lived verticles, instances of the class AlarmVerticle. For every /set request, setHandler() will deploy an instance of an AlarmVerticle. The information about when to trigger the alarm and the message to write will be passed to the verticle as parameters in the DeploymentOptions object. The start() method will get that parameters and it will setup a timer (yet another Vert.x facility) which will wake up at the right moment to trigger the alarm. The verticle needs to listen to just one message, the one that will be sent for the /del API. It will be listening on a unique address which for simplicity here is a UUID. This address will be the identifier returned by the /set API to the client so that the client can use it later when calling the /del API to delete the alarm.

Upon invocation of the /del API, delHandler() will use the identifier provided on the URL as an address and send to that address a harakiri message. Either the verticle exists and it will suicide, or nobody is listening on that address (because the alarm had already been triggered, for example) and we’ll get an error.

Notice that as UUIDs are easily predictable, in a real world application one might want to cryptographically safe random strings.

Simple, right? So let’s write some code.

Deploying a verticle

First we need to refactor the Main class so that we’ll be able to create instances of AlarmVerticle. There will be three steps for setHandler():

  1. Analyse the parameters and make sure they are of proper type, that is integers.
  2. Prepare the creation of the verticle.
  3. Create the verticle.

The first step is pretty straight forward and we change setHandler() as follows:

  private void setHandler(RoutingContext rc) {
    // extract the parameters from the URI and check for type
    int year, month, day, hour, minute = 0;
    try {
      year = Integer.parseInt(rc.request().getParam("y"));
      month = Integer.parseInt(rc.request().getParam("mo"));
      day = Integer.parseInt(rc.request().getParam("d"));
      hour = Integer.parseInt(rc.request().getParam("h"));
      minute = Integer.parseInt(rc.request().getParam("mn"));
    }
    catch(NumberFormatException e) {
      log.warn("One of the parameter to /set API is not a number: {}",
               e.getMessage());
      sendReply(rc, 200, "Err: " + e.getMessage());
      return;
    }
    String message = rc.request().getParam("m");
    String myaddr = UUID.randomUUID().toString();

    // step 2 ...
    // step 3 ...
  }

The second step is to prepare the deployment. Verticles are deployed with the method vertx.deployVerticle(<verticle's class>, depOpt, ar -> {<handler>}). The class will be "org.buguigny.vertx_alarm.AlarmVerticle". depOpt is an instance of DeploymentOptions which defines how the verticle should be deployed with what parameters. We’ll just tell Vert.x to deploy only one instance of this verticle with the parameters being the wake up date and time, the message and the verticle’s own address. The code is as follows:

  private void setHandler(RoutingContext rc) {
    // step 1 ...

    // Parameters to be provided to the newly deployed verticle
    JsonObject prm = new JsonObject()
      .put("year", year).put("month", month).put("day", day)
      .put("hour", hour).put("minute", minute)
      .put("message", message)
      .put("myaddr", myaddr)
      ;
    // log.info("Request /set with parameters {}", prm);
    
    DeploymentOptions depOpt = new DeploymentOptions()
      .setInstances(1)
      .setConfig(prm);

    // step 3 ...
  }

Now, everything is ready, we can deploy the verticle:

  private void setHandler(RoutingContext rc) {
    // step 1 ...
    // step 2 ...
	
    // deploy the verticle
    vertx
      .deployVerticle("org.buguigny.vertx_alarm.AlarmVerticle", depOpt, ar -> {
          if(ar.succeeded()) {
            log.info("Alarm created successfully");
            sendReply(rc, 200, "Ok. id: "+myaddr);
          }
          else {
            log.error("Failed to create the alarm {}", prm);
            sendReply(rc, 200, "Err");
          }
        });
  }

Deleting an alarm

Deleting an alarm simply means undeploying the corresponding verticle. This is simpler to do from within the verticle itself. So we will simply send a harakiri message to the verticle’s address and the verticle will do the rest. The code for the delHandler() is as follows:

  private void delHandler(RoutingContext rc) {
    String id = rc.request().getParam("id");
    vertx
      .eventBus()
      .send(id, new JsonObject().put("cmd", "harakiri"), ar -> {
          if(ar.succeeded()) {
            sendReply(rc, 200, "Ok: Alarm "+id+" deleted successfully");
          }
          else {
            log.warn("Could not delete alarm '{}'. Reason: {}", id, ar.cause().getMessage());
            sendReply(rc, 200, "Error: Alarm "+id+" could not be deleted");
          }
      });  // send(...)
  }

At that point, every thing is ready to deploy or undeploy verticles except that we don’t have the AlarmVerticle class yet. Let’s correct that.

Implementing the AlarmVerticle

As previously, we’ll proceed in several steps.

First, we need to handle time, which is always tricky. We’ll work with GMT time and we’ll suppose the client is in the same time zone as the server. We’ll use the Calendar class and calculate the number of milliseconds from now until when the alarm is supposed to be triggered.

Vert.x proposes a very handy way to execute some code at a given moment in the form of timers. Basically, a timer is defined with vertx.setTimer(<duration>, <handler>). The duration is equal to when-now and the handler is a method that will get called when the timer will expire; it has the following signature handle(long).

The last thing we need to do in start() is to tell Vert.x how to handle messages (address and handler).

The start() method looks like this:

  @Override
  public void start(Future<Void> sf) {

    // Step 1 ...
    // myaddr, now, when and message are instance members
    myaddr = config().getString("myaddr");
    message = config().getString("message");

    Calendar utc_cal = Calendar.getInstance(
      new SimpleTimeZone(0, "GMT"),
      new Locale("US")
    );
    now = utc_cal.getTimeInMillis();
    utc_cal.set(config().getInteger("year"),
      config().getInteger("month") - 1,  // Months are 0-indexed
      config().getInteger("day"),
      config().getInteger("hour"),
      config().getInteger("minute"),
      0); // don't care about seconds: one minute resolution
    when = utc_cal.getTimeInMillis();

    // Step 2 ...
    timerID = vertx.setTimer(when-now, new Handler<Long>() {
      @Override
      public void handle(Long lg) {
        log.info("ALAAARM: {}", message);
        vertx.undeploy(deploymentID());
      }
    });

    // Step 3 ...
    vertx.eventBus().consumer(myaddr, this::onMessage);

    log.info("Alarm '{}' to be triggered in {}s ", message, (when-now)/1000);
    sf.complete();
  }

We need to keep the identifier of the timer, timerID, in verticle’s state so that we will be able to cancel it when processing /del API.

The last thing to implement is the onMessage() handler to handle harakiri messages. Based on what we have discussed before, it’s quite straightforward. First we check if the command in the message is not null and if is the case and if it equals to harakiri, we cancel the timer and we undeploy the verticle.

  private void onMessage(Message<JsonObject> message) {
    
    String cmd = message.body().getString("cmd");
    // Check if non null. Omitted

    switch (cmd) {
    case "harakiri":
      log.info("Canceling alarm '{}' id='{}'", message, cmd);
      vertx.cancelTimer(timerID);
      vertx.undeploy(deploymentID());
      message.reply("Ok");
      break;

    default:
      log.error("Error: unknown command '{}'", cmd);
      message.fail(-1, "Unknown command'"+cmd+"'");
    }
  }

The code for this part sits on the branch master-02

Testing

Testing the application with a normal session

We start a node in a terminal and follow up the logs while we will issue curl commands in the client terminal:

Server terminal - The startup of the server

$ java -jar target/vx-alarm-0.1-fat.jar 8080
18:17:37.974 INFO  org.buguigny.vertx_alarm.Main - Starting HTTP server on port 8080.
18:17:38.071 INFO  org.buguigny.vertx_alarm.Main - Server running on port 8080

Client terminal - Creating a new alarm named WakeupWakeupWakeup

$ curl http://localhost:8080/set/2020/04/17/15/19/WakeupWakeupWakeup
Ok. id: 1489c572-6c4b-4841-a7ae-380e47ba1add

Server terminal - Processing of the /set API

18:18:49.987 INFO  ...AlarmVerticle - Alarm 'WakeupWakeupWakeup' to be triggered in 11s 
18:18:49.988 INFO  ...Main - Alarm created successfully
18:19:00.994 INFO  ...AlarmVerticle - ALAAARM: WakeupWakeupWakeup

The two first lines illustrate the creation of the alarm. The last line appears a bit later when the alarm is triggered.

Now, after that the alarm had been triggered, let’s try to delete it:

Client terminal - deleting the alarm

$  curl http://localhost:8080/del/1489c572-6c4b-4841-a7ae-380e47ba1add
Error: Alarm 1489c572-6c4b-4841-a7ae-380e47ba1add could not be deleted

Server terminal - Processing of the /del API

18:23:50.742 WARN  ...Main - Could not delete alarm '1489c572-6c4b-4841-a7ae-380e47ba1add'.
  Reason: No handlers for address 1489c572-6c4b-4841-a7ae-380e47ba1add

The reason of the error is that the identifier of the alarm we have provided doesn’t have any associated handler which is normal because the verticle had been undeployed after the alarm had been triggered. In other words, everything is normal.

Testing the limits

If we were to build a commercial application, how many alarms would we be able to handle on a single node? 10 thousands? 100 thousands? One million? To send that many HTTP requests we won’t use curl because the process might be slow; we will rather put in place a short Groovy script. Check out the branch master-03 and the script is in tests/test.groovy. Its parameters are the year, the month, the day, the hour, the minute when the alarm will be triggered and the number of alarms to be created. Thus the command

groovy tests/test.groovy 2020 4 17 18 36 10000

defines ten thousands alarms to be triggered on the 17th of April 2020 at 18 hours and 36 minutes.

Creation 10 thousands alarms takes about 4 seconds on my laptop (12 threads i7 CPU with 32GB RAM) and the load is barely noticeable in terms of CPU or RAM usage. Pushing to 1M alarms was done in a bit less than 60 seconds. While the alarms were created, CPU usage remained below 30% on average with some peaks at 60% on some cores and the RAM increased linearly, yet very reasonably. Creating 2M alarms took about 3 minutes; everything seemed to be OK but the CPU began to reach 90% on most cores. I pushed beyond but I stopped the process shortly after 3M verticles had been created, after about 5:30mn. Memory was not an issue, but the CPUs, one after another saturated at 100% and I got following warnings: Thread Thread[vert.x-eventloop-thread-8,5,main] has been blocked for 3645 ms, time limit is 2000 ms. This error means that something blocks the event (and work load distribution) loop. As our verticles are almost doing nothing, this means that the system itself blocks and, thus, becomes non-reactive.

Yet, about 2M naked verticles is not bad. We can imagine to run between 1M and 2M if they were bigger in terms of RAM and were doing some processing. And all this on a laptop… Think about a real server!

This test is of course not a benchmark but it provides some insights about the limits.

An interesting question is Would Java threads do better? You’ll find in the test directory the file tests/TestThread.java. This program simply takes a number from its command line and creates that many threads. Each thread goes for a nap during five seconds and exits. To compile the program type javac TestThread.java and to execute it with 10 threads type java -cp . TestThread 10. Now let’s try with ten thousands threads, just as we did for the verticles:

$ java -cp . TestThread 10000
thread no 2 is running...
thread no 0 is running...
...
thread no 9961 is running...
thread no 9962 is running...
[0,911s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) \
    for attributes: stacksize: 1024k, guardsize: 0k, detached.
Exception in thread "main" java.lang.OutOfMemoryError: unable to create native thread: \
    possibly out of memory or process/resource limits reached
	at java.base/java.lang.Thread.start0(Native Method)
	at java.base/java.lang.Thread.start(Thread.java:803)
	at TestThread.main(TestThread.java:18)

This test wasn’t really fair because we know that there is a limit put by the OS and not only by the JVM on number of native threads. Yet, if we were to implement something with a similar design, we wouldn’t be able to it directly with threads.

Conclusion

So far we have implemented a fully working alarm application. We are able to create an alarm and to delete previously created alarms. When time comes, alarms are triggered and write a log message. Alarms are implemented as long lived verticles which basically wait for two possible events: the time has come to trigger an alarm or reception of a self destruction message. We have also quickly shown that the system is able to manage hundreds of thousands of alarms on a single laptop which tends to demonstrate that Vert.x uses system resources with great economy, thanks to its asynchronous nature.

So far, however, our application only runs on one node. There are two limitation to this:

  1. even if the number of verticles we had been able to run on one node was huge, it is still a limit.
  2. if that very node crashes, the whole system has a down time.

In the next part of this post, we will extend our application to be able to run it on several nodes which should solve those two issues.

The quest for immortality

There is obviously no reason why alarms waiting to be triggered should survive the crash of the Vert.x node they run on, right? We have simply not implemented any kind of mechanism for that. The solution is to start at least two nodes that will share the information on running verticles, but without any plumbing, there is no reason why either node would know about the existence of the other one. If you remember the previous post, we have added cluster capabilities to the registry application to make the information survive the death of one node. We’ll do the same here.

Adding a cluster manager

In Vert.x cluster documentation there are four cluster managers that Vert.x integrates with. We’ll use the default one, Hazelcast. All the code changes for this section are on branch master-04.

First, let’s add the Hazelcast dependency to the pom.xml file:

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-hazelcast</artifactId>
</dependency>

Then we rebuild the project so that this dependency is taken into account: mvn clean package. In the previous post we also have had a discussion about cluster configuration; please refer to it. In this post we’ll keep the default one. Finally, we need to change the way the instance of Vert.x, vertx, is created in our code. Changes are to be done in Main.java:

  private void startup(int port) {
    VertxOptions opts = new VertxOptions()
      .setHAEnabled(true)
      ;

    log.info("Starting a clustered Vert.x node");
    Vertx.clusteredVertx(opts, vx_ar -> {
        if (vx_ar.failed()) {
          log.error("Failed to start clustered node. Cause: {}",
		    vx_ar.cause().getMessage());
          System.exit(-1);
        }
        else {
	  vertx = vertx = vx_ar.result();
	  log.info("Starting HTTP server on port {}.", port);
	  HttpServer server = vertx.createHttpServer();	 
	  Router router = Router.router(vertx);          
	  router.get("/set/:y/:mo/:d/:h/:mn/:m").handler(this::setHandler); 
	  router.get("/del/:id").handler(this::delHandler);
	  router.route().handler(this::defHandler); // catch all route
	  
	  server
	    .requestHandler(router::accept)
	    .listen(port, ar -> {
		if(ar.succeeded()) {
		  log.info("Server running on port {}", port);
		}
		else {
		  log.error("Could not start server on port {}", port);
		  System.exit(-1);
		}
	      });
	}
      });
  }

There are two main changes. First, we specify that this Vert.x instance has to run in clustered mode with setHAEnabled(true). Second, the creation of the Vert.x instance becomes an asynchronous task with Vertx.clusteredVertx(opts, vx_ar -> {.. }. If the creation succeeds we include the previous code that defines the routes and creates the HTTP server, otherwise, we exit.

Testing the cluster

Now that we have a cluster, let’s make some tests. We’ll start two nodes listening on ports 8081 and 8082 in two terminals. We’ll not get into details here as it had been the topic of the previous post, but in each terminal you should see lots of Hazelcast logs and eventually the formation of the cluster:

Node 8081 :

Members {size:2, ver:2} [
	Member [10.147.18.222]:5701 - ff184488-1386-4da9-b202-2167c732d72c this
	Member [10.147.18.222]:5702 - 8f728f0c-2b28-4539-9f09-7b439be5dba3
]

and

Node 8082 :

Members {size:2, ver:2} [
	Member [10.147.18.222]:5701 - ff184488-1386-4da9-b202-2167c732d72c
	Member [10.147.18.222]:5702 - 8f728f0c-2b28-4539-9f09-7b439be5dba3 this
]

OK, let’s now test how our alarms are managed in the cluster.

Test 1 : inter-node communication

In this first test, we will create an alarm on node 8081 and send a deletion request to node 8082. We of course expect that the alarm will get cleared.

Client terminal

$ curl http://localhost:8081/set/2020/05/1/8/36/Wakeup_1
Ok. id: 985d56a7-da5f-4cc7-bd7d-e424ef80a295
$ curl http://localhost:8082/del/985d56a7-da5f-4cc7-bd7d-e424ef80a295
Ok: Alarm 985d56a7-da5f-4cc7-bd7d-e424ef80a295 deleted successfully

Bingo, this actually works! You can check logs on terminal for node 8081. As said at the begining, the event bus is a structure that spreads across all Vert.x nodes in the cluster. When node 8082 receives the /del request along with the alarm ID, it just sends it over on the event bus and it is the duty of the event bus to route this event to the right destination which happens to be a handler on node 8081, so Vert.x can execute that handler which cancels the alarm.

Test 2 : sudden node termination

Let us try now another scenario where we will start both nodes and setup an alarm on node 8081. Then we will simulate the crash of node 8081 by killing it with Ctrl-C and, as previously, we’ll try to delete the alarm by sending a /del request to node 8082. In the client terminal, the session is as follows:

Client terminal

$ curl http://localhost:8081/set/2020/05/1/8/47/Wakeup_1
Ok. id: 01561e5c-1444-4f99-98c4-fab8b1662f0a

# 2 : At that moment node 8081 is killed with Crtl-C

$ curl http://localhost:8082/del/01561e5c-1444-4f99-98c4-fab8b1662f0a
Error: Alarm 01561e5c-1444-4f99-98c4-fab8b1662f0a could not be deleted

This time it fails… Why? Simply because the verticle that represents our alarm did not survive the crash of the node it was living on. When node 8081 crashed, node 8082 sends as previously a harakiri message on the event bus. But as the handler that had subscribed to that address is dead, the delivery fails. This is what the log on node 8082 says:

14:46:24.472 WARN  ...Main - Could not delete alarm '01561e5c-1444-4f99-98c4-fab8b1662f0a'.
  Reason: No handlers for address 01561e5c-1444-4f99-98c4-fab8b1662f0a

We’ll correct that in the next section.

Making the verticles highly available

What exactly does it actually mean to have a verticle to be highly available? Basically, this means that it will continue its execution as if nothing had happen. Technically speaking this means that we are able to restart a new verticle and make it take exactly same state as the verticle that had crashed. By state we mean the set of all internal data the verticle needs to function properly. One can categorize three types of such data:

  • The initial state is the set of parameters that are provided for startup of the verticle, that is what is set with the call DeploymentOptions.setConfig(). In our case the date and time, etc.:

    JsonObject prm = new JsonObject()
      .put("year", year).put("month", month).put("day", day)
      .put("hour", hour).put("minute", minute)
      .put("message", message)
      .put("myaddr", myaddr)
      ;
    
  • The dynamic state are values that had been computed while the verticle is running and that need to survive the crash because they represent some kind of progress in the execution. In our code so far, there is no such data, but we will introduce one later for the sake of the example.

  • The volatile state are values that have been computed while the verticle was running but that need to be re-computed after restart as their old value has no meaning anymore. In our example, timerID fits this category: if the node on which the verticle is executed crashes, the timer that is supposed to fire the alarm will also disappear and there is no sense to keep its ID as a new one will be generated upon verticle startup.

Let’s first see how to get the verticles being restarted and we’ll come back to state considerations later.

Restarting the verticle automatically

As high availability is a general requirement when working with distributed applications, and Vert.x being our friend, it already implements all of the necessary mechanics to have our verticles become highly available. We have already started Vertx.x in clustered mode with setHAEnabled(true) which allows to share messages between nodes. The second thing to do is to flag the verticles we create to be managed in highly available manner. That way Vert.x will restart those verticles on another node of the cluster with the initial state.

To do so, we just need to add one parameter to the DeploymentOptions instance:

DeploymentOptions depOpt = new DeploymentOptions()
  .setInstances(1)
  .setConfig(prm)
  .setHa(true)     // Set that verticle to be HA
  ;

So, basically, we added one line and gained high availability on the verticles. Is it that simple ? Let’s try.

Testing high availability of verticles

Let’s redo the same test as before: start two nodes, N1 and N2, initiate some alarms on N1, then kill N1 and observe what is happening on N2. If both nodes had already been stated:

Client terminal - Creation of two alarms

$ curl http://localhost:8081/set/2020/05/4/12/30/TA
  Ok. id: f8383dda-2912-4d38-9eac-f2daee7a472e
$ curl http://localhost:8081/set/2020/05/4/12/30/TB
  Ok. id: ddab2a1a-a2ea-4264-beee-7f3543b252c3

Then we kill N1 and below is the log we observe in terminal for N2

Node 2 terminal - After N1 had been killed

...
Members {size:1, ver:3} [
	Member [192.168.8.161]:5702 - 2ee1e93f-c101-4563-9a40-f386d0b60c25 this <1>
]
...
HAManager INFOS: node2ee1e93f-c101-4563-9a40-f386d0b60c25 says:
    Node 39dd1308-c608-4c67-af77-69d6f8a8fbab has failed.
    This node will deploy 2 deploymentIDs from that node.
...
alarm.AlarmVerticle - Alarm 'TA' to be triggered in 62s
HAManager : Successfully redeployed verticle
  org.buguigny.alarm.AlarmVerticle after failover
alarm.AlarmVerticle - Alarm 'TB' to be triggered in 62s
HAManager : Successfully redeployed verticle
  org.buguigny.alarm.AlarmVerticle after failover

We first see that N2 has detected that N1 has failed (Node 39dd1308... has failed) and that it will have to handle two verticles from the dead node (This node will deploy 2 deploymentIDs from that node.). Then the two next massages show that our two verticles are actually redeployed as expected (HAManager : Successfully redeployed verticle org.buguigny.alarm.AlarmVerticle after failover). One can easily verify that if we let the migrated alarms on node 2 long enough they will be triggered on time.

To reply the question raised at the end of the previous section, Yes, it looks like enabling high availability for verticles it’s that simple as adding one line of code to the existing code base!. Is it really so? Our verticle has what we called an initial state and an volatile state but actually no dynamic state. We’ll add one and see what is happening.

Adding a dynamic state

Let’s imagine we want to change or update the message that is displayed when the alarm is triggered. We define a new API, say /upd/<alarm id>/<new message>. Upon startup, the member message in the AlarmVerticle class will be initialized by the value from the initial state. Upon call to the upd API, it will be updated with the new value. We’ll code that behavior in a new handler called updHandler() which will send an upd message to the right verticle. Based on the previous code, the implementation is straightforward. The source code is on the branch master-05.

Testing the new API

Just to be sure everything is all right, let’s try the API:

Client terminal

$ curl http://localhost:8081/set/2020/05/4/13/27/TA
Ok. id: abf01deb-316e-44e3-93ac-b613879b371b
$ curl http://localhost:8081/upd/abf01deb-316e-44e3-93ac-b613879b371b/new_message
Ok: massage updated: new_message

Server terminal

alarm.AlarmVerticle - Alarm 'TA' to be triggered in 29s 
alarm.Main - Alarm created successfully
alarm.AlarmVerticle - Updating message for alarm \
           abf01deb-316e-44e3-93ac-b613879b371b: TA->new_message
alarm.AlarmVerticle - ALAAARM: new_message

The basic test is OK: we are able to change the message of an alarm. Let’s see what happens when the node on which the verticle is running crashes.

Testing verticles restart

First, we create an alarm and update the message.

Client terminal

$ curl http://localhost:8081/set/2020/05/4/14/00/TA
Ok. id: 6a6791f0-e91a-436d-b121-322ff1102d19
$ curl http://localhost:8081/upd/6a6791f0-e91a-436d-b121-322ff1102d19/new_message
Ok: massage updated: new_message

Once the alarm is created on node 1 we kill that node.

Node 1 terminal

alarm.AlarmVerticle - Alarm 'TA' to be triggered in 110s 
alarm.Main - Alarm created successfully
alarm.AlarmVerticle - Updating message for alarm \
    6a6791f0-e91a-436d-b121-322ff1102d19: TA->new_message
^C

On node 2, we see the verticle being redeployed, but, unfortunately it is the old message, the one from the initial state that is migrated and printed when the alarm is triggered.

Node 2 terminal

INFOS: node174b0c66-9aec-41c5-8c1a-dbab49dff51d says:   \
  Node 703766da-fc4a-475a-a4ee-7aec25ea92ae has failed. \
  This node will deploy 1 deploymentIDs from that node.
INFO  alarm.AlarmVerticle - Alarm 'TA' to be triggered in 86s
...
INFO  alarm.AlarmVerticle - ALAAARM: TA

So, things are finally not that simple. The initial state is migrated because the data is stored in the instance of DeploymentOptions but the dynamic state is lost. Let’s how to solve that problem.

How to preserve the dynamic state ?

There are several ways to preserve the dynamic state.

The traditional one used since ever in web development, is to save the state in a database every time it changes and reload it during restart. This requires additional server resources for the database access and may possibly become a bottleneck that would slow down the whole system: in a real application with, say, one million verticles per node, this solution would require one million database accesses for each verticle to be restarted. To mitigate the disk access impact, one can imagine to replace the database by an in-memory process, for example with a dedicated in-memory cache, like Redis. This would however complexify the overall architecture with additional servers.

Again, Vert.x is our friend and provides all necessary tools to cope with this situation as we already saw in the last post: a distributed map. Keep in mind that this solution is not universal and based on your specific use case you may chose the database or in-memory external cache. The choice depends on the existing infrastructure, recovery speed requirement, experience of the operation team, economics, …

Preserving state in a distributed map

To keep the state of any verticle in memory across the Vert.x cluster we will use the AsyncMap class. To avoid hard coding the name of the map across all nodes, we will use the name of the verticles class. The state of a verticle will be stored as a JSON Object and the key used to retrieve its state will be its address. The state will be stored whenever it changes; it will be retrieved only upon restart of the verticle. We’ll translate that in code by refactoring the file AlarmVerticle.java. The implementation will not be as straightforward as adding one line of code. Here are the general steps. You’ll find the source code for this part on the branch master-06 which also is the last one, master-final.

  1. in class member section, we define the member stateMap that will reference the map
  2. in startup(): handle the case when the verticle is started and the case when the verticle is re-started:
    1. get a handle on the map
    2. get the state for this verticle from the map. The key is the address.
    3. if state is null, we are starting the verticle for the first time so we populate the key entry with the dynamic state, i.e. the message to be displayed
    4. if state is not null it’s a re-start so we update the message member with the content of the state from the map.
    5. now that the map and the state are in synch, continue as previously with the creation of the timer and onMessage handler registration on the address.
  3. when the verticle is undeployed, we need to delete the corresponding entry from the map otherwise it will get saturated after a while with deleted or expired alarms. For that purpose, we overload the stop() method and remove()the key from stateMap.
  4. finally, we need to modify how the upd message is handled in onMessage(). Not only we need to set the member message to the new value, but also, we need to update the state in the map.

Now that all the machinery is in place, let’s give it a try

Testing the state preservation

We’ll redo the same test we did before: create an alarm on node 1, update its message, kill that node and wait until the alarm gets triggered on node 2:

Client terminal

$ curl http://localhost:8081/set/2020/05/4/15/00/TA
Ok. id: 063f8e17-c267-4b76-802d-809885958477
$ curl http://localhost:8081/upd/063f8e17-c267-4b76-802d-809885958477/new_message
Ok: massage updated: new_message

The alarm is created, updated then the node is killed:

Node 1 terminal

alarm.AlarmVerticle - Alarm 'TA' to be triggered in 95s 
alarm.Main - Alarm created successfully
alarm.AlarmVerticle - \
    Updating message for alarm 063f8e17-c267-4b76-802d-809885958477: TA->new_message
^C

Node 2 terminal

INFOS: nodeeb3a3e2e-7c53-4482-aa72-cfad5d251a2f says: \
   Node 517b71cc-7aef-4d47-b2c0-61810ce64d5d has failed. \
   This node will deploy 1 deploymentIDs from that node.
...
INFO  org.buguigny.vertx_alarm.AlarmVerticle - Alarm 'new_message' to be triggered in 70s 
INFOS: Successfully redeployed verticle org.buguigny.vertx_alarm.AlarmVerticle after failover
...
INFO  org.buguigny.vertx_alarm.AlarmVerticle - ALAAARM: new_message
INFO  org.buguigny.vertx_alarm.AlarmVerticle - \
    Undeploying verticle '063f8e17-c267-4b76-802d-809885958477', and removing its state

Bingo! It looks like we have eventually reached our goal of verticle immortality and, thus, application resilience: we can deploy verticles that are restarted on another node in case of crash with their state being preserved.

Let’s finish this post with some considerations on scalability related topics and how Vert.x can manage them.

Testing Vert.x scalability capabilities

Load, scalability and resilience

Load is a measure on how much resources are used on a system. This definition is quite vague because it partly depends on the system’s architecture. Load basic measures are CPU, RAM, disk IO, network, … For APIs, the number of requests per second is very pertinent. For our alarm application, where verticles remain in memory and are just waiting for some event to happen, their number may also be a good measure.

Scalability is about handling variations of load on a system by adding (or removing) resources. Scalability can be vertical or horizontal.

  • Vertical scalability is achieved by adding more resources such as RAM, CPU or disks to physical servers. Vertical scalability however has physical limits which are the limits of the servers them selves: a server with, say two physical sockets can only host two physical CPUs, right ?
  • Horizontal scalability on the other hand is achieved by directly adding new servers to the platform and the application is supposed to spread across all servers. In this case the architecture of the application must be aware that such evolution may happen. A good example are farms of HTTP servers located behind an HTTP load-balancer: adding new servers will make the load-balancer (after some configuration that can be done without downtime) send requests also to these new servers and thus providing more capacity to the whole farm.

Basically, for an application running on several nodes, we would like it to have the following behavior:

  • horizontal scalability: if each node begins to reach physical limits (~ 90% of RAM or CPU), adding a new node should provide some more capacity and decrease the load on existing ones. In other words, some processing load should be migrated on the new node so that after a while the load on each node should decrease and be balanced.
  • resilience: if one node crashes, the load from that node should be equally distributed on all remaining ones so that no node is overloaded and the application still works.

If those requirements are not met, it’s clear that we would end up in a situation where some of the nodes would be idle while others would be still overloaded.

With the advent of virtualization, cloudification, containerization, etc., it is of course the horizontal scalability that is the target to reach for operations teams as it’s very easy to add new nodes to a platform. In this sense, Vert.x seems to be a good candidate to build such aware applications as it can spread across many servers. So let’s see how it behaves.

Testing horizontal scalability

For these tests, we’ll start with two Vert.x nodes, N1 and N2, each loaded with 10.000 alarms that will fire sometime in the future with the following command:

for i in `seq 1 10000`; do 
  echo $i # Just to know how far we have gone
  curl http://localhost:8082/set/2020/08/12/09/41/TA; 
done

Now, if we start a third node N3 and let it join the cluster, what happens? Too bad : nothing happens! The verticles defined on nodes N1 and N2 remain there and N3 remains “empty” and the load is not distributed on the new node. Of course N3 will be able to handle requests it would receive but if the first nodes are getting saturated and if new requests are still sent to them, the application may crash while N3 wouldn’t be loaded.

Testing resilience

With the previous procedure let’s load three nodes N1, N2 and N3 with 10.000 alarms. Now, let’s kill N1 and let’s see what happens. As we saw previously, verticles from N1 are migrated, but are migrated on only one node! This also is a situation which puts the application at risk.

Let’s redo a similar experiment with four nodes. First let’s load N1, N2 and N3 with 10.000 alarms as previously and lets start a fourth node, N4, that will remain empty. Now kill N1. Again, alarms are migrated on only one node but in addition there is no warranty that the target would N4, or in other words the less loaded node… Bad.

Conclusion

We quickly saw that while we were able quite easily to implement verticle restart after node crash, Vert.x doesn’t fairly distribute those verticles among remaining nodes. Moreover, horizontal scalability seem to be only partly managed: while we can easily add new nodes to a cluster, the load is not re-balanced on the new added (and thus not-loaded) nodes.

General conclusion

In this long post we have gone quite far with Vert.x. We have transformed a single node application into a fully distributed one that can spread across a whole cluster. We have also seen that in case of failure of one node in the cluster the application (actually Vert.x) is able to restart its components on other nodes. Yet, the restarted vertices are not equally balanced among remaining nodes that may lead to situations where some nodes are overloaded putting at risk of failure the whole application. Just as the dynamic state was not migrated automagically, we may need to implement some machinery to balance the restarted verticle among all nodes. This may be the topic of a further post dedicated to scalability. This point should however not put any shadow on the fantastic potentiality of Vert.x to help in creating fully distributed applications on the JVM.

 


Any Comment?

Please, feel free to leave comments, thoughts or corrections if you’ve found errors.