Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1913 98ad2fd88 -> 40dc50db3
TINKERPOP-1913 Made status attributes available to the ResultSet Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/81fd8403 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/81fd8403 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/81fd8403 Branch: refs/heads/TINKERPOP-1913 Commit: 81fd8403b77a440b99a8757d154e52f7aa63e297 Parents: 98ad2fd Author: Stephen Mallette <sp...@genoprime.com> Authored: Wed Mar 7 10:29:11 2018 -0500 Committer: Stephen Mallette <sp...@genoprime.com> Committed: Wed Mar 7 10:29:11 2018 -0500 ---------------------------------------------------------------------- .../tinkerpop/gremlin/driver/Handler.java | 13 ++++- .../tinkerpop/gremlin/driver/ResultQueue.java | 2 + .../tinkerpop/gremlin/driver/ResultSet.java | 12 ++++ .../tinkerpop/gremlin/driver/ResultSetTest.java | 25 +++++++++ .../gremlin/server/op/AbstractOpProcessor.java | 58 ++++++++++++++++++-- .../op/traversal/TraversalOpProcessor.java | 10 +++- .../server/GremlinResultSetIntegrateTest.java | 9 +++ 7 files changed, 121 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/81fd8403/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java index c6c1022..12c6866 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java @@ -248,9 +248,18 @@ final class Handler { } } - // as this is a non-PARTIAL_CONTENT code - the stream is done - if (response.getStatus().getCode() != ResponseStatusCode.PARTIAL_CONTENT) + // the last 
message in an OK stream could have meta-data that is useful to the result. note that error + // handling of the status attributes is handled separately above + if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.NO_CONTENT) { + // in 3.4.0 this should get refactored. i think that the markComplete() could just take the + // status attributes as its argument - need to investigate that further + queue.statusAttributes = response.getStatus().getAttributes(); + } + + // as this is a non-PARTIAL_CONTENT code - the stream is done. + if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) { pending.remove(response.getRequestId()).markComplete(); + } } finally { // in the event of an exception above the exception is tossed and handled by whatever channelpipeline // error handling is at play. http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/81fd8403/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java index e21e265..7340763 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java @@ -56,6 +56,8 @@ final class ResultQueue { private final Queue<Pair<CompletableFuture<List<Result>>,Integer>> waiting = new ConcurrentLinkedQueue<>(); + Map<String,Object> statusAttributes = null; + public ResultQueue(final LinkedBlockingQueue<Result> resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) { this.resultLinkedBlockingQueue = resultLinkedBlockingQueue; this.readComplete = readComplete; http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/81fd8403/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java 
---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java index f82876c..f608f06 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java @@ -21,8 +21,10 @@ package org.apache.tinkerpop.gremlin.driver; import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Spliterator; import java.util.Spliterators; @@ -74,6 +76,16 @@ public final class ResultSet implements Iterable<Result> { } /** + * Returns a future that will complete when {@link #allItemsAvailable()} is {@code true} and will contain the + * attributes from the response. + */ + public CompletableFuture<Map<String,Object>> statusAttributes() { + final CompletableFuture<Map<String,Object>> attrs = new CompletableFuture<>(); + readCompleted.thenRun(() -> attrs.complete(null == resultQueue.statusAttributes ? Collections.emptyMap() : resultQueue.statusAttributes)); + return attrs; + } + + /** * Determines if all items have been returned to the client. 
*/ public boolean allItemsAvailable() { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/81fd8403/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java index 0cf4fb5..3163ffe 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java @@ -22,8 +22,10 @@ import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import org.junit.Before; import org.junit.Test; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -49,6 +51,29 @@ public class ResultSetTest extends AbstractResultQueueTest { } @Test + public void shouldReturnResponseAttributes() throws Exception { + resultQueue.statusAttributes = new HashMap<String,Object>() {{ + put("test",123); + put("junk","here"); + }}; + + final CompletableFuture<Map<String,Object>> attrs = resultSet.statusAttributes(); + readCompleted.complete(null); + + final Map<String,Object> m = attrs.get(); + assertEquals(123, m.get("test")); + assertEquals("here", m.get("junk")); + assertEquals(2, m.size()); + } + + @Test + public void shouldReturnEmptyMapForNoResponseAttributes() throws Exception { + final CompletableFuture<Map<String,Object>> attrs = resultSet.statusAttributes(); + readCompleted.complete(null); + assertThat(attrs.get().isEmpty(), is(true)); + } + + @Test public void shouldHaveAllItemsAvailableAsynchronouslyOnReadComplete() { final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync(); assertThat(all.isDone(), is(false)); 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/81fd8403/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java index 8899bb5..39fb9b1 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -143,7 +144,9 @@ public abstract class AbstractOpProcessor implements OpProcessor { // thread that processed the eval of the script so, we have to push serialization down into that Frame frame = null; try { - frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code, generateMetaData(ctx, msg, code, itty)); + frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code, + generateResultMetaData(ctx, msg, code, itty, settings), + generateStatusAttributes(ctx, msg, code, itty, settings)); } catch (Exception ex) { // a frame may use a Bytebuf which is a countable release - if it does not get written // downstream it needs to be released here @@ -239,17 +242,49 @@ public abstract class AbstractOpProcessor implements OpProcessor { } /** - * Generates meta-data to put on a {@link ResponseMessage}. + * Generates response result meta-data to put on a {@link ResponseMessage}. 
* * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in * this method + * @deprecated As of release 3.3.2, replaced by {@link #generateResultMetaData(ChannelHandlerContext, RequestMessage, ResponseStatusCode, Iterator, Settings)} */ - protected Map<String,Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg, - final ResponseStatusCode code, final Iterator itty) { + @Deprecated + protected Map<String, Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg, + final ResponseStatusCode code, final Iterator itty) { return Collections.emptyMap(); } /** + * Generates response result meta-data to put on a {@link ResponseMessage}. + * + * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in + * this method + */ + protected Map<String, Object> generateResultMetaData(final ChannelHandlerContext ctx, final RequestMessage msg, + final ResponseStatusCode code, final Iterator itty, + final Settings settings) { + return generateMetaData(ctx, msg, code, itty); + } + + /** + * Generates response status meta-data to put on a {@link ResponseMessage}. + * + * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in + * this method + */ + protected Map<String, Object> generateStatusAttributes(final ChannelHandlerContext ctx, final RequestMessage msg, + final ResponseStatusCode code, final Iterator itty, + final Settings settings) { + // only return server metadata on the last message + if (itty.hasNext()) return Collections.emptyMap(); + + final Map<String, Object> metaData = new HashMap<>(); + metaData.put(Tokens.ARGS_HOST, ctx.channel().remoteAddress().toString()); + + return metaData; + } + + /** * @deprecated As of release 3.2.2, replaced by {@link #makeFrame(ChannelHandlerContext, RequestMessage, MessageSerializer, boolean, List, ResponseStatusCode, Map)}. 
*/ protected static Frame makeFrame(final ChannelHandlerContext ctx, final RequestMessage msg, @@ -258,13 +293,25 @@ public abstract class AbstractOpProcessor implements OpProcessor { return makeFrame(ctx, msg, serializer, useBinary, aggregate, code, Collections.emptyMap()); } + /** + * @deprecated As of release 3.3.2, replaced by {@link #makeFrame(ChannelHandlerContext, RequestMessage, MessageSerializer, boolean, List, ResponseStatusCode, Map, Map)}. + */ protected static Frame makeFrame(final ChannelHandlerContext ctx, final RequestMessage msg, final MessageSerializer serializer, final boolean useBinary, final List<Object> aggregate, - final ResponseStatusCode code, final Map<String,Object> responseMetaData) throws Exception { + final ResponseStatusCode code, final Map<String,Object> statusAttributes) throws Exception { + return makeFrame(ctx, msg, serializer, useBinary, aggregate, code, statusAttributes, Collections.emptyMap()); + } + + + protected static Frame makeFrame(final ChannelHandlerContext ctx, final RequestMessage msg, + final MessageSerializer serializer, final boolean useBinary, final List<Object> aggregate, + final ResponseStatusCode code, final Map<String,Object> responseMetaData, + final Map<String,Object> statusAttributes) throws Exception { try { if (useBinary) { return new Frame(serializer.serializeResponseAsBinary(ResponseMessage.build(msg) .code(code) + .statusAttributes(statusAttributes) .responseMetaData(responseMetaData) .result(aggregate).create(), ctx.alloc())); } else { @@ -273,6 +320,7 @@ public abstract class AbstractOpProcessor implements OpProcessor { final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer; return new Frame(textSerializer.serializeResponseAsString(ResponseMessage.build(msg) .code(code) + .statusAttributes(statusAttributes) .responseMetaData(responseMetaData) .result(aggregate).create())); } 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/81fd8403/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java index 9025108..5e092da 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java @@ -458,6 +458,9 @@ public class TraversalOpProcessor extends AbstractOpProcessor { @Override protected Map<String, Object> generateMetaData(final ChannelHandlerContext ctx, final RequestMessage msg, final ResponseStatusCode code, final Iterator itty) { + // leaving this overriding the deprecated version of this method because it provides a decent test to those + // who might have their own OpProcessor implementations that apply meta-data. leaving this alone helps validate + // that the upgrade path is clean. 
we can remove this in 3.4.0 Map<String, Object> metaData = Collections.emptyMap(); if (itty instanceof SideEffectIterator) { final SideEffectIterator traversalIterator = (SideEffectIterator) itty; @@ -467,6 +470,9 @@ public class TraversalOpProcessor extends AbstractOpProcessor { metaData.put(Tokens.ARGS_SIDE_EFFECT_KEY, key); metaData.put(Tokens.ARGS_AGGREGATE_TO, traversalIterator.getSideEffectAggregator()); } + } else { + // this is a standard traversal iterator + metaData = super.generateMetaData(ctx, msg, code, itty); } return metaData; @@ -539,7 +545,9 @@ public class TraversalOpProcessor extends AbstractOpProcessor { // thread that processed the eval of the script so, we have to push serialization down into that Frame frame = null; try { - frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code, generateMetaData(ctx, msg, code, itty)); + frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code, + generateResultMetaData(ctx, msg, code, itty, settings), + generateStatusAttributes(ctx, msg, code, itty, settings)); } catch (Exception ex) { // a frame may use a Bytebuf which is a countable release - if it does not get written // downstream it needs to be released here http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/81fd8403/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java index 01c5ee3..ac8b531 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinResultSetIntegrateTest.java @@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.driver.Cluster; import 
org.apache.tinkerpop.gremlin.driver.MessageSerializer; import org.apache.tinkerpop.gremlin.driver.Result; import org.apache.tinkerpop.gremlin.driver.ResultSet; +import org.apache.tinkerpop.gremlin.driver.Tokens; import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin; @@ -53,6 +54,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.is; @@ -96,6 +98,13 @@ public class GremlinResultSetIntegrateTest extends AbstractGremlinServerIntegrat } @Test + public void shouldReturnResponseAttributes() throws Exception { + final ResultSet results = client.submit("g.V()"); + final Map<String,Object> attr = results.statusAttributes().get(20000, TimeUnit.MILLISECONDS); + assertThat(attr.containsKey(Tokens.ARGS_HOST), is(true)); + } + + @Test public void shouldHandleVertexResultFromTraversalBulked() throws Exception { final Graph graph = TinkerGraph.open(); final GraphTraversalSource g = graph.traversal();