Vert.x clustering: a distributed Key/Value registry
Vert.x is a very powerful set of libraries that helps to write reliable concurrent applications in several languages, typically, but not limited to, clients and servers. Besides the documentation, the web site provides many short examples that help to get up and running. I have recently been trying my hand at clustering but I couldn’t find a detailed end-to-end description of how to set up a clustered system. So I decided to implement a toy in-memory registry and explore Vert.x internals. This post describes that process.
Introduction
Reactive systems
A system is said to be reactive if it keeps replying while facing some kind of pressure or failure. Reactiveness has been made popular by The Reactive Manifesto. Basically, such a system still replies after an exception has happened, after a server or a database has crashed, or during high workloads. There is no magic in there: a system is reactive because its architecture has been designed to be reactive and because it runs on an infrastructure that provides such capabilities. To give a simple idea, consider a web service that runs on a single (physical or virtual) server. If that server crashes, obviously the web service will also die and will not respond to incoming requests anymore. Yet, if that service runs on a couple of servers behind a load balancer and if they can exchange some information, for example about the sessions, losing a node should not be an issue: the load balancer will detect the failure and will not send any request to that server anymore, and if the session has been properly shared with the other servers, any server will be able to handle any request, so the end users will not even notice the outage of one node.
Those architectures are often associated with terms like high availability, horizontal scalability, or clustering. The concepts are not really new, but nowadays there are more and more tools available to ease the design, development, deployment and operation of such systems. Solutions range from languages (Erlang, Elixir, Clojure, Pony, …) to libraries (Akka or Vert.x for JVM-based languages), infrastructures (Kubernetes, OpenStack, …), etc. Real-life applications are often mixes of those.
What is Vert.x
If you’ve never heard about Vert.x, you should definitely have a first, in-depth look at its home page and its documentation.
Basically, Vert.x is a set of libraries that runs on the JVM but is not tied to Java only. One can actually develop Vert.x applications in Java, Kotlin, JavaScript, Scala and a few others. Those libraries cover a very large scope of functionalities, with the main guideline being to expose those functionalities as non-blocking APIs. Examples of functionalities are:
- creation of software servers and clients for various protocols
- creation of web sites with an integrated web toolkit
- clients to various SQL and non-SQL databases
- clients to various messaging brokers
- authentication and authorisation
- clustering
- and there are more…
Vert.x achieves this by providing an asynchronous, event-driven, concurrent programming model based on very light threads (called verticles in Vert.x) that communicate by exchanging messages. The clustering libraries expose an API that eases the distribution of an application across multiple Vert.x instances running on separate servers (later, we will speak about Vert.x nodes). All this with the goal of easing the building of reactive applications on the JVM.
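Just to give the flavour of the model, here is a minimal, hypothetical verticle that replies to messages sent on the event bus; it is not part of the registry code, only an illustration of the verticle and message-passing idea:
import io.vertx.core.AbstractVerticle;

// A minimal verticle: it subscribes to the "echo" address on the event bus
// and replies to every message with the same body.
public class EchoVerticle extends AbstractVerticle {
    @Override
    public void start() {
        vertx.eventBus().consumer("echo", msg -> msg.reply(msg.body()));
    }
}
Deploying it is a single call, vertx.deployVerticle(new EchoVerticle()), and any other verticle, on any node of a cluster, can then send messages to the echo address.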
The objective of this post is to dig into the distribution of an application across multiple Vert.x nodes and into how such an application can be made highly available. As an illustration, we will build a basic distributed Key:Value (String:String) registry and experiment with how it resists failures.
The registry
Registries are basically key/value stores. In a clustered environment with more than one node, a registry helps pass information across the different modules of the application: if a module running on one node stores some data under the key K, then another module running on another node can retrieve that data by requesting the data associated with K. Of course, the registry has to be known by all nodes or be embedded in them.
The registry we will build is a very simple in-memory String/String store which exposes a few APIs through HTTP GET requests:
- /put/key/value : stores the value value under the key key. If a value for the key already exists, it is replaced by the new value. This API returns 200:Ok, which means that the HTTP return code is 200 with the reply payload being Ok.
- /get/key : retrieves the value for the key key. The API replies 200:<value> if the key exists and 200:Ko otherwise.
- /list : lists all the n mappings key -> value from the registry. It replies with 200:n <key_1> -> <val_1> ... <key_n> -> <val_n> where n is the number of records.
- Any other type of request will result in 404:Error.
Tooling and coding iterations
If you are happy with the command line, the strict minimum is a build tool, a decent text editor and, of course, a JDK. Personally I use Maven and Emacs. If you prefer Gradle you’ll have to adapt the build file. If you dislike the command line and only use an IDE, then you are on your own, but I believe you know what you are doing.
The best way to learn is to experiment by yourself and write the code. Yet, for the impatient ones, all the code is organized into branches, master-00 being the starting point, master-01 being the evolution of the starting point, master-02 being the evolution of master-01, and so on until the final version master-final.
To get the source code, first clone the Gitlab repository, then switch to branch master-00:
git clone git@gitlab.com:mszmurlo/vx-registry.git
cd vx-registry
git checkout master-00
Once done, you can build the first executable with mvn package, which will produce (among others) the file target/registry-0.1-fat.jar. This very first version is a self-contained HTTP server which you can run with the command java -jar target/registry-0.1-fat.jar. You can query it from the command line with curl or wget on the http://localhost:8080/hello endpoint.
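For instance (the exact reply payload depends on what the generated skeleton returns):
$ curl http://localhost:8080/hello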
This application is just a skeleton that was generated by vep, a quick and dirty Groovy script of mine which is quite handy to begin a new application or to build one quickly just for testing. If you are interested in vep, see this repository for details. The master-00 branch was generated with the following command:
groovy vep.groovy -pn vx-registry -pkg org.buguigny.vertx_registry -mfunc
A standalone registry
As a first step, we will modify the master-00 code to create a registry that conforms to the API specified above. First, let’s see what our starting point is.
The starting point
The master-00
directory tree is as follows:
├── pom.xml
├── README.md
├── src
│ ├── main
│ │ ├── java
│ │ │ └── org
│ │ │ └── buguigny
│ │ │ └── vertxcluster
│ │ │ └── registry
│ │ │ └── Main.java
│ │ └── resources
│ │ └── logback.xml
│ └── test
│ └── ...
└── target
├── ...
We see two main directories under src: main contains the source code for the application, while test is for the unit tests. resources will host all resources, such as configuration files like logback.xml which contains the configuration of the logger. Later we will also put the cluster configuration file in resources.
If we now look at the content of the Main.java file, we see it has three main methods:
- startup(String[] args) defines the router of the HTTP server along with the handlers to be called and eventually starts that server on port 8080. It is called by main().
- rootHandler(RoutingContext rc) handles GET requests of the form /<some_string>.
- defHandler(RoutingContext rc) handles all other requests as errors and returns 404:Error\n.
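As an illustration, the catch-all handler is roughly the following (a hedged sketch; the code actually generated by vep may differ in details):
private void defHandler(RoutingContext rc) {
    // Any request that does not match a defined route ends up here
    rc.response()
      .setStatusCode(404)
      .end("Error\n");
}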
To implement our registry we need to define a data structure to hold the key / value pairs and implement some additional handlers that will manipulate that structure.
The registry data structure
To hold information in the form of key/value pairs we’ll need some kind of map. As Vert.x is designed for implementing concurrent applications, it also provides its own data structures in order to avoid race conditions as much as possible. The package io.vertx.core.shareddata contains some classes (actually interfaces) specifically designed for that usage. Among those, there are AsyncMap and LocalMap, which look promising. According to the documentation, a LocalMap allows sharing data between different parts of an application running in the same Vert.x node, while an AsyncMap allows sharing data across nodes. For the standalone registry, we will be using the LocalMap.
LocalMap
behaves the same way regular Java maps do (it actually
extends the java.util.Map
interface) and adds some Vert.x dedicated
methods (see the javadoc). The usage
pattern is as follows:
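// sharedData is obtained from the Vert.x instance: SharedData sharedData = vertx.sharedData();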
LocalMap<String, String> map = sharedData.getLocalMap("a_map");
map.put("the_key", "the value");
The string a_map
is just the identifier of the LocalMap
in the system.
Then, elsewhere in the code we can retrieve the value of the key
the_key
with:
LocalMap<String, String> my_map = sharedData.getLocalMap("a_map");
String val = my_map.get("the_key");
// val holds "the value"
Now that we know how to store our key/value pairs, let’s define the HTTP API that will manipulate that structure.
The HTTP server
We will modify the HTTP server to handle the routes for the registry API. Basically, each API will have its own route:
HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx);
router.get("/put/:k/:v").handler(this::putHandler);
router.get("/get/:k").handler(this::getHandler);
router.get("/list").handler(this::listHandler);
router.route().handler(this::defHandler);
Every route is associated with a handler that handles the matching requests. The last line defines a catch-all handler that handles any request that doesn’t match any other route.
The API handlers will mostly all look the same. As an example, putHandler(), which will be triggered by a URL of the form /put/key/value, looks like this:
private void putHandler(RoutingContext rc) {
String key = rc.request().getParam("k");
String val = rc.request().getParam("v");
LocalMap<String, String> map = vertx.sharedData().getLocalMap("the_map");
log.info("put(k={}, v={})", key, val);
map.put(key, val);
send(rc, 200, "Ok");
}
The only parameter to the handler is an instance of the
RoutingContext
class. Such an object is created for every HTTP
request that is received by the HTTP server and holds the context of
this request like cookies, parameters, session, etc.
We first get the parameters from the URL (actually from the context): the key key and its value held in val. Then we get the shared map. The first call to vertx.sharedData().getLocalMap("the_map") creates an empty map and returns it, while further calls return the populated map. Later on we will define a member to avoid all those calls. Finally we execute the operation implemented by this handler, here a put, and on the last line we send a reply back to the client.
As explained before, the full source code for this version of the
registry is on the branch master-01
. You’ll find a getHandler()
that retrieves values from the map, a listHandler()
that lists the
content of the map, a defHandler()
which replies with an error to
any request that doesn’t comply with our specification and the
send()
method to send back the reply to the client. The port number
to listen on is also retrieved from the command line.
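To give an idea of their shape, here is a hedged sketch of getHandler() and send(); the actual code on master-01 may differ in details:
private void getHandler(RoutingContext rc) {
    String key = rc.request().getParam("k");
    LocalMap<String, String> map = vertx.sharedData().getLocalMap("the_map");
    String val = map.get(key);
    if (val != null) {
        log.info("get(k={}) -> {}", key, val);
        send(rc, 200, val);
    }
    else {
        log.info("get(k={}) -> Ko", key);
        send(rc, 200, "Ko");
    }
}

private void send(RoutingContext rc, int code, String msg) {
    rc.response()
      .setStatusCode(code)
      .putHeader("content-type", "text/plain")
      .end(msg + "\n");
}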
An example interactive session
Now that we have a registry server with HTTP APIs, let’s have a live
session. First, we start the server with the command java -jar
target/registry-0.1-fat.jar 8080
. Then, in another terminal, we
send some requests:
$ curl http://localhost:8080/list
Ok: #0
$ curl http://localhost:8080/put/a/1
Ok
$ curl http://localhost:8080/put/b/2
Ok
$ curl http://localhost:8080/list
Ok: #2
a -> 1
b -> 2
$ curl http://localhost:8080/get/b
2
$ curl http://localhost:8080/put/b/42
Ok
$ curl http://localhost:8080/list
Ok: #2
b -> 42
a -> 1
$
First we check that the registry is empty. Then we put the mappings a:1 and b:2 and we list the content of the registry, which is as expected. Then we map 42 to b, which already has a value, and we verify that it has been changed. Fortunately, it is the case.
Not bad!
Not bad, but not sufficient!
If the node crashes for some reason, sending requests to it will
obviously fail. We would basically like to have another node that
would keep the same registry information as the node that had
crashed. In other words, on a clustered registry composed of two
nodes, listening on ports 8081
and 8082
, the following should
work:
$ curl http://localhost:8081/put/a/1
Ok
$ curl http://localhost:8082/get/a
1
$
We are sending the put request to the node that listens on port 8081. But we want to retrieve that value by sending a request to the second node listening on port 8082. Let’s test that on our current implementation. We start two nodes in two terminals, and from a third terminal we fire the following requests:
$ curl http://localhost:8081/put/a/1
Ok
$ curl http://localhost:8082/get/a
Ko
$
The Ko shows that the value for the key a does not exist on node 8082. Obviously there is no magic and our application is not clustered. We didn’t do much to help it either… So we’ll change that in the next section!
Clustered registry
Changing the data structure
In the section about data structures we saw there were two candidates to hold the registry structure, AsyncMap and LocalMap. We chose the latter because it shares data within one node, which was fine for a standalone registry. So let’s try the AsyncMap, as according to the documentation it allows sharing data across several nodes.
There are important differences between the AsyncMap and LocalMap APIs: AsyncMap is asynchronous, and in Vert.x that means handling the results of method calls in handlers.
To obtain an AsyncMap we can’t simply write AsyncMap<String, String> map = vertx.sharedData().getAsyncMap("the_map"); instead we have to write:
vertx.sharedData().<String, String>getAsyncMap("the_map", ar -> {
if(ar.succeeded()) {
// do some operation on the map (get, put, ...)
}
else {
// handle the failure to obtain the map
}
});
To avoid the hassle of having that block copied over and over again in every map manipulation method, we’ll move it into the startup() method and we’ll define map as a member. startup() will thus be redefined as follows:
// member definition
...
private AsyncMap<String, String> map = null;
...
private void startup(String[] args) {
// initialize the vertx object (no change)
// get the port for the HTTP Server (no change)
// initialize the HTTP server with the routes (no change)
vertx.sharedData().<String, String>getAsyncMap("the_map", ar_map -> {
if(ar_map.succeeded()) {
// get the map
map = (AsyncMap<String, String>)ar_map.result();
// start the HTTP server
server.requestHandler(router::accept).listen(port, ar_http -> {
if(ar_http.succeeded()) {
// ...
}
else {
log.error("Could not start the HTTP server on port '{}'", port);
System.exit(-1);
}
}); // server listen
}
else {
log.error("Could not obtain the AsyncMap 'the_map'");
System.exit(-1);
}
}); // obtaining the map
...
} // end of startup()
Another change is related to how the map manipulation methods are called: because the calls are asynchronous, we also need to use handlers here that will be called when the result of the method becomes available. The general pattern is:
map.<method>(<parameters for that method>, ar -> {
if(ar.succeeded()) {
// Code to execute upon success
}
else {
// code to execute upon failure
}
});
For example, the putHandler()
with an AsyncMap
will be rewritten
as:
private void putHandler(RoutingContext rc) {
String key = rc.request().getParam("k");
String val = rc.request().getParam("v");
map.put(key, val, ar -> {
if(ar.succeeded()) {
log.info("put(k={}, v={}) -> Ok", key, val);
send(rc, 200, "Ok");
}
else {
log.error("put(k={}, v={}) -> Ko. Cause: '{}'", key, val, ar.cause().getMessage());
send(rc, 200, "Ko");
}
});
}
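For completeness, getHandler() follows the same pattern; here is a hedged sketch (the code on master-02 may differ in details):
private void getHandler(RoutingContext rc) {
    String key = rc.request().getParam("k");
    map.get(key, ar -> {
        if(ar.succeeded()) {
            String val = ar.result();   // null if the key does not exist
            if(val != null) {
                log.info("get(k={}) -> {}", key, val);
                send(rc, 200, val);
            }
            else {
                log.info("get(k={}) -> Ko", key);
                send(rc, 200, "Ko");
            }
        }
        else {
            log.error("get(k={}) -> Ko. Cause: '{}'", key, ar.cause().getMessage());
            send(rc, 200, "Ko");
        }
    });
}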
The source code for this stage is on branch master-02
.
As previously, let’s start two nodes and let’s query them:
$ curl localhost:8081/list
#0
$ curl localhost:8082/list
#0
$ curl localhost:8081/put/a/1
Ok
$ curl localhost:8082/list
#0
$
First we make sure both nodes have empty registries. Then we add the a:1 mapping to the registry on node 8081 and we query the registry on node 8082, which is still empty! Obviously we are missing something.
This something is the cluster manager.
Including a cluster manager
Vert.x provides integrations for four cluster managers, namely Hazelcast, Infinispan, Apache Ignite and Apache Zookeeper. They all share similar features exposed to Vert.x:
- Discovery and group membership of Vert.x nodes in a cluster: basically, starting a new node will be detected automatically and the new node will join the cluster and will further benefit from the features below. The same holds for a node leaving a cluster, either suddenly, if it has crashed, or smoothly upon shutdown.
- Maintaining cluster-wide topic subscriber lists: this is mainly to provide the ability to send (receive) messages to (or from) another node.
- Distributed Map support: allows maps to span the nodes in the cluster. This is the feature that interests us.
- Distributed Locks: same for locks.
- Distributed Counters: same for counters.
Each manager also provides its own specific set of features, which should drive the choice. In our project we will use Hazelcast, as it is the default cluster manager in Vert.x and it fits our basic requirements well.
To enable Hazelcast we need to do two things: first, add the vertx-hazelcast dependency to the pom.xml file so that it gets downloaded and installed by Maven, and second, instantiate vertx as a clustered instance.
To add the dependency, add the following in the pom.xml
file:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-hazelcast</artifactId>
<!--
<version>3.8.5</version>
-->
</dependency>
then rebuild the whole project with mvn clean package. Notice that Maven will get the appropriate version of the package by itself.
The second point is to change how vertx
is instantiated so that it
will be a cluster-aware instance. As previously, this change implies
code reorganization in the startup()
method. First, we tell Vert.x
to run in clustered mode:
// ...
VertxOptions opts = new VertxOptions()
.setClustered(true)
;
Then, getting a Vert.x instance, vertx
, is now an asynchronous
operation, so the code has to be updated as follows:
...
// start vertx in cluster mode
Vertx.clusteredVertx(opts, ar -> {
if (ar.failed()) {
log.error("Failed to start Vertx cluster. Cause: {}", ar.cause().getMessage());
System.exit(-1);
}
else {
vertx = ar.result();
// chain here the creation of the AsyncMap
// and upon success the launch of the HTTP server
// as previously
}
});
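Putting the pieces together, the chaining could hypothetically look like the sketch below (structure only; the actual master-03 code may differ in details). Note that the router and the HTTP server can only be created once the clustered vertx instance is available:
Vertx.clusteredVertx(opts, ar -> {
    if (ar.failed()) {
        log.error("Failed to start Vertx cluster. Cause: {}", ar.cause().getMessage());
        System.exit(-1);
    }
    else {
        vertx = ar.result();
        // now that vertx exists, create the router and the HTTP server (as previously)
        Router router = Router.router(vertx);
        router.get("/put/:k/:v").handler(this::putHandler);
        router.get("/get/:k").handler(this::getHandler);
        router.get("/list").handler(this::listHandler);
        router.route().handler(this::defHandler);
        HttpServer server = vertx.createHttpServer();
        // obtain the cluster-wide AsyncMap, then start the HTTP server
        vertx.sharedData().<String, String>getAsyncMap("the_map", ar_map -> {
            if (ar_map.succeeded()) {
                map = ar_map.result();
                server.requestHandler(router::accept).listen(port, ar_http -> {
                    if (ar_http.failed()) {
                        log.error("Could not start the HTTP server on port '{}'", port);
                        System.exit(-1);
                    }
                });
            }
            else {
                log.error("Could not obtain the AsyncMap 'the_map'");
                System.exit(-1);
            }
        });
    }
});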
The source code for this stage is on the branch master-03
.
Playing around with the cluster
First of all, when node 8081 is started, the output is much more verbose: we see, for example, the startup of Hazelcast, the creation of partitions, and so on. When the second node, 8082, is started, we also see that node 8081 emits some logs about a new cluster connection: Initialized new cluster connection. Finally, we see that a cluster with both nodes has been formed:
Members {size:2, ver:2} [
Member [10.147.18.222]:5701 - cf6f92c7-9c52-4bd5-9e4b-ee27ecf188c3 this
Member [10.147.18.222]:5702 - 4745846e-bb74-42e9-9475-8d95fc0e2c54
]
cf6f92c7-9c52-4bd5-9e4b-ee27ecf188c3 and 4745846e-bb74-42e9-9475-8d95fc0e2c54 are node identifiers. In the terminal window where we started node 8082, we see the same members, with this marking the other node:
Members {size:2, ver:2} [
Member [10.147.18.222]:5701 - cf6f92c7-9c52-4bd5-9e4b-ee27ecf188c3
Member [10.147.18.222]:5702 - 4745846e-bb74-42e9-9475-8d95fc0e2c54 this
]
That looks good!
Now, let’s redo the previous example: create a mapping on
node 8081
and query node 8082
for that key:
$ curl localhost:8081/put/c/3
Ok
$ curl localhost:8082/get/c
3
$
Great! It looks like the map is seen on both nodes.
Let’s do a last experiment: on node 8081 we create some mappings (a:1, b:2, etc.). Then we’ll kill that very node and see what happens on node 8082.
$ curl localhost:8081/put/a/1
Ok
...
$ curl localhost:8081/put/d/4
Ok
## Here we kill node 8081 with Ctrl-C
$ curl localhost:8082/list
#4
a -> 1
b -> 2
c -> 3
d -> 4
$
Basically, information written to the first node is available from the second node, even after the first node has crashed. That looks pretty much like what we were looking for: a distributed and fault-tolerant registry.
Configuring the application
So we finally have a distributed registry running on a cluster of Vert.x nodes. The cluster configuration, that is the Hazelcast configuration, can be fine-tuned by putting a configuration file in src/main/resources/. Let’s see some of the main configuration items.
Cluster configuration file
So far, the cluster manager we have been using had the default configuration, which is taken from the file default-cluster.xml bundled inside the Vert.x Hazelcast library. This configuration can be overridden if a file named cluster.xml is provided on the class path or at the root of the fat jar, or if the configuration file name is defined as a system property with -Dvertx.hazelcast.config=./my-cluster-conf.xml. The system property has precedence over the class path, which has precedence over the default configuration file.
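For instance, to start a node with a hypothetical configuration file named my-cluster-conf.xml located in the current directory:
$ java -Dvertx.hazelcast.config=./my-cluster-conf.xml -jar target/registry-0.1-fat.jar 8081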
To get a correct cluster.xml
, just extract and copy the
default-cluster.xml
to src/main/resources/
:
$ unzip ~/.m2/repository/io/vertx/vertx-hazelcast/<version>/vertx-hazelcast-<version>.jar \
default-cluster.xml
$ mv default-cluster.xml src/main/resources/cluster.xml
Vert.x documentation for the configuration of the Hazelcast cluster manager can be found here.
The branch that corresponds to this stage is master-04
.
Joining the cluster
One very handy feature provided by Hazelcast is the ability to discover an already running cluster and automatically join. Basically, a newly started node has to:
- discover that there is a cluster around and join it
- or, if it’s “alone”, initiate a new, one-node, cluster.
Hazelcast proposes several discovery mechanisms, two of them being multicast and tcp-ip; both are configured in the <join>...</join> section of cluster.xml.
Discovering a cluster is easiest with the multicast method: the newly started node broadcasts the fact that it exists and the cluster invites it to join. This is the default behaviour and the configuration is as follows:
<network>
  ...
  <join>
    <multicast enabled="true">
      <multicast-group>224.2.2.3</multicast-group>
      <multicast-port>54327</multicast-port>
    </multicast>
    <tcp-ip enabled="false"></tcp-ip>
  </join>
  ...
</network>
On the other hand, if multicast is not available (as this is the case
on some cloud infrastructures or on VPN defined virtual interfaces),
there is another method called tcp-ip
. To use it, we would need to
define a subset of servers known to belong to the cluster. This is
quite a strong assumption as all those known servers may be
down… The configuration would be as follows:
<network>
  ...
  <join>
    <multicast enabled="false"></multicast>
    <tcp-ip enabled="true">
      <hostname>machine1</hostname>
      <interface>192.168.1.0-7</interface>
      <interface>192.168.1.21</interface>
    </tcp-ip>
  </join>
  ...
</network>
Selecting the network interface
Servers in a production environment typically have several interfaces with access to different LANs: one for administration, one for backup, one for production, etc. Hazelcast allows specifying on which network interface a specific node will communicate with the cluster. This is configured in the <interfaces>...</interfaces> section:
<network>
...
<interfaces enabled="true">
<interface>192.168.1.*</interface>
<interface>192.168.2.4-18</interface>
<interface>192.168.3.3</interface>
</interfaces>
</network>
...
Wildcards are used to specify ranges. In the above declarations, 192.168.1.* stands for addresses between 192.168.1.0 and 192.168.1.255, while 192.168.2.4-18 stands for addresses between 192.168.2.4 and 192.168.2.18, inclusive.
Security
Hazelcast allows using SSL or symmetric encryption to secure socket-level communications. However, these features require the Enterprise edition.
Creating Separate Clusters
It is possible to create different clusters on the same network by specifying different group names. Each node is allowed to belong to one single group and thus to one single cluster. Group information is defined in the <group>...</group> section:
<group>
<name>dev</name>
<!-- <password>dev-pass</password> -->
</group>
Notice that since Hazelcast 3.8.2 <password>
is ignored
and will be removed in some future release.
Homogeneous logs
By default Hazelcast uses JUL, the default Java logging framework. I prefer using slf4j; a matter of personal taste… Fortunately, Hazelcast allows selecting the logging framework with the hazelcast.logging.type property:
...
<properties>
...
<property name="hazelcast.logging.type">slf4j</property>
</properties>
...
Then log levels can be managed in the src/main/resources/logback.xml file. For example, to make Hazelcast less noisy:
<logger name="com.hazelcast" level="warn"/>
Of course, the file src/main/resources/logback.xml also defines the behaviour of the logs of all other components, Vert.x and our own code.
The branch master-05
reflects the current state of the
application. This is also the final state and is on the branch
master-final
.
Conclusion on configuration
We have just scratched the surface of the configuration topic, as there are many more items available to fine-tune. The full Hazelcast configuration documentation is available here.
Conclusion
In this post we have built a very simple distributed registry. It was a pretext to explore some aspects of Vert.x clustering: libraries, data structures, configuration, etc. I believe this has given you basic knowledge of the concepts and principles and that you will now be able to explore by yourself.