Repository: curator Updated Branches: refs/heads/CURATOR-88 a3cdddc1b -> 8f6edd706
refactoring Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/8f6edd70 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/8f6edd70 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/8f6edd70 Branch: refs/heads/CURATOR-88 Commit: 8f6edd70687710847ff475a442f4444da47b8112 Parents: a3cdddc Author: randgalt <randg...@apache.org> Authored: Sat Feb 15 19:21:57 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Sat Feb 15 19:21:57 2014 -0500 ---------------------------------------------------------------------- .../curator/x/rest/CuratorRestContext.java | 19 ------------------ .../curator/x/rest/api/ClientResource.java | 4 +--- .../apache/curator/x/rest/api/Constants.java | 1 + .../curator/x/rest/api/LeaderResource.java | 4 ++-- .../curator/x/rest/api/NodeCacheResource.java | 2 +- .../x/rest/api/PathChildrenCacheResource.java | 2 +- .../x/rest/api/RestBackgroundCallback.java | 2 +- .../apache/curator/x/rest/api/RestWatcher.java | 2 +- .../org/apache/curator/x/rest/api/Session.java | 21 ++++++++++++++++++++ 9 files changed, 29 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/8f6edd70/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java index 95d50fb..cca2e89 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java @@ -19,22 +19,16 @@ package org.apache.curator.x.rest; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Queues; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.x.rest.api.Session; -import org.apache.curator.x.rest.entities.StatusMessage; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -60,7 +54,6 @@ public class CuratorRestContext implements Closeable } } }; - private final BlockingQueue<StatusMessage> messages = Queues.newLinkedBlockingQueue(); private enum State { @@ -105,18 +98,6 @@ public class CuratorRestContext implements Closeable executorService.scheduleAtFixedRate(runner, sessionLengthMs, sessionLengthMs, TimeUnit.MILLISECONDS); } - public void pushMessage(StatusMessage message) - { - messages.add(message); - } - - public Collection<StatusMessage> drainMessages() - { - List<StatusMessage> localMessages = Lists.newArrayList(); - messages.drainTo(localMessages); - return localMessages; - } - private void checkSession() { long elapsedSinceLastUse = System.currentTimeMillis() - session.getLastUseMs(); http://git-wip-us.apache.org/repos/asf/curator/blob/8f6edd70/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java index 6f8a02f..94009f0 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java @@ -57,11 +57,9 @@ public class ClientResource @Path("/status") public Response getStatus() throws IOException { - context.getSession(); // update last use - ObjectNode node = context.getMapper().createObjectNode(); node.put("state", context.getClient().getState().name()); - node.putPOJO("messages", context.drainMessages()); + node.putPOJO("messages", context.getSession().drainMessages()); return Response.ok(context.getWriter().writeValueAsString(node)).build(); } http://git-wip-us.apache.org/repos/asf/curator/blob/8f6edd70/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java index 2696c12..b12d1b1 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java @@ -38,6 +38,7 @@ class Constants static final String PATH_CACHE = "path-cache"; static final String NODE_CACHE = "node-cache"; static final String LEADER = "leader"; + static final String CLOSING = "closing"; static ObjectNode makeIdNode(CuratorRestContext context, String id) { http://git-wip-us.apache.org/repos/asf/curator/blob/8f6edd70/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/LeaderResource.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/LeaderResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/LeaderResource.java index d5b0cab..e9c7ec4 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/LeaderResource.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/LeaderResource.java @@ -78,13 +78,13 @@ public class LeaderResource @Override public void isLeader() { - context.pushMessage(new StatusMessage(Constants.LEADER, id, "true", "")); + context.getSession().pushMessage(new StatusMessage(Constants.LEADER, id, "true", "")); } @Override public void notLeader() { - context.pushMessage(new StatusMessage(Constants.LEADER, id, "false", "")); + context.getSession().pushMessage(new StatusMessage(Constants.LEADER, id, "false", "")); } }; leaderLatch.addListener(listener); http://git-wip-us.apache.org/repos/asf/curator/blob/8f6edd70/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/NodeCacheResource.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/NodeCacheResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/NodeCacheResource.java index 74094b6..4548759 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/NodeCacheResource.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/NodeCacheResource.java @@ -79,7 +79,7 @@ public class NodeCacheResource @Override public void nodeChanged() throws Exception { - context.pushMessage(new StatusMessage(Constants.NODE_CACHE, id, "", "")); + context.getSession().pushMessage(new StatusMessage(Constants.NODE_CACHE, id, "", "")); } }; cache.getListenable().addListener(listener); http://git-wip-us.apache.org/repos/asf/curator/blob/8f6edd70/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java index 6245d56..76fae7d 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java @@ -84,7 +84,7 @@ public class PathChildrenCacheResource @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - context.pushMessage(new StatusMessage(Constants.PATH_CACHE, id, event.getType().name(), "")); + context.getSession().pushMessage(new StatusMessage(Constants.PATH_CACHE, id, event.getType().name(), "")); } }; cache.getListenable().addListener(listener); http://git-wip-us.apache.org/repos/asf/curator/blob/8f6edd70/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/RestBackgroundCallback.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/RestBackgroundCallback.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/RestBackgroundCallback.java index 5b65b30..482576b 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/RestBackgroundCallback.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/RestBackgroundCallback.java @@ -40,7 +40,7 @@ class RestBackgroundCallback implements BackgroundCallback @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { - context.pushMessage(new StatusMessage(type, asyncId, getMessage(event), Integer.toString(event.getResultCode()))); + context.getSession().pushMessage(new StatusMessage(type, asyncId, getMessage(event), Integer.toString(event.getResultCode()))); } protected String getMessage(CuratorEvent event) http://git-wip-us.apache.org/repos/asf/curator/blob/8f6edd70/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/RestWatcher.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/RestWatcher.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/RestWatcher.java index 29a34db..024e502 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/RestWatcher.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/RestWatcher.java @@ -39,7 +39,7 @@ class RestWatcher implements Watcher { if ( event.getType() != Event.EventType.None ) { - context.pushMessage(new StatusMessage(Constants.WATCH, watchId, event.getType().name(), String.valueOf(event.getPath()))); + context.getSession().pushMessage(new StatusMessage(Constants.WATCH, watchId, event.getType().name(), String.valueOf(event.getPath()))); } } } http://git-wip-us.apache.org/repos/asf/curator/blob/8f6edd70/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java index 8671383..2d2cd8a 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java @@ -18,11 +18,17 @@ */ package org.apache.curator.x.rest.api; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import org.apache.curator.x.rest.entities.StatusMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicLong; public class Session implements Closeable @@ -30,6 +36,7 @@ public class Session implements Closeable private final Logger log = LoggerFactory.getLogger(getClass()); private final Map<String, Entry> things = Maps.newConcurrentMap(); private final AtomicLong lastUseMs = new AtomicLong(System.currentTimeMillis()); + private final BlockingQueue<StatusMessage> messages = Queues.newLinkedBlockingQueue(); private static class Entry { @@ -61,6 +68,8 @@ public class Session implements Closeable public void closeThings() { + pushMessage(new StatusMessage(Constants.CLOSING, "", "", "")); + for ( Map.Entry<String, Entry> mapEntry : things.entrySet() ) { Entry entry = mapEntry.getValue(); @@ -73,6 +82,18 @@ public class Session implements Closeable } } + void pushMessage(StatusMessage message) + { + messages.add(message); + } + + Collection<StatusMessage> drainMessages() + { + List<StatusMessage> localMessages = Lists.newArrayList(); + messages.drainTo(localMessages); + return localMessages; + } + <T> String addThing(T thing, Closer<T> closer) { String id = Constants.newId();