wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7c99ddbf Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7c99ddbf Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7c99ddbf Branch: refs/heads/curator-rpc Commit: 7c99ddbf25b456afc990a43098851ce8174c8fdd Parents: 16836aa Author: randgalt <randg...@apache.org> Authored: Mon May 26 15:59:37 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Mon May 26 15:59:37 2014 -0500 ---------------------------------------------------------------------- .../org/apache/curator/x/rpc/RpcManager.java | 59 ++++++++++++++ .../curator/x/rpc/idl/event/EventService.java | 1 - .../projection/CuratorProjectionService.java | 81 ++------------------ 3 files changed, 64 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/7c99ddbf/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java new file mode 100644 index 0000000..783297d --- /dev/null +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java @@ -0,0 +1,59 @@ +package org.apache.curator.x.rpc; + +import com.google.common.collect.Maps; +import org.apache.curator.framework.CuratorFramework; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +public class RpcManager +{ + private final Map<String, CuratorEntry> projections = Maps.newConcurrentMap(); + + private static class CuratorEntry + { + private final CuratorFramework client; + private final AtomicLong lastAccessEpoch = new AtomicLong(0); + + private CuratorEntry(CuratorFramework client) + { + this.client = client; + updateLastAccess(); + } + + void updateLastAccess() + { + lastAccessEpoch.set(System.currentTimeMillis()); + } + } + + public void add(String id, CuratorFramework client) + { + projections.put(id, new CuratorEntry(client)); + } + + public void updateLastAccess(String id) + { + CuratorEntry entry = projections.get(id); + if ( entry != null ) + { + entry.updateLastAccess(); + } + } + + public CuratorFramework getClient(String id) + { + CuratorEntry entry = projections.get(id); + if ( entry != null ) + { + entry.updateLastAccess(); + return entry.client; + } + return null; + } + + public CuratorFramework removeClient(String id) + { + CuratorEntry entry = projections.remove(id); + return (entry != null) ? entry.client : null; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/7c99ddbf/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java index c79fa20..206a347 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java @@ -18,7 +18,6 @@ public class EventService @ThriftMethod public RpcCuratorEvent getNextEvent() throws InterruptedException { - System.out.println(Thread.currentThread() + "getNextEvent"); return events.take(); } } http://git-wip-us.apache.org/repos/asf/curator/blob/7c99ddbf/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java index f6e4757..34eff59 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java @@ -20,7 +20,6 @@ package org.apache.curator.x.rpc.idl.projection; import com.facebook.swift.service.ThriftMethod; import com.facebook.swift.service.ThriftService; -import com.google.common.collect.Maps; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; @@ -29,22 +28,17 @@ import org.apache.curator.framework.api.Compressible; import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.CreateModable; import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.PathAndBytesable; import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.x.rpc.RpcManager; import org.apache.curator.x.rpc.idl.event.EventService; import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; -import java.util.List; -import java.util.Map; import java.util.UUID; @ThriftService("CuratorService") public class CuratorProjectionService { - private final Map<String, CuratorFramework> projections = Maps.newConcurrentMap(); + private final RpcManager rpcManager = new RpcManager(); private final EventService eventService; public CuratorProjectionService(EventService eventService) @@ -55,82 +49,17 @@ public class CuratorProjectionService @ThriftMethod public CuratorProjection newCuratorProjection(CuratorProjectionSpec spec) // TODO { - System.out.println(Thread.currentThread() + "newCuratorProjection"); - - eventService.addEvent(new RpcCuratorEvent(null, new CuratorEvent() - { - @Override - public CuratorEventType getType() - { - return CuratorEventType.CHILDREN; - } - - @Override - public int getResultCode() - { - return 1; - } - - @Override - public String getPath() - { - return null; - } - - @Override - public Object getContext() - { - return null; - } - - @Override - public Stat getStat() - { - return null; - } - - @Override - public byte[] getData() - { - return new byte[0]; - } - - @Override - public String getName() - { - return null; - } - - @Override - public List<String> getChildren() - { - return null; - } - - @Override - public List<ACL> getACLList() - { - return null; - } - - @Override - public WatchedEvent getWatchedEvent() - { - return null; - } - })); - CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1)); String id = UUID.randomUUID().toString(); client.start(); - projections.put(id, client); + rpcManager.add(id, client); return new CuratorProjection(id); } @ThriftMethod public void closeCuratorProjection(CuratorProjection projection) { - CuratorFramework client = projections.remove(projection.id); + CuratorFramework client = rpcManager.removeClient(projection.id); if ( client != null ) { client.close(); @@ -202,7 +131,7 @@ public class CuratorProjectionService private CuratorFramework getClient(CuratorProjection projection) throws Exception { - CuratorFramework client = projections.get(projection.id); + CuratorFramework client = rpcManager.getClient(projection.id); if ( client == null ) { throw new Exception("No client found with id: " + projection.id);