This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch master-tx-client in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit b225dac4d90e396f26f9d06909db2d91b562c65b Author: Ken Hu <[email protected]> AuthorDate: Wed Feb 25 15:40:05 2026 -0800 Merge ResultQueue into ResultSet to simplify result handling The ResultQueue class was an unnecessary indirection layer between Netty and the ResultSet. This was probably more useful when the protocol was WebSocket-based and there were multiple requests on the same connection but it doesn't seem to have much of a purpose anymore. This new setup also potentially works better for HTTP/2 if the high level, stream-based API is used. If the frame-based API is used then changes might be required in the future. Some tests were deleted as they aren't actually relevant anymore because of HTTP and this refactoring. --- .../tinkerpop/gremlin/driver/Channelizer.java | 2 +- .../tinkerpop/gremlin/driver/Connection.java | 29 ++--- .../tinkerpop/gremlin/driver/ResultQueue.java | 139 --------------------- .../apache/tinkerpop/gremlin/driver/ResultSet.java | 120 ++++++++++++++++-- .../driver/handler/GremlinResponseHandler.java | 35 +++--- ...ltQueueTest.java => AbstractResultSetTest.java} | 33 +++-- .../tinkerpop/gremlin/driver/ClusterTest.java | 117 +++++++++++++++++ .../gremlin/driver/ConnectionPoolTest.java | 4 +- ...ultQueueTest.java => ResultSetQueuingTest.java} | 126 +++++++++---------- .../tinkerpop/gremlin/driver/ResultSetTest.java | 37 +++--- .../driver/ClientConnectionIntegrateTest.java | 67 ++-------- .../gremlin/server/GremlinDriverIntegrateTest.java | 4 - .../gremlin/server/HttpDriverIntegrateTest.java | 16 --- 13 files changed, 358 insertions(+), 371 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index fd7d585baf..427f76c056 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -92,7 +92,7 @@ public interface Channelizer extends ChannelHandler { protected Connection connection; protected Cluster cluster; protected SslHandler sslHandler; - private AtomicReference<ResultQueue> pending; + private AtomicReference<ResultSet> pending; protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler"; protected static final String PIPELINE_SSL_HANDLER = "gremlin-ssl-handler"; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 5f9c1325d3..cb94b39b87 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -32,7 +32,6 @@ import io.netty.channel.socket.nio.NioSocketChannel; import java.net.URI; import java.time.Instant; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -56,7 +55,7 @@ final class Connection { private final Channel channel; private final URI uri; - private final AtomicReference<ResultQueue> pending = new AtomicReference<>(); + private final AtomicReference<ResultSet> pending = new AtomicReference<>(); private final Cluster cluster; private final Client client; private final ConnectionPool pool; @@ -155,7 +154,7 @@ final class Connection { return cluster; } - AtomicReference<ResultQueue> getPending() { + AtomicReference<ResultSet> getPending() { return pending; } @@ -184,9 +183,9 @@ final class Connection { return future; } - public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> resultQueueSetup) { - // once there is a completed write, then create a traverser for the result set and complete - // the promise so that the client knows that that it can start checking for results. + public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> resultSetFuture) { + // once there is a completed write, then create a ResultSet and complete + // the promise so that the client knows that it can start checking for results. final Connection thisConnection = this; final ChannelPromise requestPromise = channel.newPromise() @@ -198,12 +197,11 @@ final class Connection { handleConnectionCleanupOnError(thisConnection); - cluster.executor().submit(() -> resultQueueSetup.completeExceptionally(f.cause())); + cluster.executor().submit(() -> resultSetFuture.completeExceptionally(f.cause())); } else { - final LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new LinkedBlockingQueue<>(); - final CompletableFuture<Void> readCompleted = new CompletableFuture<>(); + final ResultSet resultSet = new ResultSet(cluster.executor(), requestMessage, pool.host); - readCompleted.whenCompleteAsync((v, t) -> { + resultSet.getReadCompleted().whenCompleteAsync((v, t) -> { if (t != null) { // the callback for when the read failed. a failed read means the request went to the server // and came back with a server-side error of some sort. it means the server is responsive @@ -212,7 +210,7 @@ final class Connection { logger.debug("Error while processing request on the server {}.", this, t); handleConnectionCleanupOnError(thisConnection); } else { - // the callback for when the read was successful, meaning that ResultQueue.markComplete() + // the callback for when the read was successful, meaning that ResultSet.markComplete() // was called thisConnection.returnToPool(); } @@ -223,15 +221,12 @@ final class Connection { tryShutdown(); }, cluster.executor()); - final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted); - // pending.put(requestMessage.getRequestId(), handler); - pending.set(handler); + pending.set(resultSet); - // resultQueueSetup should only be completed by a worker since the application code might have sync + // resultSetFuture should only be completed by a worker since the application code might have sync // completion stages attached to it which and we do not want the event loop threads to process those // stages. - cluster.executor().submit(() -> resultQueueSetup.complete( - new ResultSet(handler, cluster.executor(), readCompleted, requestMessage, pool.host))); + cluster.executor().submit(() -> resultSetFuture.complete(resultSet)); } }); channel.writeAndFlush(requestMessage, requestPromise); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java deleted file mode 100644 index e91e833b94..0000000000 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.tinkerpop.gremlin.driver; - -import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.javatuples.Pair; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicReference; - -/** - * A queue of incoming {@link Result} objects. The queue is updated by the {@link GremlinResponseHandler} - * until a response terminator is identified. - * - * @author Stephen Mallette (http://stephen.genoprime.com) - */ -@SuppressWarnings("ThrowableResultOfMethodCallIgnored") -public final class ResultQueue { - - private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue; - - private final AtomicReference<Throwable> error = new AtomicReference<>(); - - private final CompletableFuture<Void> readComplete; - - private final Queue<Pair<CompletableFuture<List<Result>>,Integer>> waiting = new ConcurrentLinkedQueue<>(); - - public ResultQueue(final LinkedBlockingQueue<Result> resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) { - this.resultLinkedBlockingQueue = resultLinkedBlockingQueue; - this.readComplete = readComplete; - } - - /** - * Adds a {@link Result} to the queue which will be later read by the {@link ResultSet}. - * - * @param result a return value from the {@link Traversal} or script submitted for execution - */ - public void add(final Result result) { - this.resultLinkedBlockingQueue.offer(result); - tryDrainNextWaiting(false); - } - - public CompletableFuture<List<Result>> await(final int items) { - final CompletableFuture<List<Result>> result = new CompletableFuture<>(); - waiting.add(Pair.with(result, items)); - - tryDrainNextWaiting(false); - - return result; - } - - public int size() { - if (error.get() != null) throw new RuntimeException(error.get()); - return this.resultLinkedBlockingQueue.size(); - } - - public boolean isEmpty() { - if (error.get() != null) throw new RuntimeException(error.get()); - return this.size() == 0; - } - - public boolean isComplete() { - return readComplete.isDone(); - } - - void drainTo(final Collection<Result> collection) { - if (error.get() != null) throw new RuntimeException(error.get()); - resultLinkedBlockingQueue.drainTo(collection); - } - - public void markComplete() { - this.readComplete.complete(null); - - this.drainAllWaiting(); - } - - public void markError(final Throwable throwable) { - error.set(throwable); - this.readComplete.completeExceptionally(throwable); - this.drainAllWaiting(); - } - - /** - * Completes the next waiting future if there is one. - */ - private synchronized void tryDrainNextWaiting(final boolean force) { - // need to peek because the number of available items needs to be >= the expected size for that future. if not - // it needs to keep waiting - final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.peek(); - if (nextWaiting != null && (force || (resultLinkedBlockingQueue.size() >= nextWaiting.getValue1() || readComplete.isDone()))) { - final int items = nextWaiting.getValue1(); - final CompletableFuture<List<Result>> future = nextWaiting.getValue0(); - final List<Result> results = new ArrayList<>(items); - resultLinkedBlockingQueue.drainTo(results, items); - - // it's important to check for error here because a future may have already been queued in "waiting" prior - // to the first response back from the server. if that happens, any "waiting" futures should be completed - // exceptionally otherwise it will look like success. - if (null == error.get()) - future.complete(results); - else - future.completeExceptionally(error.get()); - - waiting.remove(nextWaiting); - } - } - - /** - * Completes all remaining futures. - */ - private void drainAllWaiting() { - while (!waiting.isEmpty()) { - tryDrainNextWaiting(true); - } - } -} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java index 0656586706..b5d506bc1f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java @@ -19,22 +19,28 @@ package org.apache.tinkerpop.gremlin.driver; import org.apache.tinkerpop.gremlin.util.message.RequestMessage; +import org.javatuples.Pair; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Queue; import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import java.util.stream.StreamSupport; /** * A {@code ResultSet} is returned from the submission of a Gremlin script to the server and represents the * results provided by the server. The results from the server are streamed into the {@code ResultSet} and - * therefore may not be available immediately. As such, {@code ResultSet} provides access to a a number + * therefore may not be available immediately. As such, {@code ResultSet} provides access to a number * of functions that help to work with the asynchronous nature of the data streaming back. Data from results * is stored in an {@link Result} which can be used to retrieve the item once it is on the client side. * <p/> @@ -48,20 +54,18 @@ import java.util.stream.StreamSupport; * @author Stephen Mallette (http://stephen.genoprime.com) */ public final class ResultSet implements Iterable<Result> { - private final ResultQueue resultQueue; + private final LinkedBlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>(); + private final Queue<Pair<CompletableFuture<List<Result>>, Integer>> waiting = new ConcurrentLinkedQueue<>(); + private final AtomicReference<Throwable> error = new AtomicReference<>(); + private final CompletableFuture<Void> readCompleted = new CompletableFuture<>(); + private final ExecutorService executor; private final RequestMessage originalRequestMessage; private final Host host; - private final CompletableFuture<Void> readCompleted; - - public ResultSet(final ResultQueue resultQueue, final ExecutorService executor, - final CompletableFuture<Void> readCompleted, final RequestMessage originalRequestMessage, - final Host host) { + public ResultSet(final ExecutorService executor, final RequestMessage originalRequestMessage, final Host host) { this.executor = executor; this.host = host; - this.resultQueue = resultQueue; - this.readCompleted = readCompleted; this.originalRequestMessage = originalRequestMessage; } @@ -94,6 +98,7 @@ public final class ResultSet implements Iterable<Result> { * Gets the number of items available on the client. */ public int getAvailableItemCount() { + if (error.get() != null) throw new RuntimeException(error.get()); return resultQueue.size(); } @@ -114,7 +119,10 @@ public final class ResultSet implements Iterable<Result> { * completed and there are less than that number specified available. */ public CompletableFuture<List<Result>> some(final int items) { - return resultQueue.await(items); + final CompletableFuture<List<Result>> result = new CompletableFuture<>(); + waiting.add(Pair.with(result, items)); + tryDrainNextWaiting(false); + return result; } /** @@ -125,6 +133,7 @@ public final class ResultSet implements Iterable<Result> { */ public CompletableFuture<List<Result>> all() { return readCompleted.thenApplyAsync(unusedInput -> { + if (error.get() != null) throw new RuntimeException(error.get()); final List<Result> list = new ArrayList<>(); resultQueue.drainTo(list); return list; @@ -174,4 +183,95 @@ public final class ResultSet implements Iterable<Result> { } }; } + + // ==================== Methods called by Handlers ==================== + + /** + * Adds a {@link Result} to the queue which will be later read by consumers. + * + * @param result a return value from the traversal or script submitted for execution + */ + public void add(final Result result) { + this.resultQueue.offer(result); + tryDrainNextWaiting(false); + } + + /** + * Marks the result stream as complete. + */ + public void markComplete() { + this.readCompleted.complete(null); + this.drainAllWaiting(); + } + + /** + * Marks the result stream as failed with an error. + * + * @param throwable the error that occurred + */ + public void markError(final Throwable throwable) { + error.set(throwable); + this.readCompleted.completeExceptionally(throwable); + this.drainAllWaiting(); + } + + /** + * Returns the future that completes when reading is done. Used internally + * for connection lifecycle management. + */ + CompletableFuture<Void> getReadCompleted() { + return readCompleted; + } + + // ==================== Internal queue management ==================== + + /** + * Checks if the queue is empty. + */ + boolean isEmpty() { + if (error.get() != null) throw new RuntimeException(error.get()); + return resultQueue.isEmpty(); + } + + /** + * Drains results to the provided collection. + */ + void drainTo(final Collection<Result> collection) { + if (error.get() != null) throw new RuntimeException(error.get()); + resultQueue.drainTo(collection); + } + + /** + * Completes the next waiting future if there is one and enough results are available. + */ + private synchronized void tryDrainNextWaiting(final boolean force) { + // need to peek because the number of available items needs to be >= the expected size for that future. if not + // it needs to keep waiting + final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = waiting.peek(); + if (nextWaiting != null && (force || (resultQueue.size() >= nextWaiting.getValue1() || readCompleted.isDone()))) { + final int items = nextWaiting.getValue1(); + final CompletableFuture<List<Result>> future = nextWaiting.getValue0(); + final List<Result> results = new ArrayList<>(items); + resultQueue.drainTo(results, items); + + // it's important to check for error here because a future may have already been queued in "waiting" prior + // to the first response back from the server. if that happens, any "waiting" futures should be completed + // exceptionally otherwise it will look like success. + if (null == error.get()) + future.complete(results); + else + future.completeExceptionally(error.get()); + + waiting.remove(nextWaiting); + } + } + + /** + * Completes all remaining futures. + */ + private void drainAllWaiting() { + while (!waiting.isEmpty()) { + tryDrainNextWaiting(true); + } + } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java index a94f17bdb2..d4cfb167d2 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@ -25,7 +25,7 @@ import io.netty.util.AttributeKey; import javax.net.ssl.SSLException; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.tinkerpop.gremlin.driver.Result; -import org.apache.tinkerpop.gremlin.driver.ResultQueue; +import org.apache.tinkerpop.gremlin.driver.ResultSet; import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; import org.apache.tinkerpop.gremlin.util.ExceptionHelper; @@ -42,17 +42,16 @@ import java.util.concurrent.atomic.AtomicReference; import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; /** - * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request - * as the {@link ResponseMessage} objects are deserialized. + * Writes responses to the {@link ResultSet} of a request as the {@link ResponseMessage} objects are deserialized. */ public class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> { public static final AttributeKey<Throwable> INBOUND_SSL_EXCEPTION = AttributeKey.valueOf("inboundSslException"); private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class); private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = AttributeKey.valueOf("caughtException"); - private final AtomicReference<ResultQueue> pending; + private final AtomicReference<ResultSet> pendingResultSet; - public GremlinResponseHandler(final AtomicReference<ResultQueue> pending) { - this.pending = pending; + public GremlinResponseHandler(final AtomicReference<ResultSet> pending) { + this.pendingResultSet = pending; } @Override @@ -61,7 +60,7 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response // should fire off a close message which will properly release the driver. super.channelInactive(ctx); - final ResultQueue current = pending.getAndSet(null); + final ResultSet current = pendingResultSet.getAndSet(null); if (current != null) { current.markError(new IllegalStateException("Connection to server is no longer active")); } @@ -70,28 +69,28 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response @Override protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) { final HttpResponseStatus statusCode = response.getStatus() == null ? null : response.getStatus().getCode(); - final ResultQueue queue = pending.get(); + final ResultSet resultSet = pendingResultSet.get(); if ((null == statusCode) || (statusCode == HttpResponseStatus.OK)) { final List<Object> data = response.getResult().getData(); final boolean bulked = channelHandlerContext.channel().attr(HttpGremlinResponseStreamDecoder.IS_BULKED).get(); - // unrolls the collection into individual results to be handled by the queue. + // unrolls the collection into individual results to be handled by the ResultSet. if (bulked) { for (Iterator<Object> iter = data.iterator(); iter.hasNext(); ) { final Object obj = iter.next(); final long bulk = (long) iter.next(); DefaultRemoteTraverser<Object> item = new DefaultRemoteTraverser<>(obj, bulk); - queue.add(new Result(item)); + resultSet.add(new Result(item)); } } else { - data.forEach(item -> queue.add(new Result(item))); + data.forEach(item -> resultSet.add(new Result(item))); } } else { // this is a "success" but represents no results otherwise it is an error if (statusCode != HttpResponseStatus.NO_CONTENT) { // Save the error because there could be a subsequent HttpContent coming (probably just trailers). All - // content should be read first before marking the queue or else this channel might get reused too early. + // content should be read first before marking the ResultSet or else this channel might get reused too early. channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).set( new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(), response.getStatus().getException()) @@ -101,12 +100,12 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response // Stream is done when the last content signaling response message is read. if (LAST_CONTENT_READ_RESPONSE == response) { - final ResultQueue resultQueue = pending.getAndSet(null); - if (resultQueue != null) { + final ResultSet rs = pendingResultSet.getAndSet(null); + if (rs != null) { if (null == channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) { - resultQueue.markComplete(); + rs.markComplete(); } else { - resultQueue.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); + rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null)); } } } @@ -119,8 +118,8 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response // there are that many failures someone would take notice and hopefully stop the client. logger.error("Could not process the response", cause); - final ResultQueue pendingQueue = pending.getAndSet(null); - if (pendingQueue != null) pendingQueue.markError(cause); + final ResultSet resultSet = this.pendingResultSet.getAndSet(null); + if (resultSet != null) resultSet.markError(cause); if (ExceptionHelper.getRootCause(cause) instanceof SSLException) { // inbound ssl error can happen with tls 1.3 because client certification auth can fail after the handshake completes diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultQueueTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultSetTest.java similarity index 78% rename from gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultQueueTest.java rename to gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultSetTest.java index 33b9287999..c2054f2761 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultQueueTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultSetTest.java @@ -18,31 +18,26 @@ */ package org.apache.tinkerpop.gremlin.driver; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.junit.After; import org.junit.Before; -import java.util.Collections; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * @author Stephen Mallette (http://stephen.genoprime.com) */ -public abstract class AbstractResultQueueTest { +public abstract class AbstractResultSetTest { protected ExecutorService pool; - protected ResultQueue resultQueue; - protected CompletableFuture<Void> readCompleted; + protected ResultSet resultSet; @Before public void setup() { - final LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new LinkedBlockingQueue<>(); - readCompleted = new CompletableFuture<>(); - resultQueue = new ResultQueue(resultLinkedBlockingQueue, readCompleted); - pool = Executors.newCachedThreadPool(); + pool = Executors.newCachedThreadPool(); + resultSet = new ResultSet(pool, RequestMessage.build("traversal").create(), null); } @After @@ -63,15 +58,15 @@ public abstract class AbstractResultQueueTest { } /** - * Adds some test items to the {@link ResultQueue}. This method has the potential to block if the + * Adds some test items to the {@link ResultSet}. This method has the potential to block if the * {@code itemsToWaitFor > 0}. * * @param numberOfItemsToAdd the number of items to add in total - * @param pauseBetweenItemsInMillis amount of time to wait between additions to the {@link ResultQueue} + * @param pauseBetweenItemsInMillis amount of time to wait between additions to the {@link ResultSet} * @param start set to {@code true} to start the thread - * @param markDone force mark the queue as complete - * @param itemsToWaitFor block until this many items have been added to the {@link ResultQueue} - * @return the thread that is doing the work of adding to the queue + * @param markDone force mark the result set as complete + * @param itemsToWaitFor block until this many items have been added to the {@link ResultSet} + * @return the thread that is doing the work of adding to the result set * @throws Exception */ protected Thread addToQueue(final int numberOfItemsToAdd, final long pauseBetweenItemsInMillis, @@ -85,7 +80,7 @@ public abstract class AbstractResultQueueTest { final int currentIndex = ix; pool.submit(() -> { final Result result = new Result("test-" + currentIndex); - resultQueue.add(result); + resultSet.add(result); latch.countDown(); }).get(); TimeUnit.MILLISECONDS.sleep(pauseBetweenItemsInMillis); @@ -94,13 +89,13 @@ public abstract class AbstractResultQueueTest { } } - if (markDone) resultQueue.markComplete(); + if (markDone) resultSet.markComplete(); - }, "ResultQueueTest-job-submitter"); + }, "ResultSetTest-job-submitter"); if (start) t.start(); - // wait for the number of items requested to be added to the queue + // wait for the number of items requested to be added to the result set latch.await(); return t; diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterTest.java index 0edc12cd0a..71a4fba07a 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterTest.java @@ -19,17 +19,28 @@ package org.apache.tinkerpop.gremlin.driver; import org.apache.commons.lang3.tuple.Pair; +import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException; import org.apache.tinkerpop.gremlin.driver.interceptor.PayloadSerializingInterceptor; +import org.apache.tinkerpop.gremlin.driver.remote.HttpRemoteTransaction; import org.junit.Test; +import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.apache.tinkerpop.gremlin.driver.Cluster.SERIALIZER_INTERCEPTOR_NAME; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Test Cluster and Cluster.Builder. @@ -140,4 +151,110 @@ public class ClusterTest { final Cluster testCluster = Cluster.build().removeInterceptor(SERIALIZER_INTERCEPTOR_NAME).create(); assertEquals(0, testCluster.getRequestInterceptors().size()); } + + // ===== Tests for refactored connection pool ownership ===== + + /** + * borrowConnection() should delegate to the pool's borrowConnection with the configured timeout. + */ + @Test + public void shouldBorrowConnectionFromPool() throws Exception { + final Host host = mock(Host.class); + final Connection expectedConn = mock(Connection.class); + final ConnectionPool pool = mock(ConnectionPool.class); + when(pool.borrowConnection(anyLong(), any(TimeUnit.class))).thenReturn(expectedConn); + + final Settings.ConnectionPoolSettings poolSettings = new Settings.ConnectionPoolSettings(); + + final Cluster cluster = mock(Cluster.class); + when(cluster.isClosing()).thenReturn(false); + when(cluster.getPoolFor(host)).thenReturn(pool); + when(cluster.connectionPoolSettings()).thenReturn(poolSettings); + when(cluster.borrowConnection(host)).thenCallRealMethod(); + + final Connection conn = cluster.borrowConnection(host); + assertSame(expectedConn, conn); + verify(pool).borrowConnection(poolSettings.maxWaitForConnection, TimeUnit.MILLISECONDS); + } + + /** + * borrowConnection() should throw NoHostAvailableException when no pool exists for the host. + */ + @Test(expected = NoHostAvailableException.class) + public void shouldThrowNoHostAvailableWhenBorrowingWithNoPool() throws Exception { + final Host host = mock(Host.class); + + final Cluster cluster = mock(Cluster.class); + when(cluster.isClosing()).thenReturn(false); + when(cluster.getPoolFor(host)).thenReturn(null); + when(cluster.borrowConnection(host)).thenCallRealMethod(); + + cluster.borrowConnection(host); + } + + /** + * borrowConnection() should throw IllegalStateException when the cluster is closing. + */ + @Test(expected = IllegalStateException.class) + public void shouldThrowWhenBorrowingWhileClosing() throws Exception { + final Host host = mock(Host.class); + + final Cluster cluster = mock(Cluster.class); + when(cluster.isClosing()).thenReturn(true); + when(cluster.borrowConnection(host)).thenCallRealMethod(); + + cluster.borrowConnection(host); + } + + /** + * randomHost() should return the first host from the load balancing strategy when available. + */ + @Test + public void shouldSelectRandomHostFromLoadBalancer() { + final Host expectedHost = mock(Host.class); + final LoadBalancingStrategy strategy = mock(LoadBalancingStrategy.class); + when(strategy.select(any())).thenReturn(List.of(expectedHost).iterator()); + + final Cluster cluster = mock(Cluster.class); + when(cluster.loadBalancingStrategy()).thenReturn(strategy); + when(cluster.randomHost()).thenCallRealMethod(); + + final Host selected = cluster.randomHost(); + assertSame(expectedHost, selected); + } + + /** + * randomHost() should fall back to allHosts() when the load balancer returns an empty iterator. + */ + @Test + public void shouldFallbackToRandomHostWhenLoadBalancerReturnsEmpty() { + final Host host = mock(Host.class); + final LoadBalancingStrategy strategy = mock(LoadBalancingStrategy.class); + when(strategy.select(any())).thenReturn(Collections.emptyIterator()); + + final Cluster cluster = mock(Cluster.class); + when(cluster.loadBalancingStrategy()).thenReturn(strategy); + when(cluster.allHosts()).thenReturn(List.of(host)); + when(cluster.randomHost()).thenCallRealMethod(); + + // with only one host in allHosts(), randomHost() must return it + final Host selected = cluster.randomHost(); + assertSame(host, selected); + } + + /** + * randomHost() should throw NoHostAvailableException when there are no hosts at all. + */ + @Test(expected = NoHostAvailableException.class) + public void shouldThrowNoHostAvailableWhenNoHostsExist() { + final LoadBalancingStrategy strategy = mock(LoadBalancingStrategy.class); + when(strategy.select(any())).thenReturn(Collections.emptyIterator()); + + final Cluster cluster = mock(Cluster.class); + when(cluster.loadBalancingStrategy()).thenReturn(strategy); + when(cluster.allHosts()).thenReturn(Collections.emptyList()); + when(cluster.randomHost()).thenCallRealMethod(); + + cluster.randomHost(); + } } diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java index 5c3bbb3e61..99b43de45a 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java @@ -61,10 +61,8 @@ public class ConnectionPoolTest { final Host host = mock(Host.class); - final Client client = new Client.ClusteredClient(cluster); - // create pool - starts with 1 connection - final ConnectionPool connectionPool = new ConnectionPool(host, client, Optional.of(2), connectionFactory); + final ConnectionPool connectionPool = new ConnectionPool(host, cluster, Optional.of(2), connectionFactory); // try to borrow a connection. final Connection conn0 = connectionPool.borrowConnection(100, TimeUnit.MILLISECONDS); diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetQueuingTest.java similarity index 67% rename from gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java rename to gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetQueuingTest.java index f540ddc123..4bc0f5b5b8 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetQueuingTest.java @@ -22,9 +22,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.Test; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -37,23 +35,25 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; /** + * Tests for the internal queuing functionality of {@link ResultSet}. + * * @author Stephen Mallette (http://stephen.genoprime.com) */ -public class ResultQueueTest extends AbstractResultQueueTest { +public class ResultSetQueuingTest extends AbstractResultSetTest { @Test public void shouldGetSizeUntilError() throws Exception { final Thread t = addToQueue(100, 10, true, false, 1); try { - assertThat(resultQueue.size(), is(greaterThan(0))); - assertThat(readCompleted.isDone(), is(false)); + assertThat(resultSet.getAvailableItemCount(), is(greaterThan(0))); + assertThat(resultSet.allItemsAvailable(), is(false)); final Exception theProblem = new Exception(); - resultQueue.markError(theProblem); - assertThat(readCompleted.isDone(), is(true)); + resultSet.markError(theProblem); + assertThat(resultSet.allItemsAvailable(), is(true)); try { - resultQueue.size(); + resultSet.getAvailableItemCount(); fail("Should have thrown an exception"); } catch (Exception ex) { assertEquals(theProblem, ex.getCause()); @@ -65,24 +65,24 @@ public class ResultQueueTest extends AbstractResultQueueTest { @Test public void shouldBeEmptyThenNotEmpty() { - assertThat(resultQueue.isEmpty(), is(true)); - resultQueue.add(new Result("test")); - assertThat(resultQueue.isEmpty(), is(false)); + assertThat(resultSet.isEmpty(), is(true)); + resultSet.add(new Result("test")); + assertThat(resultSet.isEmpty(), is(false)); } @Test public void shouldNotBeEmptyUntilError() throws Exception { final Thread t = addToQueue(100, 10, true, false, 1); try { - assertThat(resultQueue.isEmpty(), is(false)); - assertThat(readCompleted.isDone(), is(false)); + assertThat(resultSet.isEmpty(), is(false)); + assertThat(resultSet.allItemsAvailable(), is(false)); final Exception theProblem = new Exception(); - resultQueue.markError(theProblem); - assertThat(readCompleted.isDone(), is(true)); + resultSet.markError(theProblem); + assertThat(resultSet.allItemsAvailable(), is(true)); try { - resultQueue.isEmpty(); + resultSet.isEmpty(); fail("Should have thrown an exception"); } catch (Exception ex) { assertEquals(theProblem, ex.getCause()); @@ -96,24 +96,24 @@ public class ResultQueueTest extends AbstractResultQueueTest { public void shouldDrainUntilError() throws Exception { final Thread t = addToQueue(100, 10, true, false, 1); try { - assertThat(resultQueue.isEmpty(), is(false)); + assertThat(resultSet.isEmpty(), is(false)); final List<Result> drain = new ArrayList<>(); - resultQueue.drainTo(drain); + resultSet.drainTo(drain); assertThat(drain.size(), is(greaterThan(0))); - assertThat(readCompleted.isDone(), is(false)); + assertThat(resultSet.allItemsAvailable(), is(false)); - // make sure some more items get added to the queue before assert + // make sure some more items get added to the result set before assert TimeUnit.MILLISECONDS.sleep(100); - assertThat(resultQueue.isEmpty(), is(false)); - assertThat(readCompleted.isDone(), is(false)); + assertThat(resultSet.isEmpty(), is(false)); + assertThat(resultSet.allItemsAvailable(), is(false)); final Exception theProblem = new Exception(); - resultQueue.markError(theProblem); - assertThat(readCompleted.isDone(), is(true)); + resultSet.markError(theProblem); + assertThat(resultSet.allItemsAvailable(), is(true)); try { - resultQueue.drainTo(new ArrayList<>()); + resultSet.drainTo(new ArrayList<>()); fail("Should have thrown an exception"); } catch (Exception ex) { assertEquals(theProblem, ex.getCause()); @@ -125,13 +125,13 @@ public class ResultQueueTest extends AbstractResultQueueTest { @Test public void shouldAwaitEverythingAndFlushOnMarkCompleted() throws Exception { - final CompletableFuture<List<Result>> future = resultQueue.await(4); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + final CompletableFuture<List<Result>> future = resultSet.some(4); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); assertThat(future.isDone(), is(false)); - resultQueue.markComplete(); + resultSet.markComplete(); assertThat(future.isDone(), is(true)); final List<Result> results = future.get(); @@ -140,18 +140,18 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals("test3", results.get(2).getString()); assertEquals(3, results.size()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), is(true)); } @Test public void shouldAwaitFailTheFutureOnMarkError() { - final CompletableFuture<List<Result>> future = resultQueue.await(4); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + final CompletableFuture<List<Result>> future = resultSet.some(4); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); assertThat(future.isDone(), is(false)); - resultQueue.markError(new Exception("no worky")); + resultSet.markError(new Exception("no worky")); assertThat(future.isDone(), is(true)); try { @@ -164,14 +164,14 @@ public class ResultQueueTest extends AbstractResultQueueTest { @Test public void shouldAwaitToExpectedValueAndDrainOnAdd() throws Exception { - final CompletableFuture<List<Result>> future = resultQueue.await(3); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); + final CompletableFuture<List<Result>> future = resultSet.some(3); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); // shouldn't complete until the third item is in play assertThat(future.isDone(), is(false)); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test3")); final List<Result> results = future.get(); assertEquals("test1", results.get(0).getString()); @@ -179,21 +179,21 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals("test3", results.get(2).getString()); assertEquals(3, results.size()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), is(true)); } @Test public void shouldAwaitMultipleToExpectedValueAndDrainOnAdd() throws Exception { - final CompletableFuture<List<Result>> future1 = resultQueue.await(3); - final CompletableFuture<List<Result>> future2 = resultQueue.await(1); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); + final CompletableFuture<List<Result>> future1 = resultSet.some(3); + final CompletableFuture<List<Result>> future2 = resultSet.some(1); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); // shouldn't complete the first future until the third item is in play assertThat(future1.isDone(), is(false)); assertThat(future2.isDone(), is(false)); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test3")); final List<Result> results1 = future1.get(); assertEquals("test1", results1.get(0).getString()); @@ -203,7 +203,7 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertThat(future1.isDone(), is(true)); assertThat(future2.isDone(), is(false)); - resultQueue.add(new Result("test4")); + resultSet.add(new Result("test4")); assertThat(future1.isDone(), is(true)); assertThat(future2.isDone(), is(true)); @@ -211,16 +211,16 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals("test4", results2.get(0).getString()); assertEquals(1, results2.size()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), is(true)); } @Test public void shouldAwaitToExpectedValueAndDrainOnAwait() throws Exception { - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); - final CompletableFuture<List<Result>> future = resultQueue.await(3); + final CompletableFuture<List<Result>> future = resultSet.some(3); assertThat(future.isDone(), is(true)); final List<Result> results = future.get(); @@ -229,19 +229,19 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals("test3", results.get(2).getString()); assertEquals(3, results.size()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), is(true)); } @Test public void shouldAwaitToReadCompletedAndDrainOnAwait() throws Exception { - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); - resultQueue.markComplete(); + resultSet.markComplete(); // you might want 30 but there are only three - final CompletableFuture<List<Result>> future = resultQueue.await(30); + final CompletableFuture<List<Result>> future = resultSet.some(30); assertThat(future.isDone(), is(true)); final List<Result> results = future.get(); @@ -250,7 +250,7 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals("test3", results.get(2).getString()); assertEquals(3, results.size()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), is(true)); } @Test @@ -262,17 +262,17 @@ public class ResultQueueTest extends AbstractResultQueueTest { final AtomicInteger count3 = new AtomicInteger(0); final CountDownLatch latch = new CountDownLatch(3); - resultQueue.await(500).thenAcceptAsync(r -> { + resultSet.some(500).thenAcceptAsync(r -> { count1.set(r.size()); latch.countDown(); }); - resultQueue.await(150).thenAcceptAsync(r -> { + resultSet.some(150).thenAcceptAsync(r -> { count2.set(r.size()); latch.countDown(); }); - resultQueue.await(350).thenAcceptAsync(r -> { + resultSet.some(350).thenAcceptAsync(r -> { count3.set(r.size()); latch.countDown(); }); @@ -283,7 +283,7 @@ public class ResultQueueTest extends AbstractResultQueueTest { assertEquals(150, count2.get()); assertEquals(350, count3.get()); - assertThat(resultQueue.isEmpty(), is(true)); + assertThat(resultSet.isEmpty(), is(true)); } finally { t.interrupt(); } diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java index 855d9466a9..4bf3b8c6b7 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java @@ -18,8 +18,6 @@ */ package org.apache.tinkerpop.gremlin.driver; -import org.apache.tinkerpop.gremlin.util.message.RequestMessage; -import org.junit.Before; import org.junit.Test; import java.util.Iterator; @@ -39,20 +37,13 @@ import static org.junit.Assert.fail; /** * @author Stephen Mallette (http://stephen.genoprime.com) */ -public class ResultSetTest extends AbstractResultQueueTest { - - private ResultSet resultSet; - - @Before - public void setupThis() { - resultSet = new ResultSet(resultQueue, pool, readCompleted, RequestMessage.build("traversal").create(), null); - } +public class ResultSetTest extends AbstractResultSetTest { @Test public void shouldHaveAllItemsAvailableAsynchronouslyOnReadComplete() throws InterruptedException { final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync(); assertThat(all.isDone(), is(false)); - readCompleted.complete(null); + resultSet.markComplete(); // flush all tasks in pool pool.awaitTermination(2, TimeUnit.SECONDS); assertThat(all.isDone(), is(true)); @@ -62,7 +53,7 @@ public class ResultSetTest extends AbstractResultQueueTest { public void shouldHaveAllItemsAvailableAsynchronouslyOnReadCompleteExceptionally() throws InterruptedException { final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync(); assertThat(all.isDone(), is(false)); - readCompleted.completeExceptionally(new RuntimeException()); + resultSet.markError(new RuntimeException()); // flush all tasks in pool pool.awaitTermination(2, TimeUnit.SECONDS); assertThat(all.isDone(), is(true)); @@ -72,7 +63,7 @@ public class ResultSetTest extends AbstractResultQueueTest { @Test public void shouldHaveAllItemsAvailableOnReadComplete() throws InterruptedException { assertThat(resultSet.allItemsAvailable(), is(false)); - readCompleted.complete(null); + resultSet.markComplete(); // flush all tasks in pool pool.awaitTermination(2, TimeUnit.SECONDS); assertThat(resultSet.allItemsAvailable(), is(true)); @@ -85,7 +76,7 @@ public class ResultSetTest extends AbstractResultQueueTest { final AtomicBoolean atLeastOnce = new AtomicBoolean(false); addToQueue(1000, 1, true, true); - while (!readCompleted.isDone()) { + while (!resultSet.allItemsAvailable()) { atLeastOnce.set(true); if (!atLeastOnce.get()) assertThat(all.isDone(), is(false)); @@ -104,7 +95,7 @@ public class ResultSetTest extends AbstractResultQueueTest { final AtomicBoolean atLeastOnce = new AtomicBoolean(false); addToQueue(1000, 1, true, true); - while (!readCompleted.isDone()) { + while (!resultSet.allItemsAvailable()) { atLeastOnce.set(true); if (!atLeastOnce.get()) assertThat(resultSet.allItemsAvailable(), is(false)); @@ -117,12 +108,12 @@ public class ResultSetTest extends AbstractResultQueueTest { @Test public void shouldAwaitEverythingAndFlushOnMarkCompleted() throws Exception { final CompletableFuture<List<Result>> future = resultSet.some(4); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); assertThat(future.isDone(), is(false)); - resultQueue.markComplete(); + resultSet.markComplete(); assertThat(future.isDone(), is(true)); final List<Result> results = future.get(); @@ -138,12 +129,12 @@ public class ResultSetTest extends AbstractResultQueueTest { @Test public void shouldGetAllOnlyOnComplete() throws Exception { final CompletableFuture<List<Result>> future = resultSet.all(); - resultQueue.add(new Result("test1")); - resultQueue.add(new Result("test2")); - resultQueue.add(new Result("test3")); + resultSet.add(new Result("test1")); + resultSet.add(new Result("test2")); + resultSet.add(new Result("test3")); assertThat(future.isDone(), is(false)); - resultQueue.markComplete(); + resultSet.markComplete(); final List<Result> results = future.get(); assertEquals("test1", results.get(0).getString()); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java index ba48bf6d56..a307028ee1 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java @@ -22,7 +22,6 @@ import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import io.netty.handler.codec.TooLongFrameException; import nl.altindag.log.LogCaptor; -import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException; import org.apache.tinkerpop.gremlin.server.AbstractGremlinServerIntegrationTest; @@ -34,7 +33,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.runner.Request; import org.slf4j.LoggerFactory; import java.util.ArrayList; @@ -129,9 +127,8 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat assertEquals(1, cluster.availableHosts().size()); // Assert that there is no connection leak and all connections have been closed - assertEquals(0, client.hostConnectionPools.values().stream() - .findFirst().get() - .numConnectionsWaitingToCleanup()); + assertEquals(0, cluster.getPoolFor(cluster.allHosts().iterator().next()) + .numConnectionsWaitingToCleanup()); } finally { cluster.close(); } @@ -143,51 +140,6 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat } - @Test - public void shouldBalanceConcurrentRequestsAcrossConnections() throws InterruptedException { - final int connPoolSize = 16; - final Cluster cluster = TestClientFactory.build() - .maxConnectionPoolSize(connPoolSize) - .create(); - final Client.ClusteredClient client = cluster.connect(); - client.init(); - final ExecutorService executorServiceForTesting = cluster.executor(); - - try { - final RequestMessage.Builder request = client.buildMessage(RequestMessage.build("Thread.sleep(5000)")); - final Callable<Connection> sendQueryCallable = () -> client.chooseConnection(request.create()); - final List<Callable<Connection>> listOfTasks = new ArrayList<>(); - for (int i = 0; i < connPoolSize; i++) { - listOfTasks.add(sendQueryCallable); - } - - HashMap<String, Integer> channelsSize = new HashMap<>(); - - final List<Future<Connection>> executorSubmitFutures = executorServiceForTesting.invokeAll(listOfTasks); - executorSubmitFutures.parallelStream().map(fut -> { - try { - return fut.get(); - } catch (InterruptedException | ExecutionException e) { - fail(e.getMessage()); - return null; - } - }).forEach(conn -> { - String id = conn.getChannelId(); - channelsSize.put(id, channelsSize.getOrDefault(id, 0) + 1); - }); - - assertNotEquals(channelsSize.entrySet().size(), 0); - channelsSize.entrySet().forEach(entry -> { - assertEquals(1, (entry.getValue()).intValue()); - }); - - } finally { - executorServiceForTesting.shutdown(); - client.close(); - cluster.close(); - } - } - @Test public void shouldCreateNewHttpConnectionPerRequestAsNeeded() throws InterruptedException { final int operations = 6; @@ -199,8 +151,7 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat final ExecutorService executorServiceForTesting = cluster.executor(); try { - final RequestMessage.Builder request = client.buildMessage(RequestMessage.build("Thread.sleep(5000)")); - final Callable<Connection> sendQueryCallable = () -> client.chooseConnection(request.create()); + final Callable<Connection> sendQueryCallable = () -> cluster.borrowConnection(cluster.allHosts().iterator().next()); final List<Callable<Connection>> listOfTasks = new ArrayList<>(); for (int i = 0; i < operations; i++) { listOfTasks.add(sendQueryCallable); @@ -250,7 +201,7 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat // every 10 connections let's have some problems final JitteryConnectionFactory connectionFactory = new JitteryConnectionFactory(3); - client.hostConnectionPools.forEach((h, pool) -> pool.connectionFactory = connectionFactory); + cluster.allHosts().forEach(h -> cluster.getPoolFor(h).connectionFactory = connectionFactory); // get an initial connection which marks the host as available assertEquals(2, client.submit("g.inject(2)").all().join().get(0).getInt()); @@ -313,9 +264,9 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat // simulate downtime where there is no traffic so that all connections become idle TimeUnit.MILLISECONDS.sleep(idleMillis * 3); - assertEquals(1, client.hostConnectionPools.size()); + assertEquals(1, cluster.allHosts().size()); // all connections should have been closed due to idle timeout - assertTrue(client.hostConnectionPools.values().iterator().next().getPoolInfo().contains("no connections in pool")); + assertTrue(cluster.getPoolFor(cluster.allHosts().iterator().next()).getPoolInfo().contains("no connections in pool")); // create or reuse some more connections chooseConnections(4, client, executorServiceForTesting); @@ -348,11 +299,11 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat } private static void chooseConnections(int operations, Client.ClusteredClient client, ExecutorService executorServiceForTesting) throws InterruptedException { - final RequestMessage.Builder request = client.buildMessage(RequestMessage.build("g.inject(1)")); - final Callable<Connection> sendQueryCallable = () -> client.chooseConnection(request.create()); + final Cluster cluster = client.getCluster(); + final Callable<Connection> borrowConnectionCallable = () -> cluster.borrowConnection(cluster.allHosts().iterator().next()); final List<Callable<Connection>> listOfTasks = new ArrayList<>(); for (int i = 0; i < operations; i++) { - listOfTasks.add(sendQueryCallable); + listOfTasks.add(borrowConnectionCallable); } final List<Future<Connection>> executorSubmitFutures = executorServiceForTesting.invokeAll(listOfTasks); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java index 7b051812c7..89ad207135 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java @@ -1094,13 +1094,11 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration public void shouldCloseAllClientsOnCloseOfCluster() throws Exception { final Cluster cluster = TestClientFactory.open(); final Client sessionlessOne = cluster.connect(); - final Client session = cluster.connect("session"); final Client sessionlessTwo = cluster.connect(); final Client sessionlessThree = cluster.connect(); final Client sessionlessFour = cluster.connect(); assertEquals(2, sessionlessOne.submit("g.inject(2)").all().get().get(0).getInt()); - assertEquals(2, session.submit("g.inject(2)").all().get().get(0).getInt()); assertEquals(2, sessionlessTwo.submit("g.inject(2)").all().get().get(0).getInt()); assertEquals(2, sessionlessThree.submit("g.inject(2)").all().get().get(0).getInt()); // dont' send anything on the 4th client @@ -1119,7 +1117,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration } try { - session.submit("g.inject(2)").all().get(); fail("Should have tossed an exception because cluster was closed"); } catch (Exception ex) { final Throwable root = ExceptionHelper.getRootCause(ex); @@ -1156,7 +1153,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration // allow call to close() even though closed through cluster sessionlessOne.close(); - session.close(); sessionlessTwo.close(); cluster.close(); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java index 17563cdd8b..f6452abd09 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java @@ -209,22 +209,6 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes } } - @Test - public void shouldFailToUseSession() { - final Cluster cluster = TestClientFactory.build().create(); - try { - final Client client = cluster.connect("shouldFailToUseSession"); - client.submit("g.inject(2)").all().get(); - fail("Can't use session with HTTP"); - } catch (Exception ex) { - final Throwable t = ExceptionUtils.getRootCause(ex); - // assertEquals("Cannot use sessions or tx() with HttpChannelizer", t.getMessage()); - assertEquals("not implemented", t.getMessage()); - } finally { - cluster.close(); - } - } - @Test public void shouldDeserializeErrorWithGraphBinary() { final Cluster cluster = TestClientFactory.build().create();
