In a previous post we explored Vert.x clustering with a toy key/value registry: we set up a cluster piece by piece, configured it, and experimented with its behaviour upon node failures. Vert.x actually offers way more than what we discussed in that post: it proposes an actor-like development model which allows you to get the most out of a server’s resources while making the application distributed and resilient. In this post we’ll explore those aspects and, as an example, we’ll implement an alarm application.
Introduction
Distributed systems
Distribution is often related to notions like concurrency, high availability or horizontal scalability. Basically, a system is distributed if its processes run concurrently, that is, independently of each other, with some rendez-vous points to exchange data. Because they are independent, these processes can run sequentially, interleaved, or in parallel at exactly the same time, and thus they can run on the same core of one computer, on different cores of the same computer, or on different computers. This last case is very interesting because, if the system has the proper architecture, adding new computers to the system should, without too much pain, increase computing power, thus allowing horizontal scalability and protecting the system against the failure of one (or more) computers.
So how do we practically design such a system? There are several ways, but one answer is the actor model, introduced by Carl Hewitt in the early ’70s. We won’t go in depth into the definition of actors here, but basically, an actor is an object (do not strictly think in terms of OOP here) able
- to send messages to other actors (and this is its only way to communicate)
- to execute some code independently of other actors, including the creation of other actors, upon reception of certain types of messages and
- to keep its internal state during its lifetime.
So, to answer the question: we actually don’t want to implement all this machinery ourselves. What we want is to use a dedicated language or a set of libraries that provides all this infrastructure and lets us concentrate on implementing the business logic.
One of the features of Vert.x is that it includes in its core the ability to define actor-like objects, the so-called verticles. Verticles are scheduled by an event loop managed directly by Vert.x. As creating verticles is a cheap operation (compared to OS processes or Java threads) and as Vert.x itself is mainly asynchronous, this software infrastructure optimizes the usage of all the hardware resources of modern multi-core servers. In addition, as verticles communicate with each other by exchanging messages over a bus which can spread across a cluster, an application can sit on top of several Vert.x nodes that themselves can execute on several servers. This should, in theory, ensure high availability and horizontal scaling.
You’ll never forget to pay taxes anymore
It’s always easier to introduce new concepts with a close-to-real-life application that we can experiment with. Let’s develop a system that allows users to define alarms. Those alarms will be triggered at a given date and time and will print a predefined message. To keep it simple, the alarm application will have a bare HTTP API without any kind of security nor any user interface other than curl.
We will first use it under normal conditions with a couple of Vert.x nodes. Then we’ll see what happens if the nodes start to fail. Finally, we’ll overload the system and, again, see what happens and how we can cope with such situations.
Tooling and code iterations
As in the previous post, we’ll just use Maven and a text editor. The skeleton of the code has been generated with vep. Each code iteration will be sitting on its own git branch, the skeleton being on master-00. The next ones will be master-01, master-02 and so on until master-final.
So, let’s start with the empty project:
git clone https://gitlab.com/mszmurlo/vx-alarm.git
## or git clone git@gitlab.com:mszmurlo/vx-alarm.git
cd vx-alarm
git checkout master-00
To make sure we have everything ready before going any further, let’s compile the skeleton with mvn package. The executable is target/vx-alarm-0.1-fat.jar, which we can run with java -jar target/vx-alarm-0.1-fat.jar and query from the command line with curl http://localhost:8080/hello. If everything goes as expected, we should get the reply Sending back parameter 'hello'.
The basic application
To begin with, we will build a basic version of our system without any clustering or anything fancy like that. We’ll go pretty fast on most aspects in this part as it is mostly the same as the registry application from the previous installment. We’ll spend some more time on verticles and the event bus as they are at the heart of our discussion.
Functional requirements
We’ll build an app that lets users set up alarms defined by a date, a time and a message. When the time comes, alarms will be triggered and the message will be displayed in the logs. Alarms can be canceled, so they must have some unique identifier. Basically, we’ll define the set HTTP API that will receive a year, a month, a day, an hour, a minute and the message; it will “program” the alarm and will return a unique identifier so that the alarm can be manipulated in the future. We’ll also define the del API that will take an identifier as its parameter and will cancel the alarm with that identifier.
Here is the formal list of the API that we will implement along with their parameters:
- /set/<year>/<month>/<day>/<hour>/<minute>/<msg> : defines the alarm with the message <msg> that will be scheduled for display on <year>/<month>/<day>/<hour>/<minute>, unless that time is already in the past. It returns a unique identifier, id.
- /del/<id> : cancels the alarm identified by <id>, if it exists.
A typical session would be as follows:
date
samedi 18 avril 2020, 12:29:01 (UTC+0300)

curl http://localhost:8081/set/2020/04/18/09/30/WakeupWakeupWakeup
Ok. id: 41bdf04a-a5ad-4f08-837a-3bc1737f758c

curl http://localhost:8081/del/41bdf04a-a5ad-4f08-837a-3bc1737f758c
Ok: Alarm 41bdf04a-a5ad-4f08-837a-3bc1737f758c deleted successfully
Here we create an alarm to be fired at 09:30:00 GMT, that is 12:30:00 GMT+3, my local time. The system returns Ok to tell us the alarm has been created, along with an identifier in the form of a UUID. Then we delete that alarm using that identifier.
Now, if we create an alarm with curl http://localhost:8081/set/2020/04/18/09/33/WakeupWakeupWakeup and wait until it is scheduled, we’ll see the following logs in the server console:
12:32:26.936 INFO ...AlarmVerticle - Alarm 'WakeupWakeupWakeup' to be triggered in 34s
12:32:26.936 INFO ...Main - Alarm created successfully
12:33:00.970 INFO ...AlarmVerticle - ALAAARM: WakeupWakeupWakeup
The logs reported above do not correspond exactly to the logs you’ll see on the screen: throughout this post I will strip logs down to make them easier to read.
Dummy skeleton
Let’s first prepare everything. We need an HTTP server with routes corresponding to the APIs. For the moment we’ll just implement dummy request handlers so that our system will be able to respond. We will also need, later during the tests, to start several Vert.x nodes on the same machine, so each node needs to listen on its own port, which will be provided as a command line parameter.
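As a quick reminder of the plumbing detailed in the previous post, a minimal sketch of an entry point that reads the port from the command line could look like this; the 8080 default is an assumption:

public static void main(String[] args) {
  int port = 8080; // assumed default when no argument is given
  if(args.length > 0) {
    try {
      port = Integer.parseInt(args[0]);
    }
    catch(NumberFormatException e) {
      System.err.println("Invalid port: '" + args[0] + "'");
      System.exit(-1);
    }
  }
  new Main().startup(port);
}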
Below is the startup(...) method from Main.java that gets called after command line parsing (please refer to the previous post for more details):
private void startup(int port) {
  VertxOptions opts = new VertxOptions()
    // add some options with setXXX() methods
    ;
  vertx = Vertx.vertx(opts);

  log.info("Starting HTTP server on port {}.", port);

  HttpServer server = vertx.createHttpServer(); // Create the server
  Router router = Router.router(vertx);         // Define the router
  router.get("/set/:y/:mo/:d/:h/:mn/:m").handler(this::setHandler);
  router.get("/del/:id").handler(this::delHandler);
  // Any other method or URL will be handled by 'defHandler' as an error
  router.route().handler(this::defHandler);

  server
    .requestHandler(router::accept)
    .listen(port, ar -> {
      if(ar.succeeded()) {
        log.info("Server running on port {}", port);
      }
      else {
        log.error("Could not start server on port {}", port);
        System.exit(-1);
      }
    });
}
First, we start Vert.x. For the moment it’s a standalone instance but we’ll transform it into a clustered node later. Then, we define the routes that correspond to our APIs. And finally, we start the HTTP server.
For the moment the request handlers are just dummy methods that print Ok on the client side, or Error if the HTTP method is not a GET or if the URL doesn’t match what we have specified in the router. All will have the same basic structure. Below is the example of the setHandler that will handle all /set/... requests:
private void setHandler(RoutingContext rc) {
  String year = rc.request().getParam("y");
  String month = rc.request().getParam("mo");
  String day = rc.request().getParam("d");
  String hour = rc.request().getParam("h");
  String minute = rc.request().getParam("mn");
  String message = rc.request().getParam("m");
  log.info("Request /set/{}/{}/{}/{}/{} -> {}", year, month, day, hour, minute, message);
  sendReply(rc, 200, "Ok");
}
First, we extract the parameters from the URI, and then we do something with those parameters, here a simple log action. Finally, we send back a response to the client.
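The sendReply() helper used by all the handlers is not listed in this post; a minimal sketch, assuming it simply writes a plain text body with the given HTTP status code, could be:

// Hypothetical helper: send 'msg' back to the client with the given status
private void sendReply(RoutingContext rc, int code, String msg) {
  rc.response()
    .setStatusCode(code)
    .putHeader("content-type", "text/plain")
    .end(msg + "\n");
}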
Start the server in one terminal with java -jar target/vx-alarm-0.1-fat.jar 8080 and send some requests from another one. The session should look like this:
Client terminal
curl http://localhost:8080/set/2020/05/1/16/00/hello
Ok
curl http://localhost:8080/set/2020/05/1/16/00
Err ## The message is missing
curl http://localhost:8080/del/123
Ok ## del with a dummy id '123'
curl -X POST http://localhost:8080/del/123
Err ## Only GET are accepted
curl -X GET http://localhost:8080/del/123
Ok
Server terminal
07:59:53.544 INFO ...Main - Request /set/2020/05/1/16/00 -> hello
07:59:58.338 WARN ...Main - Request not allowed: 'GET:http://localhost:8080/set/2020/05/1/16/00'
08:00:09.969 INFO ...Main - Request /del/123
08:00:17.976 WARN ...Main - Request not allowed: 'POST:http://localhost:8080/del/123'
08:00:31.593 INFO ...Main - Request /del/123
The application doesn’t do anything interesting so far, but we have a working skeleton. The source for this step is on branch master-01.
Verticles and the event bus
Now that we have our skeleton, let’s see how to handle our alarms. If you remember from the functional requirements, we need to be able to define an alarm, possibly also be able to disable it and, otherwise, let it be triggered at the right moment. We will implement alarms as verticles, one of the core components of Vert.x.
Verticles
Verticles can be seen as kinds of lightweight processes that are isolated from one another. In that respect they are different from Java threads, which share parts of their memory. They are lightweight in memory and very cheap to create because they are managed directly by Vert.x itself. Verticles are deployed in a Vert.x application in order to handle some workload, either long lived tasks, like a user session, or short lived ones, for some temporary processing. The life cycle of a verticle is very simple:
public class MyVerticle extends AbstractVerticle {

  // called when the verticle is deployed
  public void start() {
    // Initialise members
    // Subscribe handlers to receive messages from the event bus (see below)
  }

  // called when the verticle is undeployed
  // only necessary if some resources need to be freed
  public void stop() {
    // Cleanup resources
  }

}
Being independent of each other, the only way for verticles to communicate with the outside world, other verticles or other parts of the system is to exchange messages on the event bus, another core Vert.x component.
Messages and the event bus
The event bus is called the nervous system of Vert.x in the documentation and can be seen as a communication hub that can spread across many Vert.x nodes, and sometimes not even only Vert.x nodes, web clients for example. Messages are sent on an address; addresses can be any string. Messages are received by asynchronous handlers that have been registered on that specific address. Many handlers can be registered on one single address and one handler can be registered on many addresses, thus allowing one-to-many and one-to-one communications. Messages can be either sent on the event bus with the send() method or published with publish(). send() delivers a message to just one of the listening handlers; the one that will receive the message is selected by Vert.x. It is possible for the recipient to reply by sending back another message with reply() or to send back an error with fail(). publish(), on the other hand, sends the message to all handlers, as a broadcast.
Messages can be of any primitive type, and they can also be (Java) Strings and Buffers, a Vert.x generic data holder. The Vert.x documentation however recommends formatting messages as JSON objects. Vert.x provides for this purpose a very simple and complete API, the JsonObject class.
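To make the two delivery modes concrete, here is a short sketch; the address some.address and the message contents are just examples:

EventBus eb = vertx.eventBus();

// Point-to-point: Vert.x selects one of the registered handlers; it may
// answer with reply() or fail(), which we receive asynchronously here
eb.send("some.address", new JsonObject().put("cmd", "ping"), ar -> {
  if(ar.succeeded()) {
    log.info("Got reply: {}", ar.result().body());
  }
  else {
    log.warn("Delivery failed: {}", ar.cause().getMessage());
  }
});

// Broadcast: every handler registered on the address gets a copy;
// no reply is possible with publish()
eb.publish("some.address", new JsonObject().put("cmd", "tick"));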
Verticle code pattern
To sum up what was said previously with some code, below is the typical structure of a verticle that waits for some action to be performed. Notice that this is my own way of structuring the code and not any kind of official recommendation.
public class MyVerticle extends AbstractVerticle { // (1) see below

  private String state = null;

  @Override
  public void start(Future<Void> sf) { // (2)
    state = ...; // (3)
    vertx
      .eventBus()
      .consumer("my-address", this::onMessage); // (4)
    sf.complete();
  }

  @Override
  public void stop(Future<Void> sf) { // (5)
    sf.complete();
  }

  private void onMessage(Message<JsonObject> message) { // (6)

    String cmd = message.body().getString("cmd"); // (6.1)
    if (cmd == null) {
      log.error("onMessage:error:No 'cmd' in message");
      message.fail(-1, "No cmd");
      return;
    }

    switch (cmd) { // (6.2)

    case "cmd_1":
      if(process(cmd)) {
        message.reply(new JsonObject() // (6.3)
                      .put("reply",
                           "processed(cmd_1) -> 42")
                      );
      }
      else {
        message.fail(-1, "Error processing 'cmd_1'");
      }
      break;

    // case "some other command": ...

    default:
      log.error("onMessage() : unknown command '{}'", cmd);
      message.fail(-1, "unknown command");
    } // switch(cmd)
  }
}
1. Any verticle must extend the AbstractVerticle class.
2. Then we need to define the behaviour of our verticle upon start and stop by overriding the life cycle methods start() and stop(). Both have synchronous and asynchronous variants but, unless what is done is straightforward, that is, you are sure it will not take time, the asynchronous variants are preferred, to make sure the event loop does not get blocked.
3. Initialize a hypothetical state member declared above.
4. Here we subscribe to the messages that will be sent on the address my-address. Upon reception of a message, the handler onMessage() will be called with the message as parameter (see point 6).
5. stop() is the counterpart of start(). It gets called when the verticle is undeployed. It can be omitted if there is nothing to clean up.
6. The onMessage() method handles the messages the handler receives. The naming is mine and its design is a matter of personal taste. I prefer my messages to all have the same global structure: they are JSON objects with at least one mandatory field, cmd, which represents the command to execute. There may be other fields that are the parameters of the command.
   1. The first thing is to make sure the command is not null. If the command is null, we invoke message.fail(), which sends an error reply to the sender so that the sender can handle it.
   2. The last part is to switch over the content of cmd to find out what to do.
   3. If everything went fine in the processing of the command, the message is replied to, the reply also being a JSON object. Otherwise, the message is replied to with an error.
Sum up
To summarize, a verticle is an execution unit that is managed by the core of Vert.x. It can communicate with other components (and of course other verticles) by exchanging messages on an event bus. Exchanging messages ensures isolation, so the developer is guaranteed that no memory is shared between execution threads. This drastically reduces the threat of concurrency related bugs, as no synchronization, locks, semaphores or any other such mechanism is needed. Being lightweight, verticles can be created on demand for short lived workloads or for long lived tasks. As we’ll see later, the event bus can spread across several Vert.x nodes; this in turn helps to distribute the load of the application across multiple Vert.x nodes, but it also reduces the impact of the failure of one (or more) node.
That looks great so let’s see how all this fits together for our alarm application.
Implementing the application
Description
We will implement alarms as long lived verticles, instances of the class AlarmVerticle. For every /set request, setHandler() will deploy an instance of an AlarmVerticle. The information about when to trigger the alarm and the message to write will be passed to the verticle as parameters in the DeploymentOptions object. The start() method will get those parameters and will set up a timer (yet another Vert.x facility) which will wake up at the right moment to trigger the alarm. The verticle needs to listen for just one message, the one that will be sent for the /del API. It will be listening on a unique address which, for simplicity, is here a UUID. This address will be the identifier returned by the /set API to the client so that the client can use it later when calling the /del API to delete the alarm.
Upon invocation of the /del API, delHandler() will use the identifier provided in the URL as an address and send a harakiri message to that address. Either the verticle exists and it will kill itself, or nobody is listening on that address (because the alarm has already been triggered, for example) and we’ll get an error.
Notice that since UUIDs are not guaranteed to be unguessable, in a real world application one might want to use cryptographically secure random strings.
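For illustration, a possible generator based on SecureRandom; the 32-byte length and the Base64 encoding are arbitrary choices:

// Hypothetical alternative to UUID.randomUUID(): 32 bytes from a
// cryptographically secure source, URL-safe Base64 encoded so the result
// can serve both as an event bus address and as a URL path segment
private static final java.security.SecureRandom RNG = new java.security.SecureRandom();

private static String secureId() {
  byte[] buf = new byte[32];
  RNG.nextBytes(buf);
  return java.util.Base64.getUrlEncoder().withoutPadding().encodeToString(buf);
}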
Simple, right? So let’s write some code.
Deploying a verticle
First we need to refactor the Main class so that we’ll be able to create instances of AlarmVerticle. There will be three steps in setHandler():
- Analyse the parameters and make sure they are of proper type, that is integers.
- Prepare the creation of the verticle.
- Create the verticle.
The first step is pretty straightforward and we change setHandler() as follows:
private void setHandler(RoutingContext rc) {
  // extract the parameters from the URI and check for type
  int year, month, day, hour, minute = 0;
  try {
    year = Integer.parseInt(rc.request().getParam("y"));
    month = Integer.parseInt(rc.request().getParam("mo"));
    day = Integer.parseInt(rc.request().getParam("d"));
    hour = Integer.parseInt(rc.request().getParam("h"));
    minute = Integer.parseInt(rc.request().getParam("mn"));
  }
  catch(NumberFormatException e) {
    log.warn("One of the parameters to the /set API is not a number: {}",
             e.getMessage());
    sendReply(rc, 200, "Err: " + e.getMessage());
    return;
  }
  String message = rc.request().getParam("m");
  String myaddr = UUID.randomUUID().toString();

  // step 2 ...
  // step 3 ...
}
The second step is to prepare the deployment. Verticles are deployed with the method vertx.deployVerticle(<verticle's class>, depOpt, ar -> {<handler>}). The class will be "org.buguigny.vertx_alarm.AlarmVerticle". depOpt is an instance of DeploymentOptions which defines how the verticle should be deployed and with what parameters. We’ll just tell Vert.x to deploy only one instance of this verticle, with the parameters being the wake up date and time, the message and the verticle’s own address. The code is as follows:
private void setHandler(RoutingContext rc) {
  // step 1 ...

  // Parameters to be provided to the newly deployed verticle
  JsonObject prm = new JsonObject()
    .put("year", year).put("month", month).put("day", day)
    .put("hour", hour).put("minute", minute)
    .put("message", message)
    .put("myaddr", myaddr)
    ;
  // log.info("Request /set with parameters {}", prm);

  DeploymentOptions depOpt = new DeploymentOptions()
    .setInstances(1)
    .setConfig(prm);

  // step 3 ...
}
Now that everything is ready, we can deploy the verticle:
private void setHandler(RoutingContext rc) {
  // step 1 ...
  // step 2 ...

  // deploy the verticle
  vertx
    .deployVerticle("org.buguigny.vertx_alarm.AlarmVerticle", depOpt, ar -> {
      if(ar.succeeded()) {
        log.info("Alarm created successfully");
        sendReply(rc, 200, "Ok. id: "+myaddr);
      }
      else {
        log.error("Failed to create the alarm {}", prm);
        sendReply(rc, 200, "Err");
      }
    });
}
Deleting an alarm
Deleting an alarm simply means undeploying the corresponding verticle. This is simpler to do from within the verticle itself. So we will simply send a harakiri message to the verticle’s address and the verticle will do the rest. The code for delHandler() is as follows:
private void delHandler(RoutingContext rc) {
  String id = rc.request().getParam("id");
  vertx
    .eventBus()
    .send(id, new JsonObject().put("cmd", "harakiri"), ar -> {
      if(ar.succeeded()) {
        sendReply(rc, 200, "Ok: Alarm "+id+" deleted successfully");
      }
      else {
        log.warn("Could not delete alarm '{}'. Reason: {}", id, ar.cause().getMessage());
        sendReply(rc, 200, "Error: Alarm "+id+" could not be deleted");
      }
    }); // send(...)
}
At that point, everything is ready to deploy or undeploy verticles, except that we don’t have the AlarmVerticle class yet. Let’s correct that.
Implementing the AlarmVerticle
As previously, we’ll proceed in several steps.
First, we need to handle time, which is always tricky. We’ll work with GMT time and we’ll suppose the client is in the same time zone as the server. We’ll use the Calendar class and calculate the number of milliseconds from now until when the alarm is supposed to be triggered.
Vert.x proposes a very handy way to execute some code at a given moment, in the form of timers. Basically, a timer is defined with vertx.setTimer(<duration>, <handler>). The duration is equal to when-now and the handler is a method that will get called when the timer expires; it has the signature handle(long).
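For instance, a one-off timer that fires after five seconds could be set up like this (the log message is just an example):

// Fires once, roughly 5000 ms from now; the returned ID can be passed
// to vertx.cancelTimer() to abort it before it fires
long timerID = vertx.setTimer(5000, id -> log.info("5 seconds later"));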
The last thing we need to do in start() is to tell Vert.x how to handle messages (address and handler). The start() method looks like this:
@Override
public void start(Future<Void> sf) {

  // Step 1 ...
  // myaddr, now, when and message are instance members
  myaddr = config().getString("myaddr");
  message = config().getString("message");

  Calendar utc_cal = Calendar.getInstance(
    new SimpleTimeZone(0, "GMT"),
    new Locale("US")
  );
  now = utc_cal.getTimeInMillis();
  utc_cal.set(config().getInteger("year"),
              config().getInteger("month") - 1, // Months are 0-indexed
              config().getInteger("day"),
              config().getInteger("hour"),
              config().getInteger("minute"),
              0); // don't care about seconds: one minute resolution
  when = utc_cal.getTimeInMillis();

  // Step 2 ...
  timerID = vertx.setTimer(when-now, new Handler<Long>() {
    @Override
    public void handle(Long lg) {
      log.info("ALAAARM: {}", message);
      vertx.undeploy(deploymentID());
    }
  });

  // Step 3 ...
  vertx.eventBus().consumer(myaddr, this::onMessage);

  log.info("Alarm '{}' to be triggered in {}s", message, (when-now)/1000);
  sf.complete();
}
We need to keep the identifier of the timer, timerID, in the verticle’s state so that we will be able to cancel it when processing the /del API.
The last thing to implement is the onMessage() handler to handle harakiri messages. Based on what we have discussed before, it’s quite straightforward. First we check that the command in the message is not null; if that is the case and it equals harakiri, we cancel the timer and we undeploy the verticle.
private void onMessage(Message<JsonObject> message) {

  String cmd = message.body().getString("cmd");
  // Check if non null. Omitted

  switch (cmd) {
  case "harakiri":
    log.info("Canceling alarm '{}' id='{}'", this.message, myaddr);
    vertx.cancelTimer(timerID);
    vertx.undeploy(deploymentID());
    message.reply("Ok");
    break;

  default:
    log.error("Error: unknown command '{}'", cmd);
    message.fail(-1, "Unknown command'"+cmd+"'");
  }
}
The code for this part sits on the branch master-02.
Testing
Testing the application with a normal session
We start a node in a terminal and follow its logs while we issue curl commands in the client terminal:
Server terminal - The startup of the server
java -jar target/vx-alarm-0.1-fat.jar 8080
18:17:37.974 INFO org.buguigny.vertx_alarm.Main - Starting HTTP server on port 8080.
18:17:38.071 INFO org.buguigny.vertx_alarm.Main - Server running on port 8080
Client terminal - Creating a new alarm named WakeupWakeupWakeup
curl http://localhost:8080/set/2020/04/17/15/19/WakeupWakeupWakeup
Ok. id: 1489c572-6c4b-4841-a7ae-380e47ba1add
Server terminal - Processing of the /set API
18:18:49.987 INFO ...AlarmVerticle - Alarm 'WakeupWakeupWakeup' to be triggered in 11s
18:18:49.988 INFO ...Main - Alarm created successfully
18:19:00.994 INFO ...AlarmVerticle - ALAAARM: WakeupWakeupWakeup
The first two lines illustrate the creation of the alarm. The last line appears a bit later, when the alarm is triggered.
Now, after the alarm has been triggered, let’s try to delete it:
Client terminal - Deleting the alarm
curl http://localhost:8080/del/1489c572-6c4b-4841-a7ae-380e47ba1add
Error: Alarm 1489c572-6c4b-4841-a7ae-380e47ba1add could not be deleted
Server terminal - Processing of the /del API
18:23:50.742 WARN ...Main - Could not delete alarm '1489c572-6c4b-4841-a7ae-380e47ba1add'.
Reason: No handlers for address 1489c572-6c4b-4841-a7ae-380e47ba1add
The reason for the error is that the identifier we provided doesn’t have any associated handler anymore, which is normal because the verticle was undeployed after the alarm was triggered. In other words, everything is normal.
Testing the limits
If we were to build a commercial application, how many alarms would we be able to handle on a single node? 10 thousand? 100 thousand? One million? To send that many HTTP requests we won’t use curl because the process would be slow; we will rather put in place a short Groovy script. Check out the branch master-03; the script is in tests/test.groovy. Its parameters are the year, the month, the day, the hour and the minute when the alarm will be triggered, and the number of alarms to be created. Thus the command
groovy tests/test.groovy 2020 4 17 18 36 10000
defines ten thousand alarms to be triggered on the 17th of April 2020 at 18:36.
Creating 10 thousand alarms takes about 4 seconds on my laptop (a 12-thread i7 CPU with 32GB RAM) and the load is barely noticeable in terms of CPU or RAM usage. Pushing to 1M alarms was done in a bit less than 60 seconds. While the alarms were being created, CPU usage remained below 30% on average with some peaks at 60% on some cores, and the RAM increased linearly, yet very reasonably. Creating 2M alarms took about 3 minutes; everything seemed to be OK but the CPU began to reach 90% on most cores. I pushed beyond, but I stopped the process shortly after 3M verticles had been created, after about 5 minutes 30 seconds. Memory was not an issue, but the CPUs saturated one after another at 100% and I got warnings like Thread Thread[vert.x-eventloop-thread-8,5,main] has been blocked for 3645 ms, time limit is 2000 ms. This warning means that something is blocking the event loop (and with it workload distribution). As our verticles do almost nothing, this means that the system itself blocks and thus becomes non-reactive.
Yet, about 2M bare verticles is not bad. We could imagine running between 1M and 2M verticles even if they were bigger in terms of RAM and were doing some processing. And all this on a laptop… Think about a real server!
This test is of course not a benchmark but it provides some insights about the limits.
An interesting question is: would Java threads do better? You’ll find in the test directory the file tests/TestThread.java. This program simply takes a number from its command line and creates that many threads. Each thread goes for a nap of five seconds and exits. To compile the program, type javac TestThread.java; to execute it with 10 threads, type java -cp . TestThread 10. Now let’s try with ten thousand threads, just as we did for the verticles:
java -cp . TestThread 10000
thread no 2 is running...
thread no 0 is running...
...
thread no 9961 is running...
thread no 9962 is running...
[0,911s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) \
  for attributes: stacksize: 1024k, guardsize: 0k, detached.
Exception in thread "main" java.lang.OutOfMemoryError: unable to create native thread: \
  possibly out of memory or process/resource limits reached
  at java.base/java.lang.Thread.start0(Native Method)
  at java.base/java.lang.Thread.start(Thread.java:803)
  at TestThread.main(TestThread.java:18)
This test wasn’t really fair, because we know that the limit on the number of native threads is set by the OS, not only by the JVM. Yet, if we were to implement something with a similar design, we wouldn’t be able to do it directly with threads.
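For reference, here is a hedged sketch of what tests/TestThread.java might contain, assuming each thread just prints a line and sleeps for five seconds:

// Hypothetical reconstruction of tests/TestThread.java: creates as many
// threads as requested on the command line; each sleeps five seconds and exits
public class TestThread {
  public static void main(String[] args) {
    int n = Integer.parseInt(args[0]);
    for (int i = 0; i < n; i++) {
      final int id = i;
      new Thread(() -> {
        System.out.println("thread no " + id + " is running...");
        try { Thread.sleep(5000); } catch (InterruptedException e) { }
      }).start();
    }
  }
}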
Conclusion
So far we have implemented a fully working alarm application. We are able to create alarms and to delete previously created ones. When the time comes, alarms are triggered and write a log message. Alarms are implemented as long lived verticles which basically wait for two possible events: the time to trigger the alarm has come, or a self destruction message is received. We have also quickly shown that the system is able to manage hundreds of thousands of alarms on a single laptop, which tends to demonstrate that Vert.x uses system resources with great economy, thanks to its asynchronous nature.
So far, however, our application only runs on one node. There are two limitations to this:
- even if the number of verticles we were able to run on one node was huge, it is still a limit.
- if that very node crashes, the whole system has downtime.
In the next part of this post, we will extend our application to be able to run it on several nodes which should solve those two issues.
The quest for immortality
There is obviously no reason why alarms waiting to be triggered should survive the crash of the Vert.x node they run on, right? We have simply not implemented any kind of mechanism for that. The solution is to start at least two nodes that will share the information on running verticles; but without any plumbing, there is no reason why either node would know about the existence of the other one. If you remember, in the previous post we added cluster capabilities to the registry application to make the information survive the death of one node. We’ll do the same here.
Adding a cluster manager
The Vert.x cluster documentation lists four cluster managers that Vert.x integrates with. We’ll use the default one, Hazelcast. All the code changes for this section are on branch master-04.
First, let’s add the Hazelcast dependency to the pom.xml file:
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-hazelcast</artifactId>
</dependency>
Then we rebuild the project so that this dependency is taken into account: mvn clean package. In the previous post we also had a discussion about cluster configuration; please refer to it, as in this post we’ll keep the default configuration. Finally, we need to change the way the Vert.x instance, vertx, is created in our code. The changes are to be made in Main.java:
private void startup(int port) {
  VertxOptions opts = new VertxOptions()
    .setHAEnabled(true)
    ;

  log.info("Starting a clustered Vert.x node");
  Vertx.clusteredVertx(opts, vx_ar -> {
    if (vx_ar.failed()) {
      log.error("Failed to start clustered node. Cause: {}",
                vx_ar.cause().getMessage());
      System.exit(-1);
    }
    else {
      vertx = vx_ar.result();
      log.info("Starting HTTP server on port {}.", port);
      HttpServer server = vertx.createHttpServer();
      Router router = Router.router(vertx);
      router.get("/set/:y/:mo/:d/:h/:mn/:m").handler(this::setHandler);
      router.get("/del/:id").handler(this::delHandler);
      router.route().handler(this::defHandler); // catch all route

      server
        .requestHandler(router::accept)
        .listen(port, ar -> {
          if(ar.succeeded()) {
            log.info("Server running on port {}", port);
          }
          else {
            log.error("Could not start server on port {}", port);
            System.exit(-1);
          }
        });
    }
  });
}
There are two main changes. First, we enable high availability for this Vert.x instance with setHAEnabled(true). Second, the creation of the Vert.x instance becomes an asynchronous task with Vertx.clusteredVertx(opts, vx_ar -> {...}). If the creation succeeds, we run the previous code that defines the routes and creates the HTTP server; otherwise, we exit.
Testing the cluster
Now that we have a cluster, let’s make some tests. We’ll start two nodes listening on ports 8081 and 8082 in two terminals. We’ll not get into details here as that was the topic of the previous post, but in each terminal you should see lots of Hazelcast logs and eventually the formation of the cluster:
Node 8081:
Members {size:2, ver:2} [
Member [10.147.18.222]:5701 - ff184488-1386-4da9-b202-2167c732d72c this
Member [10.147.18.222]:5702 - 8f728f0c-2b28-4539-9f09-7b439be5dba3
]
and
Node 8082:
Members {size:2, ver:2} [
Member [10.147.18.222]:5701 - ff184488-1386-4da9-b202-2167c732d72c
Member [10.147.18.222]:5702 - 8f728f0c-2b28-4539-9f09-7b439be5dba3 this
]
OK, let’s now test how our alarms are managed in the cluster.
Test 1 : inter-node communication
In this first test, we will create an alarm on node 8081 and send the deletion request to node 8082. We of course expect that the alarm will get cleared.
Client terminal
curl http://localhost:8081/set/2020/05/1/8/36/Wakeup_1
Ok. id: 985d56a7-da5f-4cc7-bd7d-e424ef80a295

curl http://localhost:8082/del/985d56a7-da5f-4cc7-bd7d-e424ef80a295
Ok: Alarm 985d56a7-da5f-4cc7-bd7d-e424ef80a295 deleted successfully
Bingo, this actually works! You can check the logs in the terminal of node 8081. As said at the beginning, the event bus is a structure that spreads across all Vert.x nodes in the cluster. When node 8082 receives the /del request along with the alarm ID, it just sends the message over the event bus, and it is the duty of the event bus to route this event to the right destination, which happens to be a handler on node 8081; Vert.x can then execute that handler, which cancels the alarm.
Test 2 : sudden node termination
Let us now try another scenario where we start both nodes and set up an alarm on node 8081. Then we simulate the crash of node 8081 by killing it with Ctrl-C and, as previously, we try to delete the alarm by sending a /del request to node 8082. In the client terminal, the session is as follows:
Client terminal
curl http://localhost:8081/set/2020/05/1/8/47/Wakeup_1
Ok. id: 01561e5c-1444-4f99-98c4-fab8b1662f0a

# At that moment node 8081 is killed with Ctrl-C

curl http://localhost:8082/del/01561e5c-1444-4f99-98c4-fab8b1662f0a
Error: Alarm 01561e5c-1444-4f99-98c4-fab8b1662f0a could not be deleted
This time it fails… Why? Simply because the verticle that represents our alarm did not survive the crash of the node it was living on. When we call /del on node 8082, it sends, as previously, a harakiri message on the event bus. But as the handler that had subscribed to that address is dead, the delivery fails. This is what the log on node 8082 says:
14:46:24.472 WARN ...Main - Could not delete alarm '01561e5c-1444-4f99-98c4-fab8b1662f0a'.
Reason: No handlers for address 01561e5c-1444-4f99-98c4-fab8b1662f0a
We’ll correct that in the next section.
Making the verticles highly available
What exactly does it mean for a verticle to be highly available? Basically, it means that the verticle will continue its execution as if nothing had happened. Technically speaking, this means that we are able to start a new verticle and make it take exactly the same state as the verticle that crashed. By state we mean the set of all internal data the verticle needs to function properly. One can distinguish three types of such data:
The initial state is the set of parameters that are provided at startup of the verticle, that is, what is set with the call DeploymentOptions.setConfig(). In our case the date and time, etc.:

JsonObject prm = new JsonObject()
  .put("year", year).put("month", month).put("day", day)
  .put("hour", hour).put("minute", minute)
  .put("message", message)
  .put("myaddr", myaddr)
  ;
The dynamic state is made of values that have been computed while the verticle is running and that need to survive the crash because they represent some kind of progress in the execution. In our code so far there is no such data, but we will introduce some later for the sake of the example.
The volatile state is made of values that have been computed while the verticle was running but that need to be re-computed after a restart, as their old value has no meaning anymore. In our example, timerID fits this category: if the node on which the verticle is executed crashes, the timer that is supposed to fire the alarm will also disappear, so there is no sense in keeping its ID, as a new one will be generated upon verticle startup.
Let’s first see how to get the verticles restarted, and we’ll come back to state considerations later.
Restarting the verticle automatically
As high availability is a general requirement when working with distributed applications, Vert.x, being our friend, already implements all of the necessary mechanics to make our verticles highly available. We have already started Vert.x in clustered mode with setHAEnabled(true), which allows messages to be shared between nodes. The second thing to do is to flag the verticles we create as to be managed in a highly available manner. That way, Vert.x will restart those verticles on another node of the cluster with their initial state. To do so, we just need to add one parameter to the DeploymentOptions instance:
DeploymentOptions depOpt = new DeploymentOptions()
  .setInstances(1)
  .setConfig(prm)
  .setHa(true) // Set that verticle to be HA
  ;
So, basically, we added one line and gained high availability for the verticles. Is it that simple? Let’s try.
Testing high availability of verticles
Let’s redo the same test as before: start two nodes, N1 and N2, initiate some alarms on N1, then kill N1 and observe what happens on N2. Once both nodes have been started:
Client terminal - Creation of two alarms
curl http://localhost:8081/set/2020/05/4/12/30/TA
Ok. id: f8383dda-2912-4d38-9eac-f2daee7a472e

curl http://localhost:8081/set/2020/05/4/12/30/TB
Ok. id: ddab2a1a-a2ea-4264-beee-7f3543b252c3
Then we kill N1, and below is the log we observe in the terminal of N2:
Node 2 terminal - After N1 had been killed
...
Members {size:1, ver:3} [
Member [192.168.8.161]:5702 - 2ee1e93f-c101-4563-9a40-f386d0b60c25 this
]
...
HAManager INFOS: node2ee1e93f-c101-4563-9a40-f386d0b60c25 says:
Node 39dd1308-c608-4c67-af77-69d6f8a8fbab has failed.
This node will deploy 2 deploymentIDs from that node.
...
alarm.AlarmVerticle - Alarm 'TA' to be triggered in 62s
HAManager : Successfully redeployed verticle
org.buguigny.alarm.AlarmVerticle after failover
alarm.AlarmVerticle - Alarm 'TB' to be triggered in 62s
HAManager : Successfully redeployed verticle
org.buguigny.alarm.AlarmVerticle after failover
We first see that N2 has detected that N1 has failed (Node 39dd1308... has failed) and that it will have to handle two verticles from the dead node (This node will deploy 2 deploymentIDs from that node.). The next two messages then show that our two verticles are actually redeployed as expected (Successfully redeployed verticle org.buguigny.alarm.AlarmVerticle after failover). One can easily verify that if we leave the migrated alarms on node 2 long enough, they will be triggered on time.
To answer the question raised at the end of the previous section: yes, it looks like enabling high availability for verticles is as simple as adding one line of code to the existing code base! Is it really so? Our verticle has what we called an initial state and a volatile state, but actually no dynamic state. We’ll add one and see what happens.
Adding a dynamic state
Let’s imagine we want to change or update the message that is displayed when the alarm is triggered. We define a new API, say /upd/<alarm id>/<new message>. Upon startup, the member message in the AlarmVerticle class will be initialized with the value from the initial state. Upon a call to the upd API, it will be updated with the new value. We’ll code that behavior in a new handler called updHandler() which will send an upd message to the right verticle. Based on the previous code, the implementation is straightforward; a hedged sketch follows. The source code is on the branch master-05.
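Here is that sketch. The route /upd/:id/:m in Main.java and the upd case in AlarmVerticle.onMessage() follow the patterns used so far in this post; they are an illustration, not the exact code from the branch:

// In Main.startup(): one additional route, mirroring the existing ones
router.get("/upd/:id/:m").handler(this::updHandler);

// In Main: send an 'upd' command to the verticle listening on 'id'
private void updHandler(RoutingContext rc) {
  String id = rc.request().getParam("id");
  String msg = rc.request().getParam("m");
  vertx
    .eventBus()
    .send(id, new JsonObject().put("cmd", "upd").put("message", msg), ar -> {
      if(ar.succeeded()) {
        sendReply(rc, 200, "Ok: message updated: " + msg);
      }
      else {
        sendReply(rc, 200, "Error: Alarm " + id + " could not be updated");
      }
    });
}

// In AlarmVerticle.onMessage(): the new case in the switch over 'cmd'
case "upd":
  String newMessage = message.body().getString("message");
  log.info("Updating message for alarm {}: {}->{}", myaddr, this.message, newMessage);
  this.message = newMessage;
  message.reply("Ok");
  break;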
Testing the new API
Just to be sure everything is all right, let’s try the API:
Client terminal
curl http://localhost:8081/set/2020/05/4/13/27/TA
Ok. id: abf01deb-316e-44e3-93ac-b613879b371b

curl http://localhost:8081/upd/abf01deb-316e-44e3-93ac-b613879b371b/new_message
Ok: massage updated: new_message
Server terminal
alarm.AlarmVerticle - Alarm 'TA' to be triggered in 29s
alarm.Main - Alarm created successfully
alarm.AlarmVerticle - Updating message for alarm \
abf01deb-316e-44e3-93ac-b613879b371b: TA->new_message
alarm.AlarmVerticle - ALAAARM: new_message
The basic test is OK: we are able to change the message of an alarm. Let’s see what happens when the node on which the verticle is running crashes.
Testing verticles restart
First, we create an alarm and update the message.
Client terminal
curl http://localhost:8081/set/2020/05/4/14/00/TA
Ok. id: 6a6791f0-e91a-436d-b121-322ff1102d19

curl http://localhost:8081/upd/6a6791f0-e91a-436d-b121-322ff1102d19/new_message
Ok: massage updated: new_message
Once the alarm is created on node 1 we kill that node.
Node 1 terminal
alarm.AlarmVerticle - Alarm 'TA' to be triggered in 110s
alarm.Main - Alarm created successfully
alarm.AlarmVerticle - Updating message for alarm \
6a6791f0-e91a-436d-b121-322ff1102d19: TA->new_message
^C
On node 2, we see the verticle being redeployed but, unfortunately, it is the old message, the one from the initial state, that is migrated and printed when the alarm is triggered.
Node 2 terminal
INFOS: node174b0c66-9aec-41c5-8c1a-dbab49dff51d says: \
Node 703766da-fc4a-475a-a4ee-7aec25ea92ae has failed. \
This node will deploy 1 deploymentIDs from that node.
INFO alarm.AlarmVerticle - Alarm 'TA' to be triggered in 86s
...
INFO alarm.AlarmVerticle - ALAAARM: TA
So, things are finally not that simple. The initial state is migrated because the data is stored in the instance of DeploymentOptions, but the dynamic state is lost. Let’s see how to solve that problem.
How to preserve the dynamic state?
There are several ways to preserve the dynamic state.
The traditional one, long used in web development, is to save the state in a database every time it changes and to reload it on restart. This requires additional server resources for the database access and may become a bottleneck that slows down the whole system: in a real application with, say, one million verticles per node, this solution would require one million database accesses to restart the verticles of a failed node. To mitigate the disk access impact, one could replace the database with an in-memory store, for example a dedicated in-memory cache like Redis. This would however complicate the overall architecture with additional servers.
Again, Vert.x is our friend and provides all the necessary tools to cope with this situation, as we already saw in the last post: a distributed map. Keep in mind that this solution is not universal; based on your specific use case you may choose a database or an external in-memory cache. The choice depends on the existing infrastructure, recovery speed requirements, the experience of the operations team, economics, …
Preserving state in a distributed map
To keep the state of any verticle in memory across the Vert.x cluster we will use the AsyncMap class. To avoid hard coding the name of the map across all nodes, we will use the name of the verticle class. The state of a verticle will be stored as a JSON object and the key used to retrieve its state will be its address. The state will be stored whenever it changes; it will be retrieved only upon a restart of the verticle. We’ll translate that into code by refactoring the file AlarmVerticle.java. The implementation will not be as straightforward as adding one line of code. Here are the general steps; you’ll find the source code for this part on the branch master-06, which also is the last one, master-final.
- In the class member section, we define the member stateMap that will reference the map.
- In start(), handle the case when the verticle is started for the first time and the case when it is restarted:
  - get a handle on the map
  - get the state for this verticle from the map; the key is the address
  - if state is null, we are starting the verticle for the first time, so we populate the key entry with the dynamic state, i.e. the message to be displayed
  - if state is not null, it’s a restart, so we update the message member with the content of the state from the map
  - now that the map and the state are in sync, continue as previously with the creation of the timer and the registration of the onMessage handler on the address.
- When the verticle is undeployed, we need to delete the corresponding entry from the map, otherwise it will get saturated after a while with deleted or expired alarms. For that purpose, we override the stop() method and remove() the key from stateMap.
- Finally, we need to modify how the upd message is handled in onMessage(). Not only do we need to set the member message to the new value, we also need to update the state in the map. A hedged sketch of the start() and stop() logic is shown after this list.
Now that all the machinery is in place, let’s give it a try.
Testing the state preservation
We’ll redo the same test we did before: create an alarm on node 1, update its message, kill that node and wait until the alarm gets triggered on node 2:
Client terminal
curl http://localhost:8081/set/2020/05/4/15/00/TA
Ok. id: 063f8e17-c267-4b76-802d-809885958477

curl http://localhost:8081/upd/063f8e17-c267-4b76-802d-809885958477/new_message
Ok: massage updated: new_message
The alarm is created and updated, then the node is killed:
Node 1 terminal
alarm.AlarmVerticle - Alarm 'TA' to be triggered in 95s
alarm.Main - Alarm created successfully
alarm.AlarmVerticle - \
Updating message for alarm 063f8e17-c267-4b76-802d-809885958477: TA->new_message
^C
Node 2 terminal
INFOS: nodeeb3a3e2e-7c53-4482-aa72-cfad5d251a2f says: \
Node 517b71cc-7aef-4d47-b2c0-61810ce64d5d has failed. \
This node will deploy 1 deploymentIDs from that node.
...
INFO org.buguigny.vertx_alarm.AlarmVerticle - Alarm 'new_message' to be triggered in 70s
INFOS: Successfully redeployed verticle org.buguigny.vertx_alarm.AlarmVerticle after failover
...
INFO org.buguigny.vertx_alarm.AlarmVerticle - ALAAARM: new_message
INFO org.buguigny.vertx_alarm.AlarmVerticle - \
Undeploying verticle '063f8e17-c267-4b76-802d-809885958477', and removing its state
Bingo! It looks like we have finally reached our goal of verticle immortality and, thus, application resilience: we can deploy verticles that are restarted on another node in case of a crash, with their state preserved.
Let’s finish this post with some considerations on scalability related topics and how Vert.x can manage them.
Testing Vert.x scalability capabilities
Load, scalability and resilience
Load is a measure of how many resources are used on a system. This definition is quite vague because it partly depends on the system’s architecture. Basic load measures are CPU, RAM, disk IO, network, … For APIs, the number of requests per second is very pertinent. For our alarm application, where verticles remain in memory and just wait for some event to happen, their number may also be a good measure.
Scalability is about handling variations of load on a system by adding (or removing) resources. Scalability can be vertical or horizontal.
- Vertical scalability is achieved by adding more resources such as RAM, CPU or disks to physical servers. Vertical scalability however has physical limits, which are the limits of the servers themselves: a server with, say, two physical sockets can only host two physical CPUs, right?
- Horizontal scalability, on the other hand, is achieved by adding new servers to the platform, with the application expected to spread across all of them. In this case the architecture of the application must be designed with such evolution in mind. A good example is a farm of HTTP servers located behind an HTTP load-balancer: adding new servers will make the load-balancer (after some configuration that can be done without downtime) send requests to these new servers as well, thus providing more capacity to the whole farm.
Basically, for an application running on several nodes, we would like it to have the following behavior:
- horizontal scalability: if each node begins to reach its physical limits (~90% of RAM or CPU), adding a new node should provide some more capacity and decrease the load on the existing ones. In other words, some processing load should be migrated to the new node so that after a while the load on each node decreases and is balanced.
- resilience: if one node crashes, the load from that node should be equally distributed over all remaining ones, so that no node is overloaded and the application still works.
If those requirements are not met, it’s clear that we would end up in a situation where some of the nodes would be idle while others would be still overloaded.
With the advent of virtualization, cloudification, containerization, etc., horizontal scalability is of course the target for operations teams, as it’s very easy to add new nodes to a platform. In this sense, Vert.x seems to be a good candidate for building such scalability-aware applications, as it can spread across many servers. So let’s see how it behaves.
Testing horizontal scalability
For these tests, we’ll start with two Vert.x nodes, N1 and N2, each loaded with 10,000 alarms that will fire sometime in the future, using the following command:
for i in `seq 1 10000`; do
  echo $i # Just to know how far we have gone
  curl http://localhost:8082/set/2020/08/12/09/41/TA;
done
Now, if we start a third node N3 and let it join the cluster, what happens? Too bad: nothing happens! The verticles deployed on nodes N1 and N2 remain there, N3 remains “empty” and the load is not distributed to the new node. Of course N3 will be able to handle requests it receives, but if the first nodes are getting saturated and new requests keep being sent to them, the application may crash while N3 sits idle.
Testing resilience
Using the previous procedure, let’s load three nodes N1, N2 and N3 with 10,000 alarms each. Now, let’s kill N1 and see what happens. As we saw previously, the verticles from N1 are migrated, but they are migrated to only one node! This, too, is a situation which puts the application at risk.
Let’s redo a similar experiment with four nodes. First, let’s load N1, N2 and N3 with 10,000 alarms as previously, and let’s start a fourth node, N4, that will remain empty. Now kill N1. Again, the alarms are migrated to only one node, and in addition there is no guarantee that the target will be N4, in other words the least loaded node… Bad.
Conclusion
We quickly saw that while we were quite easily able to implement verticle restart after a node crash, Vert.x doesn’t fairly distribute those verticles among the remaining nodes. Moreover, horizontal scalability seems to be only partly managed: while we can easily add new nodes to a cluster, the load is not re-balanced onto the newly added (and thus unloaded) nodes.
General conclusion
In this long post we have gone quite far with Vert.x. We have transformed a single node application into a fully distributed one that can spread across a whole cluster. We have also seen that in case of the failure of one node in the cluster, the application (actually Vert.x) is able to restart its components on other nodes. Yet, the restarted verticles are not equally balanced among the remaining nodes, which may lead to situations where some nodes are overloaded, putting the whole application at risk of failure. Just as the dynamic state was not migrated automagically, we may need to implement some machinery to balance the restarted verticles among all nodes. This may be the topic of a further post dedicated to scalability. This point should however not cast any shadow on the fantastic potential of Vert.x to help create fully distributed applications on the JVM.