This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit ad32d33b001b9e99db33d9a93c075aa0e0d703c6 Author: Ken Hu <[email protected]> AuthorDate: Fri Mar 20 09:42:37 2026 -0700 Merge ResultQueue into ResultSet to simplify result handling The ResultQueue class was an unnecessary indirection layer between Netty and the ResultSet. This was probably more useful when the protocol was WebSocket-based and there were multiple requests on the same connection, but it doesn't seem to have much of a purpose anymore. This new setup also potentially works better for HTTP/2 if the high-level, stream-based API is used. If the frame-based API is used, then changes might be required in the future. --- CHANGELOG.asciidoc | 1 + .../tinkerpop/gremlin/driver/Channelizer.java | 2 +- .../tinkerpop/gremlin/driver/Connection.java | 29 ++--- .../tinkerpop/gremlin/driver/ResultQueue.java | 139 --------------------- .../apache/tinkerpop/gremlin/driver/ResultSet.java | 120 ++++++++++++++++-- .../driver/handler/GremlinResponseHandler.java | 35 +++--- ...ltQueueTest.java => AbstractResultSetTest.java} | 33 +++-- ...ultQueueTest.java => ResultSetQueuingTest.java} | 126 +++++++++---------- .../tinkerpop/gremlin/driver/ResultSetTest.java | 37 +++--- 9 files changed, 232 insertions(+), 290 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 6846e4d9a3..d02b6333cd 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -38,6 +38,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Added `RequestInterceptor` to `gremlin-go` with `auth` reference implementations to replace `authInfo`. * Refactored GraphBinary serializers to use `io.Writer` and `io.Reader` instead of `*bytes.Buffer` for streaming capacities. * Refactored `httpProtocol` and `httpTransport` in `gremlin-go` into single `connection.go` that handles HTTP request and response. +* Refactored result handling in `gremlin-driver` by merging `ResultQueue` into `ResultSet`. * Replace `Bytecode` with `GremlinLang` in `gremlin-dotnet`. 
* Replace `WebSocket` with `HTTP` (non-streaming) in `gremlin-dotnet`. * Added `MimeType` to `IMessageSerializer` and split client option to allow separate request and response serialization in `gremlin-dotnet`. diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index fd7d585baf..427f76c056 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -92,7 +92,7 @@ public interface Channelizer extends ChannelHandler { protected Connection connection; protected Cluster cluster; protected SslHandler sslHandler; - private AtomicReference<ResultQueue> pending; + private AtomicReference<ResultSet> pending; protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler"; protected static final String PIPELINE_SSL_HANDLER = "gremlin-ssl-handler"; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 5f9c1325d3..cb94b39b87 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -32,7 +32,6 @@ import io.netty.channel.socket.nio.NioSocketChannel; import java.net.URI; import java.time.Instant; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -56,7 +55,7 @@ final class Connection { private final Channel channel; private final URI uri; - private final AtomicReference<ResultQueue> pending = new AtomicReference<>(); + private final AtomicReference<ResultSet> pending = new 
AtomicReference<>(); private final Cluster cluster; private final Client client; private final ConnectionPool pool; @@ -155,7 +154,7 @@ final class Connection { return cluster; } - AtomicReference<ResultQueue> getPending() { + AtomicReference<ResultSet> getPending() { return pending; } @@ -184,9 +183,9 @@ final class Connection { return future; } - public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> resultQueueSetup) { - // once there is a completed write, then create a traverser for the result set and complete - // the promise so that the client knows that that it can start checking for results. + public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> resultSetFuture) { + // once there is a completed write, then create a ResultSet and complete + // the promise so that the client knows that it can start checking for results. final Connection thisConnection = this; final ChannelPromise requestPromise = channel.newPromise() @@ -198,12 +197,11 @@ final class Connection { handleConnectionCleanupOnError(thisConnection); - cluster.executor().submit(() -> resultQueueSetup.completeExceptionally(f.cause())); + cluster.executor().submit(() -> resultSetFuture.completeExceptionally(f.cause())); } else { - final LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new LinkedBlockingQueue<>(); - final CompletableFuture<Void> readCompleted = new CompletableFuture<>(); + final ResultSet resultSet = new ResultSet(cluster.executor(), requestMessage, pool.host); - readCompleted.whenCompleteAsync((v, t) -> { + resultSet.getReadCompleted().whenCompleteAsync((v, t) -> { if (t != null) { // the callback for when the read failed. a failed read means the request went to the server // and came back with a server-side error of some sort. 
it means the server is responsive @@ -212,7 +210,7 @@ final class Connection { logger.debug("Error while processing request on the server {}.", this, t); handleConnectionCleanupOnError(thisConnection); } else { - // the callback for when the read was successful, meaning that ResultQueue.markComplete() + // the callback for when the read was successful, meaning that ResultSet.markComplete() // was called thisConnection.returnToPool(); } @@ -223,15 +221,12 @@ final class Connection { tryShutdown(); }, cluster.executor()); - final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted); - // pending.put(requestMessage.getRequestId(), handler); - pending.set(handler); + pending.set(resultSet); - // resultQueueSetup should only be completed by a worker since the application code might have sync + // resultSetFuture should only be completed by a worker since the application code might have sync // completion stages attached to it which and we do not want the event loop threads to process those // stages. - cluster.executor().submit(() -> resultQueueSetup.complete( - new ResultSet(handler, cluster.executor(), readCompleted, requestMessage, pool.host))); + cluster.executor().submit(() -> resultSetFuture.complete(resultSet)); } }); channel.writeAndFlush(requestMessage, requestPromise); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java deleted file mode 100644 index e91e833b94..0000000000 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.tinkerpop.gremlin.driver; - -import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.javatuples.Pair; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicReference; - -/** - * A queue of incoming {@link Result} objects. The queue is updated by the {@link GremlinResponseHandler} - * until a response terminator is identified. 
- * - * @author Stephen Mallette (http://stephen.genoprime.com) - */ -@SuppressWarnings("ThrowableResultOfMethodCallIgnored") -public final class ResultQueue { - - private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue; - - private final AtomicReference<Throwable> error = new AtomicReference<>(); - - private final CompletableFuture<Void> readComplete; - - private final Queue<Pair<CompletableFuture<List<Result>>,Integer>> waiting = new ConcurrentLinkedQueue<>(); - - public ResultQueue(final LinkedBlockingQueue<Result> resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) { - this.resultLinkedBlockingQueue = resultLinkedBlockingQueue; - this.readComplete = readComplete; - } - - /** - * Adds a {@link Result} to the queue which will be later read by the {@link ResultSet}. - * - * @param result a return value from the {@link Traversal} or script submitted for execution - */ - public void add(final Result result) { - this.resultLinkedBlockingQueue.offer(result); - tryDrainNextWaiting(false); - } - - public CompletableFuture<List<Result>> await(final int items) { - final CompletableFuture<List<Result>> result = new CompletableFuture<>(); - waiting.add(Pair.with(result, items)); - - tryDrainNextWaiting(false); - - return result; - } - - public int size() { - if (error.get() != null) throw new RuntimeException(error.get()); - return this.resultLinkedBlockingQueue.size(); - } - - public boolean isEmpty() { - if (error.get() != null) throw new RuntimeException(error.get()); - return this.size() == 0; - } - - public boolean isComplete() { - return readComplete.isDone(); - } - - void drainTo(final Collection<Result> collection) { - if (error.get() != null) throw new RuntimeException(error.get()); - resultLinkedBlockingQueue.drainTo(collection); - } - - public void markComplete() { - this.readComplete.complete(null); - - this.drainAllWaiting(); - } - - public void markError(final Throwable throwable) { - error.set(throwable); - 
this.readComplete.completeExceptionally(throwable); - this.drainAllWaiting(); - } - - /** - * Completes the next waiting future if there is one. - */ - private synchronized void tryDrainNextWaiting(final boolean force) { - // need to peek because the number of available items needs to be >= the expected size for that future. if not - // it needs to keep waiting - final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.peek(); - if (nextWaiting != null && (force || (resultLinkedBlockingQueue.size() >= nextWaiting.getValue1() || readComplete.isDone()))) { - final int items = nextWaiting.getValue1(); - final CompletableFuture<List<Result>> future = nextWaiting.getValue0(); - final List<Result> results = new ArrayList<>(items); - resultLinkedBlockingQueue.drainTo(results, items); - - // it's important to check for error here because a future may have already been queued in "waiting" prior - // to the first response back from the server. if that happens, any "waiting" futures should be completed - // exceptionally otherwise it will look like success. - if (null == error.get()) - future.complete(results); - else - future.completeExceptionally(error.get()); - - waiting.remove(nextWaiting); - } - } - - /** - * Completes all remaining futures. 
- */ - private void drainAllWaiting() { - while (!waiting.isEmpty()) { - tryDrainNextWaiting(true); - } - } -} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java index 0656586706..b5d506bc1f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java @@ -19,22 +19,28 @@ package org.apache.tinkerpop.gremlin.driver; import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.javatuples.Pair; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Queue; import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import java.util.stream.StreamSupport; /** * A {@code ResultSet} is returned from the submission of a Gremlin script to the server and represents the * results provided by the server. The results from the server are streamed into the {@code ResultSet} and - * therefore may not be available immediately. As such, {@code ResultSet} provides access to a a number + * therefore may not be available immediately. As such, {@code ResultSet} provides access to a number * of functions that help to work with the asynchronous nature of the data streaming back. Data from results * is stored in an {@link Result} which can be used to retrieve the item once it is on the client side. 
* <p/> @@ -48,20 +54,18 @@ import java.util.stream.StreamSupport; * @author Stephen Mallette (http://stephen.genoprime.com) */ public final class ResultSet implements Iterable<Result> { - private final ResultQueue resultQueue; + private final LinkedBlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>(); + private final Queue<Pair<CompletableFuture<List<Result>>, Integer>> waiting = new ConcurrentLinkedQueue<>(); + private final AtomicReference<Throwable> error = new AtomicReference<>(); + private final CompletableFuture<Void> readCompleted = new CompletableFuture<>(); + private final ExecutorService executor; private final RequestMessage originalRequestMessage; private final Host host; - private final CompletableFuture<Void> readCompleted; - - public ResultSet(final ResultQueue resultQueue, final ExecutorService executor, - final CompletableFuture<Void> readCompleted, final RequestMessage originalRequestMessage, - final Host host) { + public ResultSet(final ExecutorService executor, final RequestMessage originalRequestMessage, final Host host) { this.executor = executor; this.host = host; - this.resultQueue = resultQueue; - this.readCompleted = readCompleted; this.originalRequestMessage = originalRequestMessage; } @@ -94,6 +98,7 @@ public final class ResultSet implements Iterable<Result> { * Gets the number of items available on the client. */ public int getAvailableItemCount() { + if (error.get() != null) throw new RuntimeException(error.get()); return resultQueue.size(); } @@ -114,7 +119,10 @@ public final class ResultSet implements Iterable<Result> { * completed and there are less than that number specified available. 
*/ public CompletableFuture<List<Result>> some(final int items) { - return resultQueue.await(items); + final CompletableFuture<List<Result>> result = new CompletableFuture<>(); + waiting.add(Pair.with(result, items)); + tryDrainNextWaiting(false); + return result; } /** @@ -125,6 +133,7 @@ public final class ResultSet implements Iterable<Result> { */ public CompletableFuture<List<Result>> all() { return readCompleted.thenApplyAsync(unusedInput -> { + if (error.get() != null) throw new RuntimeException(error.get()); final List<Result> list = new ArrayList<>(); resultQueue.drainTo(list); return list; @@ -174,4 +183,95 @@ public final class ResultSet implements Iterable<Result> { } }; } + + // ==================== Methods called by Handlers ==================== + + /** + * Adds a {@link Result} to the queue which will be later read by consumers. + * + * @param result a return value from the traversal or script submitted for execution + */ + public void add(final Result result) { + this.resultQueue.offer(result); + tryDrainNextWaiting(false); + } + + /** + * Marks the result stream as complete. + */ + public void markComplete() { + this.readCompleted.complete(null); + this.drainAllWaiting(); + } + + /** + * Marks the result stream as failed with an error. + * + * @param throwable the error that occurred + */ + public void markError(final Throwable throwable) { + error.set(throwable); + this.readCompleted.completeExceptionally(throwable); + this.drainAllWaiting(); + } + + /** + * Returns the future that completes when reading is done. Used internally + * for connection lifecycle management. + */ + CompletableFuture<Void> getReadCompleted() { + return readCompleted; + } + + // ==================== Internal queue management ==================== + + /** + * Checks if the queue is empty. + */ + boolean isEmpty() { + if (error.get() != null) throw new RuntimeException(error.get()); + return resultQueue.isEmpty(); + } + + /** + * Drains results to the provided collection. 
+ */ + void drainTo(final Collection<Result> collection) { + if (error.get() != null) throw new RuntimeException(error.get()); + resultQueue.drainTo(collection); + } + + /** + * Completes the next waiting future if there is one and enough results are available. + */ + private synchronized void tryDrainNextWaiting(final boolean force) { + // need to peek because the number of available items needs to be >= the expected size for that future. if not + // it needs to keep waiting + final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.peek(); + if (nextWaiting != null && (force || (resultQueue.size() >= nextWaiting.getValue1() || readCompleted.isDone()))) { + final int items = nextWaiting.getValue1(); + final CompletableFuture<List<Result>> future = nextWaiting.getValue0(); + final List<Result> results = new ArrayList<>(items); + resultQueue.drainTo(results, items); + + // it's important to check for error here because a future may have already been queued in "waiting" prior + // to the first response back from the server. if that happens, any "waiting" futures should be completed + // exceptionally otherwise it will look like success. + if (null == error.get()) + future.complete(results); + else + future.completeExceptionally(error.get()); + + waiting.remove(nextWaiting); + } + } + + /** + * Completes all remaining futures. 
+ */ + private void drainAllWaiting() { + while (!waiting.isEmpty()) { + tryDrainNextWaiting(true); + } + } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java index a94f17bdb2..d4cfb167d2 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@ -25,7 +25,7 @@ import io.netty.util.AttributeKey; import javax.net.ssl.SSLException; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.tinkerpop.gremlin.driver.Result; -import org.apache.tinkerpop.gremlin.driver.ResultQueue; +import org.apache.tinkerpop.gremlin.driver.ResultSet; import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; import org.apache.tinkerpop.gremlin.util.ExceptionHelper; @@ -42,17 +42,16 @@ import java.util.concurrent.atomic.AtomicReference; import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; /** - * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request - * as the {@link ResponseMessage} objects are deserialized. + * Writes responses to the {@link ResultSet} of a request as the {@link ResponseMessage} objects are deserialized. 
*/ public class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> { public static final AttributeKey<Throwable> INBOUND_SSL_EXCEPTION = AttributeKey.valueOf("inboundSslException"); private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class); private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = AttributeKey.valueOf("caughtException"); - private final AtomicReference<ResultQueue> pending; + private final AtomicReference<ResultSet> pendingResultSet; - public GremlinResponseHandler(final AtomicReference<ResultQueue> pending) { - this.pending = pending; + public GremlinResponseHandler(final AtomicReference<ResultSet> pending) { + this.pendingResultSet = pending; } @Override @@ -61,7 +60,7 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response // should fire off a close message which will properly release the driver. super.channelInactive(ctx); - final ResultQueue current = pending.getAndSet(null); + final ResultSet current = pendingResultSet.getAndSet(null); if (current != null) { current.markError(new IllegalStateException("Connection to server is no longer active")); } @@ -70,28 +69,28 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response @Override protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) { final HttpResponseStatus statusCode = response.getStatus() == null ? null : response.getStatus().getCode(); - final ResultQueue queue = pending.get(); + final ResultSet resultSet = pendingResultSet.get(); if ((null == statusCode) || (statusCode == HttpResponseStatus.OK)) { final List<Object> data = response.getResult().getData(); final boolean bulked = channelHandlerContext.channel().attr(HttpGremlinResponseStreamDecoder.IS_BULKED).get(); - // unrolls the collection into individual results to be handled by the queue. 
+ // unrolls the collection into individual results to be handled by the ResultSet. if (bulked) { for (Iterator<Object> iter = data.iterator(); iter.hasNext(); ) { final Object obj = iter.next(); final long bulk = (long) iter.next(); DefaultRemoteTraverser<Object> item = new DefaultRemoteTraverser<>(obj, bulk); - queue.add(new Result(item)); + resultSet.add(new Result(item)); } } else { - data.forEach(item -> queue.add(new Result(item))); + data.forEach(item -> resultSet.add(new Result(item))); } } else { // this is a "success" but represents no results otherwise it is an error if (statusCode != HttpResponseStatus.NO_CONTENT) { // Save the error because there could be a subsequent HttpContent coming (probably just trailers). All - // content should be read first before marking the queue or else this channel might get reused too early. + // content should be read first before marking the ResultSet or else this channel might get reused too early. channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).set( new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(), response.getStatus().getException()) @@ -101,12 +100,12 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response // Stream is done when the last content signaling response message is read. 
if (LAST_CONTENT_READ_RESPONSE == response) { - final ResultQueue resultQueue = pending.getAndSet(null); - if (resultQueue != null) { + final ResultSet rs = pendingResultSet.getAndSet(null); + if (rs != null) { if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { - resultQueue.markComplete(); + rs.markComplete(); } else { - resultQueue.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); + rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); } } } @@ -119,8 +118,8 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response // there are that many failures someone would take notice and hopefully stop the client. logger.error("Could not process the response", cause); - final ResultQueue pendingQueue = pending.getAndSet(null); - if (pendingQueue != null) pendingQueue.markError(cause); + final ResultSet resultSet = this.pendingResultSet.getAndSet(null); + if (resultSet != null) resultSet.markError(cause); if (ExceptionHelper.getRootCause(cause) instanceof SSLException) { // inbound ssl error can happen with tls 1.3 because client certification auth can fail after the handshake completes diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultQueueTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultSetTest.java similarity index 78% rename from gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultQueueTest.java rename to gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultSetTest.java index 33b9287999..c2054f2761 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultQueueTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultSetTest.java @@ -18,31 +18,26 @@ */ package org.apache.tinkerpop.gremlin.driver; +import 
org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.junit.After; import org.junit.Before; -import java.util.Collections; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * @author Stephen Mallette (http://stephen.genoprime.com) */ -public abstract class AbstractResultQueueTest { +public abstract class AbstractResultSetTest { protected ExecutorService pool; - protected ResultQueue resultQueue; - protected CompletableFuture<Void> readCompleted; + protected ResultSet resultSet; @Before public void setup() { - final LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new LinkedBlockingQueue<>(); - readCompleted = new CompletableFuture<>(); - resultQueue = new ResultQueue(resultLinkedBlockingQueue, readCompleted); - pool = Executors.newCachedThreadPool(); + pool = Executors.newCachedThreadPool(); + resultSet = new ResultSet(pool, RequestMessage.build("traversal").create(), null); } @After @@ -63,15 +58,15 @@ public abstract class AbstractResultQueueTest { } /** - * Adds some test items to the {@link ResultQueue}. This method has the potential to block if the + * Adds some test items to the {@link ResultSet}. This method has the potential to block if the * {@code itemsToWaitFor > 0}. 
* * @param numberOfItemsToAdd the number of items to add in total - * @param pauseBetweenItemsInMillis amount of time to wait between additions to the {@link ResultQueue} + * @param pauseBetweenItemsInMillis amount of time to wait between additions to the {@link ResultSet} * @param start set to {@code true} to start the thread - * @param markDone force mark the queue as complete - * @param itemsToWaitFor block until this many items have been added to the {@link ResultQueue} - * @return the thread that is doing the work of adding to the queue + * @param markDone force mark the result set as complete + * @param itemsToWaitFor block until this many items have been added to the {@link ResultSet} + * @return the thread that is doing the work of adding to the result set * @throws Exception */ protected Thread addToQueue(final int numberOfItemsToAdd, final long pauseBetweenItemsInMillis, @@ -85,7 +80,7 @@ public abstract class AbstractResultQueueTest { final int currentIndex = ix; pool.submit(() -> { final Result result = new Result("test-" + currentIndex); - resultQueue.add(result); + resultSet.add(result); latch.countDown(); }).get(); TimeUnit.MILLISECONDS.sleep(pauseBetweenItemsInMillis); @@ -94,13 +89,13 @@ public abstract class AbstractResultQueueTest { } } - if (markDone) resultQueue.markComplete(); + if (markDone) resultSet.markComplete(); - }, "ResultQueueTest-job-submitter"); + }, "ResultSetTest-job-submitter"); if (start) t.start(); - // wait for the number of items requested to be added to the queue + // wait for the number of items requested to be added to the result set latch.await(); return t; diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetQueuingTest.java similarity index 67% rename from gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java rename to 
gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetQueuingTest.java index f540ddc123..4bc0f5b5b8 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetQueuingTest.java @@ -22,9 +22,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.Test; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -37,23 +35,25 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; /** + * Tests for the internal queuing functionality of {@link ResultSet}. + * * @author Stephen Mallette (http://stephen.genoprime.com) */ -public class ResultQueueTest extends AbstractResultQueueTest { +public class ResultSetQueuingTest extends AbstractResultSetTest { @Test public void shouldGetSizeUntilError() throws Exception { final Thread t = addToQueue(100, 10, true, false, 1); try { - assertThat(resultQueue.size(), is(greaterThan(0))); - assertThat(readCompleted.isDone(), is(false)); + assertThat(resultSet.getAvailableItemCount(), is(greaterThan(0))); + assertThat(resultSet.allItemsAvailable(), is(false)); final Exception theProblem = new Exception(); - resultQueue.markError(theProblem); - assertThat(readCompleted.isDone(), is(true)); + resultSet.markError(theProblem); + assertThat(resultSet.allItemsAvailable(), is(true)); try { - resultQueue.size(); + resultSet.getAvailableItemCount(); fail("Should have thrown an exception"); } catch (Exception ex) { assertEquals(theProblem, ex.getCause()); @@ -65,24 +65,24 @@ public class ResultQueueTest extends AbstractResultQueueTest { @Test public void shouldBeEmptyThenNotEmpty() { - assertThat(resultQueue.isEmpty(), is(true)); - resultQueue.add(new Result("test")); - 
assertThat(resultQueue.isEmpty(), is(false)); + assertThat(resultSet.isEmpty(), is(true)); + resultSet.add(new Result("test")); + assertThat(resultSet.isEmpty(), is(false)); } @Test public void shouldNotBeEmptyUntilError() throws Exception { final Thread t = addToQueue(100, 10, true, false, 1); try { - assertThat(resultQueue.isEmpty(), is(false)); - assertThat(readCompleted.isDone(), is(false)); + assertThat(resultSet.isEmpty(), is(false)); + assertThat(resultSet.allItemsAvailable(), is(false)); final Exception theProblem = new Exception(); - resultQueue.markError(theProblem); - assertThat(readCompleted.isDone(), is(true)); + resultSet.markError(theProblem); + assertThat(resultSet.allItemsAvailable(), is(true)); try { - resultQueue.isEmpty(); + resultSet.isEmpty(); fail("Should have thrown an exception"); } catch (Exception ex) { assertEquals(theProblem, ex.getCause()); @@ -96,24 +96,24 @@ public class ResultQueueTest extends AbstractResultQueueTest { public void shouldDrainUntilError() throws Exception { final Thread t = addToQueue(100, 10, true, false, 1); try { - assertThat(resultQueue.isEmpty(), is(false)); + assertThat(resultSet.isEmpty(), is(false)); final List<Result> drain = new ArrayList<>(); - resultQueue.drainTo(drain); + resultSet.drainTo(drain); assertThat(drain.size(), is(greaterThan(0))); - assertThat(readCompleted.isDone(), is(false)); + assertThat(resultSet.allItemsAvailable(), is(false)); - // make sure some more items get added to the queue before assert + // make sure some more items get added to the result set before assert TimeUnit.MILLISECONDS.sleep(100); - assertThat(resultQueue.isEmpty(), is(false)); - assertThat(readCompleted.isDone(), is(false)); + assertThat(resultSet.isEmpty(), is(false)); + assertThat(resultSet.allItemsAvailable(), is(false)); final Exception theProblem = new Exception(); - resultQueue.markError(theProblem); - assertThat(readCompleted.isDone(), is(true)); + resultSet.markError(theProblem); + 
assertThat(resultSet.allItemsAvailable(), is(true)); try { - resultQueue.drainTo(new ArrayList<>()); + resultSet.drainTo(new ArrayList<>()); fail("Should have thrown an exception"); } catch (Exception ex) { assertEquals(theProblem, ex.getCause()); @@ -125,13 +125,13 @@ public class ResultQueueTest extends AbstractResultQueueTest { @Test public void shouldAwaitEverythingAndFlushOnMarkCompleted() throws Exception { - final CompletableFuture<List<Result>> future = resultQueue.await(4); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + final CompletableFuture<List<Result>> future = resultSet.some(4); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); assertThat(future.isDone(), is(false)); - resultQueue.markComplete(); + resultSet.markComplete(); assertThat(future.isDone(), is(true)); final List<Result> results = future.get(); @@ -140,18 +140,18 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals("test3", results.get(2).getString()); assertEquals(3, results.size()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), is(true)); } @Test public void shouldAwaitFailTheFutureOnMarkError() { - final CompletableFuture<List<Result>> future = resultQueue.await(4); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + final CompletableFuture<List<Result>> future = resultSet.some(4); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); assertThat(future.isDone(), is(false)); - resultQueue.markError(new Exception("no worky")); + resultSet.markError(new Exception("no worky")); assertThat(future.isDone(), is(true)); try { @@ -164,14 +164,14 @@ public class ResultQueueTest extends AbstractResultQueueTest { @Test public void shouldAwaitToExpectedValueAndDrainOnAdd() 
throws Exception { - final CompletableFuture<List<Result>> future = resultQueue.await(3); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); + final CompletableFuture<List<Result>> future = resultSet.some(3); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); // shouldn't complete until the third item is in play assertThat(future.isDone(), is(false)); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test3")); final List<Result> results = future.get(); assertEquals("test1", results.get(0).getString()); @@ -179,21 +179,21 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals("test3", results.get(2).getString()); assertEquals(3, results.size()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), is(true)); } @Test public void shouldAwaitMultipleToExpectedValueAndDrainOnAdd() throws Exception { - final CompletableFuture<List<Result>> future1 = resultQueue.await(3); - final CompletableFuture<List<Result>> future2 = resultQueue.await(1); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); + final CompletableFuture<List<Result>> future1 = resultSet.some(3); + final CompletableFuture<List<Result>> future2 = resultSet.some(1); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); // shouldn't complete the first future until the third item is in play assertThat(future1.isDone(), is(false)); assertThat(future2.isDone(), is(false)); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test3")); final List<Result> results1 = future1.get(); assertEquals("test1", results1.get(0).getString()); @@ -203,7 +203,7 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertThat(future1.isDone(), is(true)); assertThat(future2.isDone(), is(false)); - resultQueue.add(new Result("test4")); + resultSet.add(new Result("test4")); assertThat(future1.isDone(), is(true)); 
assertThat(future2.isDone(), is(true)); @@ -211,16 +211,16 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals("test4", results2.get(0).getString()); assertEquals(1, results2.size()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), is(true)); } @Test public void shouldAwaitToExpectedValueAndDrainOnAwait() throws Exception { - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); - final CompletableFuture<List<Result>> future = resultQueue.await(3); + final CompletableFuture<List<Result>> future = resultSet.some(3); assertThat(future.isDone(), is(true)); final List<Result> results = future.get(); @@ -229,19 +229,19 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals("test3", results.get(2).getString()); assertEquals(3, results.size()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), is(true)); } @Test public void shouldAwaitToReadCompletedAndDrainOnAwait() throws Exception { - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); - resultQueue.markComplete(); + resultSet.markComplete(); // you might want 30 but there are only three - final CompletableFuture<List<Result>> future = resultQueue.await(30); + final CompletableFuture<List<Result>> future = resultSet.some(30); assertThat(future.isDone(), is(true)); final List<Result> results = future.get(); @@ -250,7 +250,7 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals("test3", results.get(2).getString()); assertEquals(3, results.size()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), 
is(true)); } @Test @@ -262,17 +262,17 @@ public class ResultQueueTest extends AbstractResultQueueTest { final AtomicInteger count3 = new AtomicInteger(0); final CountDownLatch latch = new CountDownLatch(3); - resultQueue.await(500).thenAcceptAsync(r -> { + resultSet.some(500).thenAcceptAsync(r -> { count1.set(r.size()); latch.countDown(); }); - resultQueue.await(150).thenAcceptAsync(r -> { + resultSet.some(150).thenAcceptAsync(r -> { count2.set(r.size()); latch.countDown(); }); - resultQueue.await(350).thenAcceptAsync(r -> { + resultSet.some(350).thenAcceptAsync(r -> { count3.set(r.size()); latch.countDown(); }); @@ -283,7 +283,7 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals(150, count2.get()); assertEquals(350, count3.get()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), is(true)); } finally { t.interrupt(); } diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java index 855d9466a9..4bf3b8c6b7 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java @@ -18,8 +18,6 @@ */ package org.apache.tinkerpop.gremlin.driver; -import org.apache.tinkerpop.gremlin.util.message.RequestMessage; -import org.junit.Before; import org.junit.Test; import java.util.Iterator; @@ -39,20 +37,13 @@ import static org.junit.Assert.fail; /** * @author Stephen Mallette (http://stephen.genoprime.com) */ -public class ResultSetTest extends AbstractResultQueueTest { - - private ResultSet resultSet; - - @Before - public void setupThis() { - resultSet = new ResultSet(resultQueue, pool, readCompleted, RequestMessage.build("traversal").create(), null); - } +public class ResultSetTest extends AbstractResultSetTest { @Test public void 
shouldHaveAllItemsAvailableAsynchronouslyOnReadComplete() throws InterruptedException { final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync(); assertThat(all.isDone(), is(false)); - readCompleted.complete(null); + resultSet.markComplete(); // flush all tasks in pool pool.awaitTermination(2, TimeUnit.SECONDS); assertThat(all.isDone(), is(true)); @@ -62,7 +53,7 @@ public class ResultSetTest extends AbstractResultQueueTest { public void shouldHaveAllItemsAvailableAsynchronouslyOnReadCompleteExceptionally() throws InterruptedException { final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync(); assertThat(all.isDone(), is(false)); - readCompleted.completeExceptionally(new RuntimeException()); + resultSet.markError(new RuntimeException()); // flush all tasks in pool pool.awaitTermination(2, TimeUnit.SECONDS); assertThat(all.isDone(), is(true)); @@ -72,7 +63,7 @@ public class ResultSetTest extends AbstractResultQueueTest { @Test public void shouldHaveAllItemsAvailableOnReadComplete() throws InterruptedException { assertThat(resultSet.allItemsAvailable(), is(false)); - readCompleted.complete(null); + resultSet.markComplete(); // flush all tasks in pool pool.awaitTermination(2, TimeUnit.SECONDS); assertThat(resultSet.allItemsAvailable(), is(true)); @@ -85,7 +76,7 @@ public class ResultSetTest extends AbstractResultQueueTest { final AtomicBoolean atLeastOnce = new AtomicBoolean(false); addToQueue(1000, 1, true, true); - while (!readCompleted.isDone()) { + while (!resultSet.allItemsAvailable()) { atLeastOnce.set(true); if (!atLeastOnce.get()) assertThat(all.isDone(), is(false)); @@ -104,7 +95,7 @@ public class ResultSetTest extends AbstractResultQueueTest { final AtomicBoolean atLeastOnce = new AtomicBoolean(false); addToQueue(1000, 1, true, true); - while (!readCompleted.isDone()) { + while (!resultSet.allItemsAvailable()) { atLeastOnce.set(true); if (!atLeastOnce.get()) assertThat(resultSet.allItemsAvailable(), is(false)); @@ -117,12 +108,12 
@@ public class ResultSetTest extends AbstractResultQueueTest { @Test public void shouldAwaitEverythingAndFlushOnMarkCompleted() throws Exception { final CompletableFuture<List<Result>> future = resultSet.some(4); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); assertThat(future.isDone(), is(false)); - resultQueue.markComplete(); + resultSet.markComplete(); assertThat(future.isDone(), is(true)); final List<Result> results = future.get(); @@ -138,12 +129,12 @@ public class ResultSetTest extends AbstractResultQueueTest { @Test public void shouldGetAllOnlyOnComplete() throws Exception { final CompletableFuture<List<Result>> future = resultSet.all(); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); assertThat(future.isDone(), is(false)); - resultQueue.markComplete(); + resultSet.markComplete(); final List<Result> results = future.get(); assertEquals("test1", results.get(0).getString());
