This post is about my experiment with storing a Distributed Atomic Long (DAL) in Zookeeper nodes. There aren't many blog posts on this topic, even though the DAL comes as an out-of-the-box recipe in the Curator Framework.
Here, we use the Curator Framework to handle all Zookeeper operations. Here's a simple springboot curator config. I kept it super simple: the list of zookeeper servers comes from app.properties, and the client uses an exponential backoff retry policy. Once the client is created, we verify the connection by checking for the "/" path.
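    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class CuratorConfig {

        @Value("${zookeeper.connection}")
        private String zookeeperServers;

        private final Logger logger = LoggerFactory.getLogger(this.getClass());

        @Bean
        @Qualifier("curatorConfig")
        CuratorFramework getCuratorFramework() {
            final CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
            builder.connectString(zookeeperServers);
            // Arguments are baseSleepTimeMs, maxRetries, maxSleepMs.
            builder.retryPolicy(new ExponentialBackoffRetry(10, 100, 10000));
            builder.sessionTimeoutMs(30000);
            CuratorFramework client = builder.build();
            client.start();
            logger.info("Waiting for curator to create a connection");
            try {
                client.blockUntilConnected();
                // Cheap round trip to confirm the server actually answers.
                client.checkExists().forPath("/");
                logger.info("Zookeeper client for curator created !");
            } catch (InterruptedException ie) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
                logger.error("Interrupted while waiting for zookeeper connection", ie);
            } catch (Exception e) {
                logger.error("Couldn't test zookeeper connection", e);
            }
            return client;
        }
    }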
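For a feel of what that retry policy does: the three arguments to ExponentialBackoffRetry are baseSleepTimeMs, maxRetries and maxSleepMs, in that order, so the config above asks for a 10 ms base sleep, up to 100 retries and a 10 second ceiling on each sleep. The standalone sketch below does not call Curator. It models the backoff as I understand it (a randomized sleep that can grow to roughly base * (2^(retry+1) - 1), clamped to the ceiling). The class and method names are made up for illustration, so treat the numbers as an approximation rather than Curator's guaranteed behaviour.

```java
public class BackoffSketch {

    // Hypothetical helper (not part of Curator): the largest sleep, in ms,
    // that the modelled policy could pick for a given retry attempt.
    static long maxSleepForRetry(int baseSleepMs, int maxSleepMs, int retryCount) {
        // Assumption: the random multiplier ranges up to 2^(retryCount + 1) - 1.
        long upper = (long) baseSleepMs * ((1L << (retryCount + 1)) - 1);
        // The ceiling always wins once the exponential term outgrows it.
        return Math.min(upper, maxSleepMs);
    }

    public static void main(String[] args) {
        // Same base sleep and ceiling as the config above.
        for (int retry = 0; retry < 12; retry++) {
            System.out.println("retry " + retry + " -> sleeps at most "
                    + maxSleepForRetry(10, 10000, retry) + " ms");
        }
    }
}
```

Under this model the 10 second ceiling kicks in around the tenth retry, so most of the waiting happens in the later attempts.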
We can now write a util that recursively creates ZK nodes and also has a method to create a new DAL. Given a path, say "path/to/zookeeper/node", I split it into its individual node names, build the path up one level at a time, and create a Persistent Node for each level that doesn't exist yet. The split gave me a "" element (that happens when the path starts with "/"), which is why there's a condition to skip the "" node. The zookeeperConnection (CuratorFramework) is the springboot bean described in the code above. I'm creating persistent nodes; you can create ephemeral nodes as well, though ephemeral nodes can't have children, so that only works for the last node in the path.
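    import java.util.concurrent.TimeUnit;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
    import org.apache.curator.framework.recipes.nodes.PersistentNode;
    import org.apache.curator.retry.BoundedExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class ZookeeperUtil {

        public static void recursivelyCreateZKNodes(CuratorFramework zookeeperConnection, String path) throws Exception {
            String prefix = "";
            for (String node : path.split("/")) {
                // A leading "/" makes split() yield an empty first element; skip it.
                if (node.isEmpty()) {
                    continue;
                }
                prefix += "/" + node;
                // Nothing to do if this level already exists.
                if (zookeeperConnection.checkExists().forPath(prefix) != null) {
                    continue;
                }
                PersistentNode pnds = new PersistentNode(zookeeperConnection, CreateMode.PERSISTENT, false, prefix, new byte[0]);
                pnds.start();
                // Fail loudly instead of silently continuing when the node never shows up.
                if (!pnds.waitForInitialCreate(3, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Timed out creating ZK node " + prefix);
                }
            }
        }

        public static boolean deleteZKNode(CuratorFramework zookeeperConnection, String path) throws Exception {
            if (zookeeperConnection.checkExists().forPath(path) == null) {
                return false;
            }
            zookeeperConnection.delete().forPath(path);
            return true;
        }

        public static DistributedAtomicLong createNewDAL(CuratorFramework zookeeperConnection, String path) throws Exception {
            // Retry arguments are baseSleepTimeMs, maxSleepTimeMs, maxRetries.
            DistributedAtomicLong al = new DistributedAtomicLong(zookeeperConnection,
                    path, new BoundedExponentialBackoffRetry(30000, 60000, 100));
            // trySet overwrites whatever value is already stored at this path.
            al.trySet(0L);
            return al;
        }
    }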
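To see the splitting step on its own, here's a small standalone sketch with no Zookeeper involved. The class and the prefixesOf helper are hypothetical names I made up for illustration; the loop mirrors the one in recursivelyCreateZKNodes and collects the paths that would be created, in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PathPrefixSketch {

    // Hypothetical helper: returns every level of the path, shallowest first.
    static List<String> prefixesOf(String path) {
        List<String> prefixes = new ArrayList<>();
        StringBuilder prefix = new StringBuilder();
        for (String node : path.split("/")) {
            if (node.isEmpty()) {
                continue; // produced by a leading "/" or a doubled "//"
            }
            prefix.append('/').append(node);
            prefixes.add(prefix.toString());
        }
        return prefixes;
    }

    public static void main(String[] args) {
        // A leading slash makes split() return "" as its first element.
        System.out.println(Arrays.toString("/path/to/node".split("/"))); // [, path, to, node]
        System.out.println(prefixesOf("/path/to/node")); // [/path, /path/to, /path/to/node]
    }
}
```

If you don't specifically need the PersistentNode recipe, Curator's create().creatingParentsIfNeeded().forPath(path) can create the missing parents for you in one call.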
Once the util is in place:
1. Recursively create the ZK node:
    ZookeeperUtil.recursivelyCreateZKNodes(zookeeperConnection, requiredZookeeperNodePath);
2. Create a new DAL:
    ZookeeperUtil.createNewDAL(zookeeperConnection, createdZookeeperNodePath);
Store the DAL in a bean, and we should be good to call increment or trySet on it. Both return an AtomicValue, so check succeeded() before trusting the result. Additionally, you can put the increment and trySet calls in ZookeeperUtil as well. We should be good to Rock and Roll!