wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ae3d13b4 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ae3d13b4 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ae3d13b4 Branch: refs/heads/curator-rpc Commit: ae3d13b4af32bc9a21889b6f68e762383c547772 Parents: 41f254a Author: randgalt <randg...@apache.org> Authored: Tue May 27 11:51:34 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Tue May 27 11:51:45 2014 -0500 ---------------------------------------------------------------------- .../org/apache/curator/x/rpc/CuratorEntry.java | 68 ++++++++++++++++++++ .../projection/CuratorProjectionService.java | 51 ++++++++++++++- curator-x-rpc/src/main/thrift/curator.thrift | 6 ++ 3 files changed, 122 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/ae3d13b4/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java index f1b50a0..bd6f207 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java @@ -1,18 +1,37 @@ package org.apache.curator.x.rpc; +import com.google.common.collect.Maps; import com.google.common.collect.Queues; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; public class CuratorEntry implements Closeable { + private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFramework client; private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue(); private final AtomicReference<State> state = new AtomicReference<State>(State.OPEN); + private final Map<String, Entry> things = Maps.newConcurrentMap(); + + private static class Entry + { + final Object thing; + final Closer closer; + + private Entry(Object thing, Closer closer) + { + this.thing = thing; + this.closer = closer; + } + } private enum State { @@ -30,6 +49,18 @@ public class CuratorEntry implements Closeable { if ( state.compareAndSet(State.OPEN, State.CLOSED) ) { + for ( Map.Entry<String, Entry> mapEntry : things.entrySet() ) + { + Entry entry = mapEntry.getValue(); + if ( entry.closer != null ) + { + log.debug(String.format("Closing left over thing. Type: %s - Id: %s", entry.thing.getClass(), mapEntry.getKey())); + //noinspection unchecked + entry.closer.close(entry.thing); // lack of generics is safe because addThing() is type-safe + } + } + things.clear(); + client.close(); events.clear(); } @@ -56,4 +87,41 @@ public class CuratorEntry implements Closeable { return (state.get() == State.OPEN) ? client : null; } + + public <T> String addThing(T thing, Closer<T> closer) + { + return addThing(UUID.randomUUID().toString(), thing, closer); + } + + public <T> String addThing(String id, T thing, Closer<T> closer) + { + things.put(id, new Entry(thing, closer)); + return id; + } + + public <T> T getThing(String id, Class<T> clazz) + { + Entry entry = things.get(id); + return cast(clazz, entry); + } + + public boolean closeThing(String id) + { + Entry entry = things.remove(id); + if ( entry != null ) + { + //noinspection unchecked + entry.closer.close(entry.thing); + } + return false; + } + + private <T> T cast(Class<T> clazz, Entry entry) + { + if ( entry != null ) + { + return clazz.cast(entry.thing); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/curator/blob/ae3d13b4/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java index 4e8dca8..708dc32 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java @@ -30,17 +30,23 @@ import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.CreateModable; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.PathAndBytesable; +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.x.rpc.Closer; import org.apache.curator.x.rpc.CuratorEntry; import org.apache.curator.x.rpc.RpcManager; import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.UUID; +import java.util.concurrent.TimeUnit; @ThriftService("CuratorService") public class CuratorProjectionService { + private final Logger log = LoggerFactory.getLogger(getClass()); private final RpcManager rpcManager; public CuratorProjectionService(RpcManager rpcManager) @@ -83,7 +89,7 @@ public class CuratorProjectionService @ThriftMethod public String create(final CuratorProjection projection, CreateSpec createSpec) throws Exception { - CuratorFramework client = getClient(projection); + CuratorFramework client = getEntry(projection).getClient(); Object builder = client.create(); if ( createSpec.creatingParentsIfNeeded ) @@ -119,6 +125,45 @@ public class CuratorProjectionService return String.valueOf(castBuilder(builder, PathAndBytesable.class).forPath(createSpec.path, createSpec.data)); } + @ThriftMethod + public boolean closeGenericProjection(CuratorProjection curatorProjection, GenericProjection genericProjection) throws Exception + { + CuratorEntry entry = getEntry(curatorProjection); + return entry.closeThing(genericProjection.id); + } + + @ThriftMethod + public GenericProjection acquireLock(CuratorProjection projection, final String path, int maxWaitMs) throws Exception + { + CuratorEntry entry = getEntry(projection); + InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(entry.getClient(), path); + if ( !lock.acquire(maxWaitMs, TimeUnit.MILLISECONDS) ) + { + return null; // TODO + } + + Closer<InterProcessSemaphoreMutex> closer = new Closer<InterProcessSemaphoreMutex>() + { + @Override + public void close(InterProcessSemaphoreMutex mutex) + { + if ( mutex.isAcquiredInThisProcess() ) + { + try + { + mutex.release(); + } + catch ( Exception e ) + { + log.error("Could not release left-over lock for path: " + path, e); + } + } + } + }; + String id = entry.addThing(lock, closer); + return new GenericProjection(id); + } + private void addEvent(CuratorProjection projection, RpcCuratorEvent event) { CuratorEntry entry = rpcManager.get(projection.id); @@ -155,14 +200,14 @@ public class CuratorProjectionService throw new UnsupportedOperationException("Bad mode: " + mode.toString()); } - private CuratorFramework getClient(CuratorProjection projection) throws Exception + private CuratorEntry getEntry(CuratorProjection projection) throws Exception { CuratorEntry entry = rpcManager.get(projection.id); if ( entry == null ) { throw new Exception("No client found with id: " + projection.id); } - return entry.getClient(); + return entry; } private static <T> T castBuilder(Object createBuilder, Class<T> clazz) throws Exception http://git-wip-us.apache.org/repos/asf/curator/blob/ae3d13b4/curator-x-rpc/src/main/thrift/curator.thrift ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift index 4b09efe..b03cc66 100644 --- a/curator-x-rpc/src/main/thrift/curator.thrift +++ b/curator-x-rpc/src/main/thrift/curator.thrift @@ -36,6 +36,10 @@ struct CuratorProjection { struct CuratorProjectionSpec { } +struct GenericProjection { + 1: string id; +} + struct id { 1: string scheme; 2: string id; @@ -80,7 +84,9 @@ struct CuratorEvent { } service CuratorService { + GenericProjection acquireLock(1: CuratorProjection projection, 2: string path, 3: i32 maxWaitMs); void closeCuratorProjection(1: CuratorProjection projection); + bool closeGenericProjection(1: CuratorProjection curatorProjection, 2: GenericProjection genericProjection); string create(1: CuratorProjection projection, 2: CreateSpec createSpec); CuratorProjection newCuratorProjection(1: CuratorProjectionSpec spec); }