Hi there,
I attach the files with the proposal for the PersistentEphemeralNode evolved
from your original code (https://gist.github.com/4047093).
The proposal adds the possibility to replace the data in the Persistent
Ephemeral Node during its lifecycle.
I have also added a couple of test cases.
Please provide me some feedback (both proposal and test cases), and I can
further develop the code.
Regards,
Evaristo
________________________________
De: Jordan Zimmerman <[email protected]>
Para: [email protected]; Evaristo José Camarero
<[email protected]>
Enviado: Sábado 4 de Mayo de 2013 18:32
Asunto: Re: PersistentEphemeralNode recipe
I'd really like to see this recipe available. So, please go ahead.
-JZ
On May 4, 2013, at 1:01 AM, Evaristo José Camarero <[email protected]>
wrote:
Hi there,
>
>
>Some months ago there was a suggestion to implement a Curator recipe to maintain
>an ephemeral node after session / connection problems
>(https://github.com/Netflix/curator/pull/201)
>
>
>This is a common use case in our application and I could help to properly test
>the current implementation proposed by Jordan
>(https://gist.github.com/4047093), in order to add the recipe to the Apache
>Curator distribution
>
>
>Something I am missing in the proposed implementation is the ability to change
>the reported data in the ephemeral node during the ephemeral node life cycle.
>
>
>Let me know if you believe it is an interesting idea to introduce this recipe in
>the Apache Curator distribution.
>
>
>Regards,
>
>
>Evaristo
package org.apache.curator.framework.recipes.nodes;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.BaseClassForTests;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.Timing;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
 * Exercises {@link PersistentEphemeralNode}: initial creation, replacing the node data while
 * the node is alive, deletion on close, and re-creation of the node after the owning client's
 * session has been killed.
 */
public class TestPersistentEphemeralNode extends BaseClassForTests
{
    private static final String PEN_PATH = "/test/pen";

    /**
     * Creates the node, replaces its data twice through {@code replaceAndSetData} and checks
     * that each change becomes visible, then closes the node and checks that it is removed.
     */
    @Test
    public void testBasics() throws Exception
    {
        PersistentEphemeralNode pen = null;
        Timing timing = new Timing();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        try
        {
            client.start();
            client.create().forPath("/test");

            pen = new PersistentEphemeralNode(client, Mode.EPHEMERAL, PEN_PATH, "a".getBytes());
            pen.start();
            Assert.assertTrue(pen.waitForInitialCreate(5, TimeUnit.SECONDS));
            Assert.assertEquals(pen.getActualPath(), PEN_PATH);
            Assert.assertEquals(client.getData().forPath(PEN_PATH), "a".getBytes());

            // Released once for every watch event delivered for the node.
            final Semaphore semaphore = new Semaphore(0);
            Watcher watcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent arg0)
                {
                    semaphore.release();
                }
            };
            client.checkExists().usingWatcher(watcher).forPath(PEN_PATH);

            pen.replaceAndSetData("b".getBytes());
            Assert.assertTrue(timing.acquireSemaphore(semaphore));
            Assert.assertEquals(pen.getActualPath(), PEN_PATH);
            // Reading with the watcher re-arms it for the next change.
            Assert.assertEquals(client.getData().usingWatcher(watcher).forPath(PEN_PATH), "b".getBytes());

            pen.replaceAndSetData("c".getBytes());
            Assert.assertTrue(timing.acquireSemaphore(semaphore));
            Assert.assertEquals(pen.getActualPath(), PEN_PATH);
            Assert.assertEquals(client.getData().usingWatcher(watcher).forPath(PEN_PATH), "c".getBytes());

            pen.close();
            Assert.assertTrue(timing.acquireSemaphore(semaphore));
            Assert.assertTrue(client.checkExists().forPath(PEN_PATH) == null);
        }
        finally
        {
            // pen is still null if setup failed before it was constructed; closing it
            // unguarded would throw a NullPointerException that hides the original failure.
            if ( pen != null )
            {
                pen.close();
            }
            client.close();
        }
    }

    /**
     * Kills the session of the client that owns the node and checks, through a second client,
     * that the node is deleted and then re-created. Data set with {@code replaceData} must not
     * be written immediately, but must be used when the node is re-created after the next
     * session kill.
     */
    @Test
    public void testKilledSession() throws Exception
    {
        PersistentEphemeralNode pen = null;
        Timing timing = new Timing();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        CuratorFramework client1 = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        try
        {
            client.start();
            client1.start();
            client.create().forPath("/test");

            pen = new PersistentEphemeralNode(client, Mode.EPHEMERAL, PEN_PATH, "a".getBytes());
            pen.start();
            Assert.assertTrue(pen.waitForInitialCreate(5, TimeUnit.SECONDS));

            final Semaphore deletedSemaphore = new Semaphore(0);
            final Semaphore createdSemaphore = new Semaphore(0);
            final Semaphore updatedSemaphore = new Semaphore(0);
            Watcher watcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    if ( event.getType() == EventType.NodeDeleted )
                    {
                        deletedSemaphore.release();
                    }
                    else if ( event.getType() == EventType.NodeCreated )
                    {
                        createdSemaphore.release();
                    }
                    else if ( event.getType() == EventType.NodeDataChanged )
                    {
                        updatedSemaphore.release();
                    }
                }
            };

            // First kill: the node must disappear and come back with the original data.
            client1.checkExists().usingWatcher(watcher).forPath(PEN_PATH);
            KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
            Assert.assertTrue(timing.acquireSemaphore(deletedSemaphore));
            awaitNodeCreated(client1, watcher, timing, createdSemaphore);
            Assert.assertTrue(client1.checkExists().forPath(PEN_PATH) != null);
            Assert.assertEquals(client1.getData().forPath(PEN_PATH), "a".getBytes());

            // replaceData only records the new value; the node in ZooKeeper keeps the old one.
            pen.replaceData("b".getBytes());
            Assert.assertEquals(client1.getData().forPath(PEN_PATH), "a".getBytes());

            // Second kill: the re-created node must carry the replaced data.
            client1.checkExists().usingWatcher(watcher).forPath(PEN_PATH);
            KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
            Assert.assertTrue(timing.acquireSemaphore(deletedSemaphore));
            awaitNodeCreated(client1, watcher, timing, createdSemaphore);
            Assert.assertTrue(client1.checkExists().forPath(PEN_PATH) != null);
            Assert.assertEquals(client1.getData().forPath(PEN_PATH), "b".getBytes());
        }
        finally
        {
            // See testBasics(): guard against a failure before pen was constructed.
            if ( pen != null )
            {
                pen.close();
            }
            client.close();
            client1.close();
        }
    }

    /**
     * Waits until the node exists again after a delete event has been observed.
     *
     * The exists-check and the watch registration happen in one call. If the node has already
     * been re-created by the time of that call, no NodeCreated event will be delivered for it,
     * so waiting on the semaphore unconditionally would fail spuriously; in that case the
     * non-null result of the check is itself the proof of re-creation.
     */
    private static void awaitNodeCreated(CuratorFramework observer, Watcher watcher, Timing timing, Semaphore createdSemaphore) throws Exception
    {
        if ( observer.checkExists().usingWatcher(watcher).forPath(PEN_PATH) == null )
        {
            Assert.assertTrue(timing.acquireSemaphore(createdSemaphore));
        }
    }
}
package org.apache.curator.framework.recipes.nodes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateModable;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
 * <p>
 * A persistent ephemeral node is an ephemeral node that attempts to stay present in
 * ZooKeeper, even through connection and session interruptions.
 * </p>
 *
 * <p>
 * The data the node should hold can be replaced during its life cycle, see
 * {@link #replaceData(byte[])} and {@link #replaceAndSetData(byte[])}.
 * </p>
 *
 * <p>
 * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
 * </p>
 */
public class PersistentEphemeralNode implements Closeable
{
    // Set to null by the background callbacks once the initial creation has completed,
    // so every reader must copy it to a local and null-check it.
    @VisibleForTesting
    volatile CountDownLatch initialCreateLatch = new CountDownLatch(1);

    private final Logger log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework client;
    private final EnsurePath ensurePath;
    private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
    // Actual path of the node as reported by the create callback; null while no node is known.
    private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
    private final String basePath;
    private final Mode mode;
    // Guards reads and writes of the data field.
    private final Object dataLock = new Object();
    private volatile byte[] data;
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    private final AtomicBoolean isSuspended = new AtomicBoolean(false);
    private final BackgroundCallback updateNodeBackgroundCallback;
    private final BackgroundCallback createNodeBackgroundCallback;

    // Re-creates the node when a delete event arrives for the path currently recorded.
    private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            if ( ( Objects.equal(nodePath.get(), event.getPath()) ) && (event.getType() == EventType.NodeDeleted) )
            {
                createNode();
            }
        }
    };

    // Tracks whether the connection is usable and re-creates the node on reconnection.
    private final ConnectionStateListener listener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
            isSuspended.set((newState != ConnectionState.RECONNECTED) && (newState != ConnectionState.CONNECTED));
            if ( newState == ConnectionState.RECONNECTED )
            {
                createNode();
            }
        }
    };

    // Result handler for the exists-check issued by watchNode(): a missing node is re-created.
    private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
            {
                createNode();
            }
        }
    };

    private enum State
    {
        LATENT,
        STARTED,
        CLOSED
    }

    /**
     * The mode for node creation
     */
    public enum Mode
    {
        /**
         * Same as {@link CreateMode#EPHEMERAL}
         */
        EPHEMERAL()
        {
            @Override
            protected CreateMode getCreateMode(boolean pathIsSet)
            {
                return CreateMode.EPHEMERAL;
            }

            @Override
            protected boolean isProtected()
            {
                return false;
            }
        },

        /**
         * Same as {@link CreateMode#EPHEMERAL} with protection
         */
        PROTECTED_EPHEMERAL()
        {
            @Override
            protected CreateMode getCreateMode(boolean pathIsSet)
            {
                return CreateMode.EPHEMERAL;
            }

            @Override
            protected boolean isProtected()
            {
                return true;
            }
        }
        ;

        protected abstract CreateMode getCreateMode(boolean pathIsSet);

        protected abstract boolean isProtected();
    }

    /**
     * @param client client instance
     * @param mode creation/protection mode
     * @param basePath the base path for the node
     * @param data data for the node; the array is copied
     * @throws NullPointerException if any argument is null
     */
    public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[] data)
    {
        this.client = Preconditions.checkNotNull(client, "client cannot be null");
        this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
        this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
        Preconditions.checkNotNull(data, "data cannot be null");

        String parentDir = ZKPaths.getPathAndNode(basePath).getPath();
        ensurePath = client.newNamespaceAwareEnsurePath(parentDir);

        // Handles the result of a background setData: reads the node back and, if it still
        // differs from the wanted data, issues another update; otherwise the node is in the
        // wanted state and anyone waiting for the initial create is released.
        updateNodeBackgroundCallback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    String path = event.getPath();
                    byte[] bytes = client.getData().forPath(path);
                    if ( backgroundUpdateIfDifferent(bytes, path) )
                    {
                        releaseInitialCreateLatch();
                    }
                }
            }
        };

        // Handles the result of a background create.
        createNodeBackgroundCallback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                String path = null;
                boolean updateRequired = false;
                if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
                {
                    // The node is already there: adopt it and make sure it holds our data.
                    path = event.getPath();
                    byte[] bytes = client.getData().forPath(path);
                    updateRequired = !backgroundUpdateIfDifferent(bytes, path);
                }
                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    path = event.getName();
                }

                // If data is changed before the initial create completes, things are not ok
                // until data is updated: the latch is then released by the update callback.
                if ( updateRequired )
                {
                    nodePath.set(path);
                    watchNode();
                }
                else if ( path != null )
                {
                    nodePath.set(path);
                    watchNode();
                    releaseInitialCreateLatch();
                }
                else
                {
                    // Any other result code: try again.
                    createNode();
                }
            }
        };

        createMethod = mode.isProtected() ? client.create().withProtection() : client.create();

        synchronized ( dataLock )
        {
            this.data = Arrays.copyOf(data, data.length);
        }
    }

    /**
     * Compares the bytes read from ZooKeeper with the wanted data and, if they differ, starts a
     * background update of the node at the given path.
     *
     * @param bytes data currently stored in the node
     * @param path path of the node
     * @return true if the node already holds the wanted data, false if an update was started
     * @throws Exception errors from starting the background update
     */
    private boolean backgroundUpdateIfDifferent(byte[] bytes, String path) throws Exception
    {
        synchronized ( dataLock )
        {
            if ( Arrays.equals(bytes, this.data) )
            {
                return true;
            }

            client.setData().inBackground(updateNodeBackgroundCallback).forPath(path, this.data);
            return false;
        }
    }

    /**
     * Counts down the initial-create latch, if it has not been released already, and clears
     * the field.
     */
    private void releaseInitialCreateLatch()
    {
        CountDownLatch localLatch = initialCreateLatch;
        initialCreateLatch = null;
        if ( localLatch != null )
        {
            localLatch.countDown();
        }
    }

    /**
     * You must call start() to initiate the persistent ephemeral node. An attempt to create the node
     * in the background will be started
     *
     * @throws IllegalStateException if the instance was already started
     */
    public void start()
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");

        client.getConnectionStateListenable().addListener(listener);
        createNode();
    }

    /**
     * Block until either the initial node creation initiated by {@link #start()} succeeds or
     * the timeout elapses. Returns immediately if the initial creation has already completed.
     *
     * @param timeout the maximum time to wait
     * @param unit time unit
     * @return if the node was created before timeout
     * @throws InterruptedException if the thread is interrupted
     * @throws IllegalStateException if the instance is not in the started state
     */
    public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
    {
        Preconditions.checkState(state.get() == State.STARTED, "Not started");

        // The callbacks null the field out after counting the latch down, so a null here
        // means the initial creation is already done. Dereferencing the field directly would
        // throw a NullPointerException in that case.
        CountDownLatch localLatch = initialCreateLatch;
        return (localLatch == null) || localLatch.await(timeout, unit);
    }

    /**
     * Stops maintaining the node and deletes it. Does nothing unless the instance is in the
     * started state.
     */
    @Override
    public void close()
    {
        if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
        {
            return;
        }

        client.getConnectionStateListenable().removeListener(listener);
        deleteNode();
    }

    /**
     * Returns the currently set path or null if the node does not exist
     *
     * @return node path or null
     */
    public String getActualPath()
    {
        return nodePath.get();
    }

    /**
     * Set data that ephemeral node should set in ZK
     * It does not try to write data in ZK
     *
     * @param data the new data; the array is copied
     * @throws NullPointerException if data is null
     */
    public void replaceData (byte[] data) {
        Preconditions.checkNotNull(data, "data cannot be null");
        synchronized ( dataLock )
        {
            this.data = Arrays.copyOf(data, data.length);
        }
    }

    /**
     * Set data that ephemeral node should set in ZK
     * Uses async API to write value in ZK if the instance is started
     * and the connection is up
     *
     * @param data the new data; the array is copied
     * @throws NullPointerException if data is null
     * @throws Exception errors from starting the background update
     */
    public void replaceAndSetData (byte[] data) throws Exception {
        Preconditions.checkNotNull(data, "data cannot be null");
        synchronized ( dataLock )
        {
            this.data = Arrays.copyOf(data, data.length);
        }
        resetData();
    }

    /**
     * Uses async API to write the current data in ZK if the instance is started
     * and the connection is up; otherwise does nothing
     *
     * @throws Exception errors from starting the background update
     */
    public void resetData() throws Exception {
        if ( isActive() )
        {
            // Write to the path the node was actually created at, which is what the create
            // callback recorded; fall back to the base path while no path is recorded yet.
            String existingPath = nodePath.get();
            String targetPath = (existingPath != null) ? existingPath : basePath;
            synchronized ( dataLock )
            {
                client.setData().inBackground(updateNodeBackgroundCallback).forPath(targetPath, this.data);
            }
        }
    }

    /**
     * Clears the recorded path and, if one was recorded, starts a guaranteed background
     * delete of it. Failures are logged, not propagated.
     */
    private void deleteNode()
    {
        String localNodePath = nodePath.getAndSet(null);
        if ( localNodePath != null )
        {
            try
            {
                client.delete().guaranteed().inBackground().forPath(localNodePath);
            }
            catch ( KeeperException.NoNodeException ignore )
            {
                // ignore
            }
            catch ( Exception e )
            {
                log.error("Deleting node: " + localNodePath, e);
            }
        }
    }

    /**
     * Starts a background create of the node with a snapshot of the current data, at the
     * recorded path if there is one and at the base path otherwise. Does nothing when the
     * instance is not active. Failures are logged, not propagated.
     */
    private void createNode()
    {
        if ( !isActive() )
        {
            return;
        }

        try
        {
            String existingPath = nodePath.get();
            String createPath = (existingPath != null) ? existingPath : basePath;
            byte[] newData;
            synchronized ( dataLock )
            {
                newData = Arrays.copyOf(this.data, this.data.length);
            }
            ensurePath.ensure(client.getZookeeperClient());
            createMethod.withMode(mode.getCreateMode(existingPath != null)).inBackground(createNodeBackgroundCallback).forPath(createPath, newData);
        }
        catch ( Exception e )
        {
            log.error("Creating node. BasePath: " + basePath, e);
        }
    }

    /**
     * Registers the delete watcher on the recorded path through a background exists-check.
     * Does nothing when the instance is not active or no path is recorded. Failures are
     * logged, not propagated.
     */
    private void watchNode()
    {
        if ( !isActive() )
        {
            return;
        }

        String localNodePath = nodePath.get();
        if ( localNodePath != null )
        {
            try
            {
                client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
            }
            catch ( Exception e )
            {
                log.error("Watching node: " + localNodePath, e);
            }
        }
    }

    /**
     * @return true if the instance is started and the connection is not suspended
     */
    private boolean isActive()
    {
        return (state.get() == State.STARTED) && !isSuspended.get();
    }
}