wip

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ae3d13b4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ae3d13b4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ae3d13b4

Branch: refs/heads/curator-rpc
Commit: ae3d13b4af32bc9a21889b6f68e762383c547772
Parents: 41f254a
Author: randgalt <randg...@apache.org>
Authored: Tue May 27 11:51:34 2014 -0500
Committer: randgalt <randg...@apache.org>
Committed: Tue May 27 11:51:45 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/x/rpc/CuratorEntry.java  | 68 ++++++++++++++++++++
 .../projection/CuratorProjectionService.java    | 51 ++++++++++++++-
 curator-x-rpc/src/main/thrift/curator.thrift    |  6 ++
 3 files changed, 122 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ae3d13b4/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
----------------------------------------------------------------------
diff --git 
a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java 
b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
index f1b50a0..bd6f207 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
@@ -1,18 +1,37 @@
 package org.apache.curator.x.rpc;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class CuratorEntry implements Closeable
 {
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
     private final BlockingQueue<RpcCuratorEvent> events = 
Queues.newLinkedBlockingQueue();
     private final AtomicReference<State> state = new 
AtomicReference<State>(State.OPEN);
+    private final Map<String, Entry> things = Maps.newConcurrentMap();
+
+    private static class Entry
+    {
+        final Object thing;
+        final Closer closer;
+
+        private Entry(Object thing, Closer closer)
+        {
+            this.thing = thing;
+            this.closer = closer;
+        }
+    }
 
     private enum State
     {
@@ -30,6 +49,18 @@ public class CuratorEntry implements Closeable
     {
         if ( state.compareAndSet(State.OPEN, State.CLOSED) )
         {
+            for ( Map.Entry<String, Entry> mapEntry : things.entrySet() )
+            {
+                Entry entry = mapEntry.getValue();
+                if ( entry.closer != null )
+                {
+                    log.debug(String.format("Closing left over thing. Type: %s 
- Id: %s", entry.thing.getClass(), mapEntry.getKey()));
+                    //noinspection unchecked
+                    entry.closer.close(entry.thing);    // lack of generics is 
safe because addThing() is type-safe
+                }
+            }
+            things.clear();
+
             client.close();
             events.clear();
         }
@@ -56,4 +87,41 @@ public class CuratorEntry implements Closeable
     {
         return (state.get() == State.OPEN) ? client : null;
     }
+
+    public <T> String addThing(T thing, Closer<T> closer)
+    {
+        return addThing(UUID.randomUUID().toString(), thing, closer);
+    }
+
+    public <T> String addThing(String id, T thing, Closer<T> closer)
+    {
+        things.put(id, new Entry(thing, closer));
+        return id;
+    }
+
+    public <T> T getThing(String id, Class<T> clazz)
+    {
+        Entry entry = things.get(id);
+        return cast(clazz, entry);
+    }
+
+    public boolean closeThing(String id)
+    {
+        Entry entry = things.remove(id);
+        if ( entry != null )
+        {
+            //noinspection unchecked
+            entry.closer.close(entry.thing);
+        }
+        return false;
+    }
+
+    private <T> T cast(Class<T> clazz, Entry entry)
+    {
+        if ( entry != null )
+        {
+            return clazz.cast(entry.thing);
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/ae3d13b4/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
----------------------------------------------------------------------
diff --git 
a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
 
b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
index 4e8dca8..708dc32 100644
--- 
a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
+++ 
b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
@@ -30,17 +30,23 @@ import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.api.CreateModable;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.x.rpc.Closer;
 import org.apache.curator.x.rpc.CuratorEntry;
 import org.apache.curator.x.rpc.RpcManager;
 import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 @ThriftService("CuratorService")
 public class CuratorProjectionService
 {
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private final RpcManager rpcManager;
 
     public CuratorProjectionService(RpcManager rpcManager)
@@ -83,7 +89,7 @@ public class CuratorProjectionService
     @ThriftMethod
     public String create(final CuratorProjection projection, CreateSpec 
createSpec) throws Exception
     {
-        CuratorFramework client = getClient(projection);
+        CuratorFramework client = getEntry(projection).getClient();
 
         Object builder = client.create();
         if ( createSpec.creatingParentsIfNeeded )
@@ -119,6 +125,45 @@ public class CuratorProjectionService
         return String.valueOf(castBuilder(builder, 
PathAndBytesable.class).forPath(createSpec.path, createSpec.data));
     }
 
+    @ThriftMethod
+    public boolean closeGenericProjection(CuratorProjection curatorProjection, 
GenericProjection genericProjection) throws Exception
+    {
+        CuratorEntry entry = getEntry(curatorProjection);
+        return entry.closeThing(genericProjection.id);
+    }
+
+    @ThriftMethod
+    public GenericProjection acquireLock(CuratorProjection projection, final 
String path, int maxWaitMs) throws Exception
+    {
+        CuratorEntry entry = getEntry(projection);
+        InterProcessSemaphoreMutex lock = new 
InterProcessSemaphoreMutex(entry.getClient(), path);
+        if ( !lock.acquire(maxWaitMs, TimeUnit.MILLISECONDS) )
+        {
+            return null;    // TODO
+        }
+
+        Closer<InterProcessSemaphoreMutex> closer = new 
Closer<InterProcessSemaphoreMutex>()
+        {
+            @Override
+            public void close(InterProcessSemaphoreMutex mutex)
+            {
+                if ( mutex.isAcquiredInThisProcess() )
+                {
+                    try
+                    {
+                        mutex.release();
+                    }
+                    catch ( Exception e )
+                    {
+                        log.error("Could not release left-over lock for path: 
" + path, e);
+                    }
+                }
+            }
+        };
+        String id = entry.addThing(lock, closer);
+        return new GenericProjection(id);
+    }
+
     private void addEvent(CuratorProjection projection, RpcCuratorEvent event)
     {
         CuratorEntry entry = rpcManager.get(projection.id);
@@ -155,14 +200,14 @@ public class CuratorProjectionService
         throw new UnsupportedOperationException("Bad mode: " + 
mode.toString());
     }
 
-    private CuratorFramework getClient(CuratorProjection projection) throws 
Exception
+    private CuratorEntry getEntry(CuratorProjection projection) throws 
Exception
     {
         CuratorEntry entry = rpcManager.get(projection.id);
         if ( entry == null )
         {
             throw new Exception("No client found with id: " + projection.id);
         }
-        return entry.getClient();
+        return entry;
     }
 
     private static <T> T castBuilder(Object createBuilder, Class<T> clazz) 
throws Exception

http://git-wip-us.apache.org/repos/asf/curator/blob/ae3d13b4/curator-x-rpc/src/main/thrift/curator.thrift
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/thrift/curator.thrift 
b/curator-x-rpc/src/main/thrift/curator.thrift
index 4b09efe..b03cc66 100644
--- a/curator-x-rpc/src/main/thrift/curator.thrift
+++ b/curator-x-rpc/src/main/thrift/curator.thrift
@@ -36,6 +36,10 @@ struct CuratorProjection {
 struct CuratorProjectionSpec {
 }
 
+struct GenericProjection {
+  1: string id;
+}
+
 struct id {
   1: string scheme;
   2: string id;
@@ -80,7 +84,9 @@ struct CuratorEvent {
 }
 
 service CuratorService {
+  GenericProjection acquireLock(1: CuratorProjection projection, 2: string 
path, 3: i32 maxWaitMs);
   void closeCuratorProjection(1: CuratorProjection projection);
+  bool closeGenericProjection(1: CuratorProjection curatorProjection, 2: 
GenericProjection genericProjection);
   string create(1: CuratorProjection projection, 2: CreateSpec createSpec);
   CuratorProjection newCuratorProjection(1: CuratorProjectionSpec spec);
 }

Reply via email to