wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/abf5fdda Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/abf5fdda Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/abf5fdda Branch: refs/heads/curator-rpc Commit: abf5fddae35e62b93256c2e473df8b79b3b76969 Parents: 099df94 Author: randgalt <randg...@apache.org> Authored: Mon May 26 20:29:20 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Mon May 26 20:29:20 2014 -0500 ---------------------------------------------------------------------- .../org/apache/curator/x/rpc/CuratorEntry.java | 13 ----- .../curator/x/rpc/CuratorProjectionServer.java | 3 +- .../org/apache/curator/x/rpc/RpcManager.java | 57 ++++++++++++++++---- 3 files changed, 50 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/abf5fdda/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java index 61f97f3..f1b50a0 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java @@ -6,13 +6,11 @@ import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent; import java.io.Closeable; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; public class CuratorEntry implements Closeable { private final CuratorFramework client; - private final AtomicLong lastAccessEpoch = new AtomicLong(0); private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue(); private final AtomicReference<State> state = new AtomicReference<State>(State.OPEN); @@ -25,7 +23,6 @@ public class CuratorEntry implements Closeable public CuratorEntry(CuratorFramework client) { this.client = client; - updateLastAccess(); } @Override @@ -55,18 +52,8 @@ public class CuratorEntry implements Closeable } } - public void updateLastAccess() - { - lastAccessEpoch.set(System.currentTimeMillis()); - } - public CuratorFramework getClient() { return (state.get() == State.OPEN) ? client : null; } - - public long getLastAccessEpoch() - { - return lastAccessEpoch.get(); - } } http://git-wip-us.apache.org/repos/asf/curator/blob/abf5fdda/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java index 6f94902..75c304b 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java @@ -26,12 +26,13 @@ import com.facebook.swift.service.ThriftServiceProcessor; import com.google.common.collect.Lists; import org.apache.curator.x.rpc.idl.event.EventService; import org.apache.curator.x.rpc.idl.projection.CuratorProjectionService; +import java.util.concurrent.TimeUnit; public class CuratorProjectionServer { public static void main(String[] args) { - RpcManager rpcManager = new RpcManager(); + RpcManager rpcManager = new RpcManager(TimeUnit.SECONDS.toMillis(10)); // TODO EventService eventService = new EventService(rpcManager, 5000); // TODO CuratorProjectionService projectionService = new CuratorProjectionService(rpcManager); ThriftServiceProcessor processor = new ThriftServiceProcessor(new ThriftCodecManager(), Lists.<ThriftEventHandler>newArrayList(), projectionService, eventService); http://git-wip-us.apache.org/repos/asf/curator/blob/abf5fdda/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java ---------------------------------------------------------------------- diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java index dcfa2a0..e238aa6 100644 --- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java +++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java @@ -1,27 +1,66 @@ package org.apache.curator.x.rpc; -import com.google.common.collect.Maps; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import org.apache.curator.framework.CuratorFramework; -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.util.concurrent.TimeUnit; -public class RpcManager +public class RpcManager implements Closeable { - private final Map<String, CuratorEntry> projections = Maps.newConcurrentMap(); + private final Logger log = LoggerFactory.getLogger(getClass()); + private final Cache<String, CuratorEntry> cache; + + public RpcManager(long expirationMs) + { + RemovalListener<String, CuratorEntry> listener = new RemovalListener<String, CuratorEntry>() + { + @SuppressWarnings("NullableProblems") + @Override + public void onRemoval(RemovalNotification<String, CuratorEntry> notification) + { + if ( notification != null ) + { + log.debug(String.format("Entry being removed. id (%s), reason (%s)", notification.getKey(), notification.getCause())); + + CuratorEntry entry = notification.getValue(); + if ( entry != null ) + { + entry.close(); + } + } + } + }; + cache = CacheBuilder + .newBuilder() + .expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS) + .removalListener(listener) + .build(); + } + + @Override + public void close() + { + cache.invalidateAll(); + cache.cleanUp(); + } public void add(String id, CuratorFramework client) { - projections.put(id, new CuratorEntry(client)); + cache.put(id, new CuratorEntry(client)); } public CuratorEntry get(String id) { - CuratorEntry curatorEntry = projections.get(id); - curatorEntry.updateLastAccess(); - return curatorEntry; + return cache.getIfPresent(id); } public CuratorEntry remove(String id) { - return projections.remove(id); + return cache.asMap().remove(id); } }