Hi there,

I attach the files with the proposal for the PersistentEphemeralNode evolved 
from your original code  (https://gist.github.com/4047093).


The proposal adds the possibility to replace the data in the Persistent 
Ephemeral Node during its lifecycle.

I have also added a couple of test cases.

Please provide me some feedback (both proposal and test cases), and I can 
further develop the code.

Regards,

Evaristo


________________________________
 De: Jordan Zimmerman <[email protected]>
Para: [email protected]; Evaristo José Camarero 
<[email protected]> 
Enviado: Sábado 4 de Mayo de 2013 18:32
Asunto: Re: PersistentEphemeralNode recipe
 


I'd really like to see this recipe available. So, please go ahead.

-JZ


On May 4, 2013, at 1:01 AM, Evaristo José Camarero <[email protected]> 
wrote:

Hi there,
>
>
>Some months ago there was a suggestion to implement a Curator recipe to maintain 
>an ephemeral node after session / connection problems 
>(https://github.com/Netflix/curator/pull/201)
>
>
>This is a common use case in our application and I could help to properly test 
>the current implementation proposed by Jordan 
>(https://gist.github.com/4047093), in order to add the recipe to the Apache 
>Curator distribution
>
>
>Something I am missing in the proposed implementation is the ability to change 
>the reported data in the ephemeral node during the ephemeral node life cycle.
>
>
>Let me know if you believe it is an interesting idea to introduce this recipe in 
>the Apache Curator distribution.
>
>
>Regards,
>
>
>Evaristo
package org.apache.curator.framework.recipes.nodes;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.BaseClassForTests;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.Timing;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.testng.Assert;
import org.testng.annotations.Test;


/**
 * Tests for {@link PersistentEphemeralNode}: initial creation, replacing the node data
 * while the node is alive, and re-creation of the node after its session is killed.
 */
public class TestPersistentEphemeralNode extends BaseClassForTests {

    private static final String PARENT_PATH = "/test";
    private static final String NODE_PATH = "/test/pen";

    /**
     * Creates the node, replaces its data twice through
     * {@link PersistentEphemeralNode#replaceAndSetData(byte[])} and finally closes it,
     * checking after every step that a watcher fired and that ZooKeeper holds the expected state.
     */
    @Test
    public void     testBasics() throws Exception
    {
        PersistentEphemeralNode           pen = null;
        Timing              timing = new Timing();
        CuratorFramework    client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        try
        {
            client.start();
            client.create().forPath(PARENT_PATH);
            pen = new PersistentEphemeralNode(client, Mode.EPHEMERAL, NODE_PATH, "a".getBytes());
            pen.start();
            Assert.assertTrue(pen.waitForInitialCreate(5, TimeUnit.SECONDS));

            Assert.assertEquals(pen.getActualPath(), NODE_PATH);
            Assert.assertEquals(client.getData().forPath(NODE_PATH), "a".getBytes());

            // Every event delivered to this watcher releases one permit
            final Semaphore     semaphore = new Semaphore(0);
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent arg0) {
                    semaphore.release();
                }
            };
            client.checkExists().usingWatcher(watcher).forPath(NODE_PATH);

            pen.replaceAndSetData("b".getBytes());
            Assert.assertTrue(timing.acquireSemaphore(semaphore));
            Assert.assertEquals(pen.getActualPath(), NODE_PATH);
            // Reading with the watcher re-arms it for the next change
            Assert.assertEquals(client.getData().usingWatcher(watcher).forPath(NODE_PATH), "b".getBytes());

            pen.replaceAndSetData("c".getBytes());
            Assert.assertTrue(timing.acquireSemaphore(semaphore));
            Assert.assertEquals(pen.getActualPath(), NODE_PATH);
            Assert.assertEquals(client.getData().usingWatcher(watcher).forPath(NODE_PATH), "c".getBytes());

            // Closing the recipe must remove the node
            pen.close();
            Assert.assertTrue(timing.acquireSemaphore(semaphore));
            Assert.assertTrue(client.checkExists().forPath(NODE_PATH) == null);
        }
        finally
        {
            // pen is still null when setup failed before construction; guard the call so
            // the original failure is not masked by a NullPointerException
            if ( pen != null )
            {
                pen.close();
            }
            client.close();
        }
    }


    /**
     * Kills the session of the client that owns the node and verifies, from a second
     * client, that the node is deleted and then re-created. The second round checks that
     * data set with {@link PersistentEphemeralNode#replaceData(byte[])} is not written
     * immediately but is used when the node is re-created.
     */
    @Test
    public void     testKilledSession() throws Exception
    {
        PersistentEphemeralNode           pen = null;
        Timing              timing = new Timing();
        CuratorFramework    client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        CuratorFramework    client1 = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        try
        {
            client.start();
            client1.start();
            client.create().forPath(PARENT_PATH);
            pen = new PersistentEphemeralNode(client, Mode.EPHEMERAL, NODE_PATH, "a".getBytes());
            pen.start();
            Assert.assertTrue(pen.waitForInitialCreate(5, TimeUnit.SECONDS));

            // One semaphore per event type so each phase can be awaited separately
            final Semaphore     deletedSemaphore = new Semaphore(0);
            final Semaphore     createdSemaphore = new Semaphore(0);
            final Semaphore     updatedSemaphore = new Semaphore(0);
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if ( event.getType() == EventType.NodeDeleted )
                        deletedSemaphore.release();
                    else if ( event.getType() == EventType.NodeCreated )
                        createdSemaphore.release();
                    else if ( event.getType() == EventType.NodeDataChanged )
                        updatedSemaphore.release();
                }
            };

            // First round: the node must come back with its original data
            client1.checkExists().usingWatcher(watcher).forPath(NODE_PATH);
            KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
            Assert.assertTrue(timing.acquireSemaphore(deletedSemaphore));
            awaitRecreation(client1, watcher, createdSemaphore, timing);
            Assert.assertTrue(client1.checkExists().forPath(NODE_PATH) != null);
            Assert.assertEquals(client1.getData().forPath(NODE_PATH), "a".getBytes());

            // replaceData only changes the data used for future writes; ZooKeeper still holds "a"
            pen.replaceData("b".getBytes());
            Assert.assertEquals(client1.getData().forPath(NODE_PATH), "a".getBytes());
            client1.checkExists().usingWatcher(watcher).forPath(NODE_PATH);

            // Second round: the re-created node must carry the replaced data
            KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
            Assert.assertTrue(timing.acquireSemaphore(deletedSemaphore));
            awaitRecreation(client1, watcher, createdSemaphore, timing);
            Assert.assertTrue(client1.checkExists().forPath(NODE_PATH) != null);
            Assert.assertEquals(client1.getData().forPath(NODE_PATH), "b".getBytes());
        }
        finally
        {
            // pen is still null when setup failed before construction; guard the call so
            // the original failure is not masked by a NullPointerException
            if ( pen != null )
            {
                pen.close();
            }
            client.close();
            client1.close();
        }
    }

    /**
     * Waits until the node exists again after a NodeDeleted event was observed.
     *
     * <p>
     *     The exists-watch is registered first. If the node is still missing, the watch will
     *     report NodeCreated and the semaphore is awaited. If the node is already back, no
     *     NodeCreated event will ever be delivered for this watch, so waiting would only
     *     time out; in that case the method returns immediately.
     * </p>
     *
     * @param observer client used to observe the node
     * @param watcher watcher that releases {@code createdSemaphore} on NodeCreated
     * @param createdSemaphore semaphore released by the watcher on NodeCreated
     * @param timing timing helper used to bound the wait
     * @throws Exception on ZooKeeper errors
     */
    private static void awaitRecreation(CuratorFramework observer, Watcher watcher, Semaphore createdSemaphore, Timing timing) throws Exception
    {
        if ( observer.checkExists().usingWatcher(watcher).forPath(NODE_PATH) == null )
        {
            Assert.assertTrue(timing.acquireSemaphore(createdSemaphore));
        }
    }

}
package org.apache.curator.framework.recipes.nodes;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateModable;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * <p>
 *     A persistent ephemeral node is an ephemeral node that attempts to stay present in
 *     ZooKeeper, even through connection and session interruptions.
 * </p>
 *
 * <p>
 *     The data the node should hold can be replaced during the node's life cycle with
 *     {@link #replaceData(byte[])} (remembered only) or {@link #replaceAndSetData(byte[])}
 *     (remembered and written in the background).
 * </p>
 *
 * <p>
 *     Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
 * </p>
 */
public class PersistentEphemeralNode implements Closeable
{
    // Set to null once the initial create has been reported; always read it into a local first
    @VisibleForTesting
    volatile CountDownLatch                 initialCreateLatch = new CountDownLatch(1);

    private final Logger                    log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework          client;
    private final EnsurePath                ensurePath;
    private final CreateModable<ACLBackgroundPathAndBytesable<String>>  createMethod;
    // Path reported by the create callback; null until the node is known and after close()
    private final AtomicReference<String>   nodePath = new AtomicReference<String>(null);
    private final String                    basePath;
    private final Mode                      mode;
    // Guards replacement of, and reads from, the data field
    private final Object                    dataLock = new Object();
    private volatile byte[]                 data;
    private final AtomicReference<State>    state = new AtomicReference<State>(State.LATENT);
    private final AtomicBoolean             isSuspended = new AtomicBoolean(false);
    private final BackgroundCallback        updateNodeBackgroundCallback;
    private final BackgroundCallback        createNodeBackgroundCallback;

    // Re-creates the node when ZooKeeper reports that our node was deleted
    private final Watcher                   watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            if ( ( Objects.equal(nodePath.get(), event.getPath()) ) && (event.getType() == EventType.NodeDeleted) )
            {
                createNode();
            }
        }
    };

    // Tracks whether the connection is usable and re-creates the node after a reconnect
    private final ConnectionStateListener   listener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
            isSuspended.set((newState != ConnectionState.RECONNECTED) && (newState != ConnectionState.CONNECTED));
            if ( newState == ConnectionState.RECONNECTED )
            {
                createNode();
            }
        }
    };

    // Result handler for the exists-check issued by watchNode(): a missing node is re-created
    private final BackgroundCallback        checkExistsCallback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
            {
                createNode();
            }
        }
    };

    private enum State
    {
        LATENT,
        STARTED,
        CLOSED
    }

    /**
     * The mode for node creation
     */
    public enum Mode
    {
        /**
         * Same as {@link CreateMode#EPHEMERAL}
         */
        EPHEMERAL()
        {
            @Override
            protected CreateMode getCreateMode(boolean pathIsSet)
            {
                return CreateMode.EPHEMERAL;
            }

            @Override
            protected boolean isProtected()
            {
                return false;
            }
        },

        /**
         * Same as {@link CreateMode#EPHEMERAL} with protection
         */
        PROTECTED_EPHEMERAL()
        {
            @Override
            protected CreateMode getCreateMode(boolean pathIsSet)
            {
                return CreateMode.EPHEMERAL;
            }

            @Override
            protected boolean isProtected()
            {
                return true;
            }
        }
        ;

        /**
         * @param pathIsSet whether the node path is already known (ignored by the current modes)
         * @return the ZooKeeper create mode to use
         */
        protected abstract CreateMode getCreateMode(boolean pathIsSet);

        /**
         * @return true if the node must be created with Curator's protection enabled
         */
        protected abstract boolean isProtected();
    }

    /**
     * @param client client instance
     * @param mode creation/protection mode
     * @param basePath the base path for the node
     * @param data data for the node; the array is copied
     */
    public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[] data)
    {
        this.client = Preconditions.checkNotNull(client, "client cannot be null");
        this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
        this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
        Preconditions.checkNotNull(data, "data cannot be null");

        String parentDir = ZKPaths.getPathAndNode(basePath).getPath();
        ensurePath = client.newNamespaceAwareEnsurePath(parentDir);

        // Invoked when a background setData completes: re-reads the node and either
        // confirms the data is current or triggers another update
        updateNodeBackgroundCallback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    String path = event.getPath();
                    byte[] bytes = client.getData().forPath(path);
                    if ( backgroundUpdateIfDifferent(bytes, path) )
                    {
                        releaseInitialCreateLatch();
                    }
                }
            }
        };

        // Invoked when a background create completes
        createNodeBackgroundCallback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                String      path = null;
                boolean     updateRequired = false;
                if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() )
                {
                    // The node is already there: make sure it carries the wanted data
                    path = event.getPath();
                    byte[] bytes = client.getData().forPath(path);
                    updateRequired = !backgroundUpdateIfDifferent(bytes, path);
                }
                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    path = event.getName();
                }

                if ( path == null )
                {
                    // Neither created nor already existing: try again
                    createNode();
                    return;
                }

                nodePath.set(path);
                watchNode();

                // If data is changed before the initial create completes, things are not ok
                // until data is updated; the update callback releases the latch in that case.
                if ( !updateRequired )
                {
                    releaseInitialCreateLatch();
                }
            }
        };

        createMethod = mode.isProtected() ? client.create().withProtection() : client.create();
        synchronized ( dataLock )
        {
            this.data = Arrays.copyOf(data, data.length);
        }
    }

    /**
     * Signals that the initial create has completed. Only the first call counts the latch
     * down; the latch reference is cleared so later calls are no-ops.
     */
    private void releaseInitialCreateLatch()
    {
        CountDownLatch localLatch = initialCreateLatch;
        initialCreateLatch = null;
        if ( localLatch != null )
        {
            localLatch.countDown();
        }
    }

    /**
     * Compares the bytes read from ZooKeeper with the wanted data and, when they differ,
     * issues a background setData with the wanted data.
     *
     * @param bytes data currently stored in the node
     * @param path path of the node
     * @return true if the node already holds the wanted data, false if an update was issued
     * @throws Exception errors issuing the update
     */
    private boolean backgroundUpdateIfDifferent(byte[] bytes, String path) throws Exception
    {
        synchronized ( dataLock )
        {
            if ( Arrays.equals(bytes, this.data) )
            {
                return true;
            }
            client.setData().inBackground(updateNodeBackgroundCallback).forPath(path, this.data);
            return false;
        }
    }

    /**
     * You must call start() to initiate the persistent ephemeral node. An attempt to create the node
     * in the background will be started
     */
    public void start()
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");

        client.getConnectionStateListenable().addListener(listener);
        createNode();
    }

    /**
     * Block until either the initial node creation initiated by {@link #start()} succeeds or
     * the timeout elapses. Returns immediately if the initial creation already completed.
     *
     * @param timeout the maximum time to wait
     * @param unit time unit
     * @return if the node was created before timeout
     * @throws InterruptedException if the thread is interrupted
     */
    public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
    {
        Preconditions.checkState(state.get() == State.STARTED, "Not started");

        // The callbacks null the field once creation has been reported, so work on a
        // local copy: a null latch means the node has already been created
        CountDownLatch localLatch = initialCreateLatch;
        return (localLatch == null) || localLatch.await(timeout, unit);
    }

    /**
     * Stops maintaining the node and deletes it in the background. Has no effect unless
     * the instance is currently started.
     */
    @Override
    public void close()
    {
        if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
        {
            return;
        }

        client.getConnectionStateListenable().removeListener(listener);

        deleteNode();
    }

    /**
     * Returns the currently set path or null if the node does not exist
     *
     * @return node path or null
     */
    public String getActualPath()
    {
        return nodePath.get();
    }

    /**
     * Set data that ephemeral node should set in ZK
     * It does not try to write data in ZK
     *
     * @param data the new data for the node; must not be null, the array is copied
     */
    public void replaceData(byte[] data)
    {
        Preconditions.checkNotNull(data, "data cannot be null");
        synchronized ( dataLock )
        {
            this.data = Arrays.copyOf(data, data.length);
        }
    }

    /**
     * Set data that ephemeral node should set in ZK and, if this instance is started and
     * the connection is not suspended, write it to ZK using the async API
     *
     * @param data the new data for the node; must not be null, the array is copied
     * @throws Exception errors issuing the background write
     */
    public void replaceAndSetData(byte[] data) throws Exception
    {
        replaceData(data);
        resetData();
    }

    /**
     * Uses async API to write the current data in ZK if this instance is started and the
     * connection is not suspended; otherwise does nothing
     *
     * @throws Exception errors issuing the background write
     */
    public void resetData() throws Exception
    {
        if ( !isActive() )
        {
            return;
        }

        // Write to the path reported by the create callback: it is the node that actually
        // exists and need not equal basePath (e.g. when the node is created with protection).
        // Fall back to basePath while the actual path is not known yet.
        String      localNodePath = nodePath.get();
        String      targetPath = (localNodePath != null) ? localNodePath : basePath;
        synchronized ( dataLock )
        {
            client.setData().inBackground(updateNodeBackgroundCallback).forPath(targetPath, this.data);
        }
    }

    /**
     * Forgets the node path and, if one was known, deletes that node in the background.
     */
    private void deleteNode()
    {
        String          localNodePath = nodePath.getAndSet(null);
        if ( localNodePath != null )
        {
            try
            {
                client.delete().guaranteed().inBackground().forPath(localNodePath);
            }
            catch ( KeeperException.NoNodeException ignore )
            {
                // ignore
            }
            catch ( Exception e )
            {
                log.error("Deleting node: " + localNodePath, e);
            }
        }
    }

    /**
     * Issues a background create for the node with a snapshot of the current data.
     * Does nothing unless this instance is started and the connection is not suspended.
     */
    private void createNode()
    {
        if ( !isActive() )
        {
            return;
        }

        try
        {
            // Reuse the previously reported path so a re-created node keeps its name
            String      existingPath = nodePath.get();
            String      createPath = (existingPath != null) ? existingPath : basePath;
            byte[]      newData;
            synchronized ( dataLock )
            {
                byte[]  current = this.data;
                newData = Arrays.copyOf(current, current.length);
            }
            ensurePath.ensure(client.getZookeeperClient());
            createMethod.withMode(mode.getCreateMode(existingPath != null)).inBackground(createNodeBackgroundCallback).forPath(createPath, newData);
        }
        catch ( Exception e )
        {
            log.error("Creating node. BasePath: " + basePath, e);
        }
    }

    /**
     * Registers the deletion watcher on the known node path via a background exists-check.
     * Does nothing unless this instance is started and the connection is not suspended.
     */
    private void watchNode()
    {
        if ( !isActive() )
        {
            return;
        }

        String          localNodePath = nodePath.get();
        if ( localNodePath != null )
        {
            try
            {
                client.checkExists().usingWatcher(watcher).inBackground(checkExistsCallback).forPath(localNodePath);
            }
            catch ( Exception e )
            {
                log.error("Watching node: " + localNodePath, e);
            }
        }
    }

    /**
     * @return true if this instance is started and the connection is not suspended
     */
    private boolean isActive()
    {
        return (state.get() == State.STARTED) && !isSuspended.get();
    }
}

Reply via email to