This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch master-tx-client
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit b225dac4d90e396f26f9d06909db2d91b562c65b
Author: Ken Hu <[email protected]>
AuthorDate: Wed Feb 25 15:40:05 2026 -0800

    Merge ResultQueue into ResultSet to simplify result handling
    
    The ResultQueue class was an unnecessary indirection layer between Netty
    and the ResultSet. It was probably more useful when the protocol was
    WebSocket-based and a single connection carried multiple requests, but
    it no longer seems to serve much of a purpose.
    
    This new setup also potentially works better for HTTP/2 if the
    high-level, stream-based API is used. If the frame-based API is used
    instead, further changes might be required in the future.
    
    Some tests were deleted because the switch to HTTP and this refactoring
    made them no longer relevant.
---
 .../tinkerpop/gremlin/driver/Channelizer.java      |   2 +-
 .../tinkerpop/gremlin/driver/Connection.java       |  29 ++---
 .../tinkerpop/gremlin/driver/ResultQueue.java      | 139 ---------------------
 .../apache/tinkerpop/gremlin/driver/ResultSet.java | 120 ++++++++++++++++--
 .../driver/handler/GremlinResponseHandler.java     |  35 +++---
 ...ltQueueTest.java => AbstractResultSetTest.java} |  33 +++--
 .../tinkerpop/gremlin/driver/ClusterTest.java      | 117 +++++++++++++++++
 .../gremlin/driver/ConnectionPoolTest.java         |   4 +-
 ...ultQueueTest.java => ResultSetQueuingTest.java} | 126 +++++++++----------
 .../tinkerpop/gremlin/driver/ResultSetTest.java    |  37 +++---
 .../driver/ClientConnectionIntegrateTest.java      |  67 ++--------
 .../gremlin/server/GremlinDriverIntegrateTest.java |   4 -
 .../gremlin/server/HttpDriverIntegrateTest.java    |  16 ---
 13 files changed, 358 insertions(+), 371 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index fd7d585baf..427f76c056 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -92,7 +92,7 @@ public interface Channelizer extends ChannelHandler {
         protected Connection connection;
         protected Cluster cluster;
         protected SslHandler sslHandler;
-        private AtomicReference<ResultQueue> pending;
+        private AtomicReference<ResultSet> pending;
 
         protected static final String PIPELINE_GREMLIN_HANDLER = 
"gremlin-handler";
         protected static final String PIPELINE_SSL_HANDLER = 
"gremlin-ssl-handler";
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 5f9c1325d3..cb94b39b87 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -32,7 +32,6 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 import java.net.URI;
 import java.time.Instant;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -56,7 +55,7 @@ final class Connection {
 
     private final Channel channel;
     private final URI uri;
-    private final AtomicReference<ResultQueue> pending = new 
AtomicReference<>();
+    private final AtomicReference<ResultSet> pending = new AtomicReference<>();
     private final Cluster cluster;
     private final Client client;
     private final ConnectionPool pool;
@@ -155,7 +154,7 @@ final class Connection {
         return cluster;
     }
 
-    AtomicReference<ResultQueue> getPending() {
+    AtomicReference<ResultSet> getPending() {
         return pending;
     }
 
@@ -184,9 +183,9 @@ final class Connection {
         return future;
     }
 
-    public ChannelPromise write(final RequestMessage requestMessage, final 
CompletableFuture<ResultSet> resultQueueSetup) {
-        // once there is a completed write, then create a traverser for the 
result set and complete
-        // the promise so that the client knows that that it can start 
checking for results.
+    public ChannelPromise write(final RequestMessage requestMessage, final 
CompletableFuture<ResultSet> resultSetFuture) {
+        // once there is a completed write, then create a ResultSet and 
complete
+        // the promise so that the client knows that it can start checking for 
results.
         final Connection thisConnection = this;
 
         final ChannelPromise requestPromise = channel.newPromise()
@@ -198,12 +197,11 @@ final class Connection {
 
                         handleConnectionCleanupOnError(thisConnection);
 
-                        cluster.executor().submit(() -> 
resultQueueSetup.completeExceptionally(f.cause()));
+                        cluster.executor().submit(() -> 
resultSetFuture.completeExceptionally(f.cause()));
                     } else {
-                        final LinkedBlockingQueue<Result> 
resultLinkedBlockingQueue = new LinkedBlockingQueue<>();
-                        final CompletableFuture<Void> readCompleted = new 
CompletableFuture<>();
+                        final ResultSet resultSet = new 
ResultSet(cluster.executor(), requestMessage, pool.host);
 
-                        readCompleted.whenCompleteAsync((v, t) -> {
+                        resultSet.getReadCompleted().whenCompleteAsync((v, t) 
-> {
                             if (t != null) {
                                 // the callback for when the read failed. a 
failed read means the request went to the server
                                 // and came back with a server-side error of 
some sort.  it means the server is responsive
@@ -212,7 +210,7 @@ final class Connection {
                                 logger.debug("Error while processing request 
on the server {}.", this, t);
                                 handleConnectionCleanupOnError(thisConnection);
                             } else {
-                                // the callback for when the read was 
successful, meaning that ResultQueue.markComplete()
+                                // the callback for when the read was 
successful, meaning that ResultSet.markComplete()
                                 // was called
                                 thisConnection.returnToPool();
                             }
@@ -223,15 +221,12 @@ final class Connection {
                             tryShutdown();
                         }, cluster.executor());
 
-                        final ResultQueue handler = new 
ResultQueue(resultLinkedBlockingQueue, readCompleted);
-                        // pending.put(requestMessage.getRequestId(), handler);
-                        pending.set(handler);
+                        pending.set(resultSet);
 
-                        // resultQueueSetup should only be completed by a 
worker since the application code might have sync
+                        // resultSetFuture should only be completed by a 
worker since the application code might have sync
                         // completion stages attached to it which and we do 
not want the event loop threads to process those
                         // stages.
-                        cluster.executor().submit(() -> 
resultQueueSetup.complete(
-                                new ResultSet(handler, cluster.executor(), 
readCompleted, requestMessage, pool.host)));
+                        cluster.executor().submit(() -> 
resultSetFuture.complete(resultSet));
                     }
                 });
         channel.writeAndFlush(requestMessage, requestPromise);
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
deleted file mode 100644
index e91e833b94..0000000000
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.driver;
-
-import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.javatuples.Pair;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A queue of incoming {@link Result} objects.  The queue is updated by the 
{@link GremlinResponseHandler}
- * until a response terminator is identified.
- *
- * @author Stephen Mallette (http://stephen.genoprime.com)
- */
-@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-public final class ResultQueue {
-
-    private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue;
-
-    private final AtomicReference<Throwable> error = new AtomicReference<>();
-
-    private final CompletableFuture<Void> readComplete;
-
-    private final Queue<Pair<CompletableFuture<List<Result>>,Integer>> waiting 
= new ConcurrentLinkedQueue<>();
-
-    public ResultQueue(final LinkedBlockingQueue<Result> 
resultLinkedBlockingQueue, final CompletableFuture<Void> readComplete) {
-        this.resultLinkedBlockingQueue = resultLinkedBlockingQueue;
-        this.readComplete = readComplete;
-    }
-
-    /**
-     * Adds a {@link Result} to the queue which will be later read by the 
{@link ResultSet}.
-     *
-     * @param result a return value from the {@link Traversal} or script 
submitted for execution
-     */
-    public void add(final Result result) {
-        this.resultLinkedBlockingQueue.offer(result);
-        tryDrainNextWaiting(false);
-    }
-
-    public CompletableFuture<List<Result>> await(final int items) {
-        final CompletableFuture<List<Result>> result = new 
CompletableFuture<>();
-        waiting.add(Pair.with(result, items));
-
-        tryDrainNextWaiting(false);
-
-        return result;
-    }
-
-    public int size() {
-        if (error.get() != null) throw new RuntimeException(error.get());
-        return this.resultLinkedBlockingQueue.size();
-    }
-
-    public boolean isEmpty() {
-        if (error.get() != null) throw new RuntimeException(error.get());
-        return this.size() == 0;
-    }
-
-    public boolean isComplete() {
-        return readComplete.isDone();
-    }
-
-    void drainTo(final Collection<Result> collection) {
-        if (error.get() != null) throw new RuntimeException(error.get());
-        resultLinkedBlockingQueue.drainTo(collection);
-    }
-
-     public void markComplete() {
-        this.readComplete.complete(null);
-
-        this.drainAllWaiting();
-    }
-
-    public void markError(final Throwable throwable) {
-        error.set(throwable);
-        this.readComplete.completeExceptionally(throwable);
-        this.drainAllWaiting();
-    }
-
-    /**
-     * Completes the next waiting future if there is one.
-     */
-    private synchronized void tryDrainNextWaiting(final boolean force) {
-        // need to peek because the number of available items needs to be >= 
the expected size for that future. if not
-        // it needs to keep waiting
-        final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = 
waiting.peek();
-        if (nextWaiting != null && (force || (resultLinkedBlockingQueue.size() 
>= nextWaiting.getValue1() || readComplete.isDone()))) {
-            final int items = nextWaiting.getValue1();
-            final CompletableFuture<List<Result>> future = 
nextWaiting.getValue0();
-            final List<Result> results = new ArrayList<>(items);
-            resultLinkedBlockingQueue.drainTo(results, items);
-
-            // it's important to check for error here because a future may 
have already been queued in "waiting" prior
-            // to the first response back from the server. if that happens, 
any "waiting" futures should be completed
-            // exceptionally otherwise it will look like success.
-            if (null == error.get())
-                future.complete(results);
-            else
-                future.completeExceptionally(error.get());
-
-            waiting.remove(nextWaiting);
-        }
-    }
-
-    /**
-     * Completes all remaining futures.
-     */
-    private void drainAllWaiting() {
-        while (!waiting.isEmpty()) {
-            tryDrainNextWaiting(true);
-        }
-    }
-}
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 0656586706..b5d506bc1f 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -19,22 +19,28 @@
 package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.javatuples.Pair;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Queue;
 import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 /**
  * A {@code ResultSet} is returned from the submission of a Gremlin script to 
the server and represents the
  * results provided by the server.  The results from the server are streamed 
into the {@code ResultSet} and
- * therefore may not be available immediately.  As such, {@code ResultSet} 
provides access to a a number
+ * therefore may not be available immediately.  As such, {@code ResultSet} 
provides access to a number
  * of functions that help to work with the asynchronous nature of the data 
streaming back.  Data from results
  * is stored in an {@link Result} which can be used to retrieve the item once 
it is on the client side.
  * <p/>
@@ -48,20 +54,18 @@ import java.util.stream.StreamSupport;
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 public final class ResultSet implements Iterable<Result> {
-    private final ResultQueue resultQueue;
+    private final LinkedBlockingQueue<Result> resultQueue = new 
LinkedBlockingQueue<>();
+    private final Queue<Pair<CompletableFuture<List<Result>>, Integer>> 
waiting = new ConcurrentLinkedQueue<>();
+    private final AtomicReference<Throwable> error = new AtomicReference<>();
+    private final CompletableFuture<Void> readCompleted = new 
CompletableFuture<>();
+
     private final ExecutorService executor;
     private final RequestMessage originalRequestMessage;
     private final Host host;
 
-    private final CompletableFuture<Void> readCompleted;
-
-    public ResultSet(final ResultQueue resultQueue, final ExecutorService 
executor,
-                     final CompletableFuture<Void> readCompleted, final 
RequestMessage originalRequestMessage,
-                     final Host host) {
+    public ResultSet(final ExecutorService executor, final RequestMessage 
originalRequestMessage, final Host host) {
         this.executor = executor;
         this.host = host;
-        this.resultQueue = resultQueue;
-        this.readCompleted = readCompleted;
         this.originalRequestMessage = originalRequestMessage;
     }
 
@@ -94,6 +98,7 @@ public final class ResultSet implements Iterable<Result> {
      * Gets the number of items available on the client.
      */
     public int getAvailableItemCount() {
+        if (error.get() != null) throw new RuntimeException(error.get());
         return resultQueue.size();
     }
 
@@ -114,7 +119,10 @@ public final class ResultSet implements Iterable<Result> {
      * completed and there are less than that number specified available.
      */
     public CompletableFuture<List<Result>> some(final int items) {
-        return resultQueue.await(items);
+        final CompletableFuture<List<Result>> result = new 
CompletableFuture<>();
+        waiting.add(Pair.with(result, items));
+        tryDrainNextWaiting(false);
+        return result;
     }
 
     /**
@@ -125,6 +133,7 @@ public final class ResultSet implements Iterable<Result> {
      */
     public CompletableFuture<List<Result>> all() {
         return readCompleted.thenApplyAsync(unusedInput -> {
+            if (error.get() != null) throw new RuntimeException(error.get());
             final List<Result> list = new ArrayList<>();
             resultQueue.drainTo(list);
             return list;
@@ -174,4 +183,95 @@ public final class ResultSet implements Iterable<Result> {
             }
         };
     }
+
+    // ==================== Methods called by Handlers ====================
+
+    /**
+     * Adds a {@link Result} to the queue which will be later read by 
consumers.
+     *
+     * @param result a return value from the traversal or script submitted for 
execution
+     */
+    public void add(final Result result) {
+        this.resultQueue.offer(result);
+        tryDrainNextWaiting(false);
+    }
+
+    /**
+     * Marks the result stream as complete.
+     */
+    public void markComplete() {
+        this.readCompleted.complete(null);
+        this.drainAllWaiting();
+    }
+
+    /**
+     * Marks the result stream as failed with an error.
+     *
+     * @param throwable the error that occurred
+     */
+    public void markError(final Throwable throwable) {
+        error.set(throwable);
+        this.readCompleted.completeExceptionally(throwable);
+        this.drainAllWaiting();
+    }
+
+    /**
+     * Returns the future that completes when reading is done. Used internally
+     * for connection lifecycle management.
+     */
+    CompletableFuture<Void> getReadCompleted() {
+        return readCompleted;
+    }
+
+    // ==================== Internal queue management ====================
+
+    /**
+     * Checks if the queue is empty.
+     */
+    boolean isEmpty() {
+        if (error.get() != null) throw new RuntimeException(error.get());
+        return resultQueue.isEmpty();
+    }
+
+    /**
+     * Drains results to the provided collection.
+     */
+    void drainTo(final Collection<Result> collection) {
+        if (error.get() != null) throw new RuntimeException(error.get());
+        resultQueue.drainTo(collection);
+    }
+
+    /**
+     * Completes the next waiting future if there is one and enough results 
are available.
+     */
+    private synchronized void tryDrainNextWaiting(final boolean force) {
+        // need to peek because the number of available items needs to be >= 
the expected size for that future. if not
+        // it needs to keep waiting
+        final Pair<CompletableFuture<List<Result>>, Integer> nextWaiting = 
waiting.peek();
+        if (nextWaiting != null && (force || (resultQueue.size() >= 
nextWaiting.getValue1() || readCompleted.isDone()))) {
+            final int items = nextWaiting.getValue1();
+            final CompletableFuture<List<Result>> future = 
nextWaiting.getValue0();
+            final List<Result> results = new ArrayList<>(items);
+            resultQueue.drainTo(results, items);
+
+            // it's important to check for error here because a future may 
have already been queued in "waiting" prior
+            // to the first response back from the server. if that happens, 
any "waiting" futures should be completed
+            // exceptionally otherwise it will look like success.
+            if (null == error.get())
+                future.complete(results);
+            else
+                future.completeExceptionally(error.get());
+
+            waiting.remove(nextWaiting);
+        }
+    }
+
+    /**
+     * Completes all remaining futures.
+     */
+    private void drainAllWaiting() {
+        while (!waiting.isEmpty()) {
+            tryDrainNextWaiting(true);
+        }
+    }
 }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
index a94f17bdb2..d4cfb167d2 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
@@ -25,7 +25,7 @@ import io.netty.util.AttributeKey;
 import javax.net.ssl.SSLException;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Result;
-import org.apache.tinkerpop.gremlin.driver.ResultQueue;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import 
org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
 import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
@@ -42,17 +42,16 @@ import java.util.concurrent.atomic.AtomicReference;
 import static 
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
 
 /**
- * Takes a map of requests pending responses and writes responses to the 
{@link ResultQueue} of a request
- * as the {@link ResponseMessage} objects are deserialized.
+ * Writes responses to the {@link ResultSet} of a request as the {@link 
ResponseMessage} objects are deserialized.
  */
 public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<ResponseMessage> {
     public static final AttributeKey<Throwable> INBOUND_SSL_EXCEPTION = 
AttributeKey.valueOf("inboundSslException");
     private static final Logger logger = 
LoggerFactory.getLogger(GremlinResponseHandler.class);
     private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = 
AttributeKey.valueOf("caughtException");
-    private final AtomicReference<ResultQueue> pending;
+    private final AtomicReference<ResultSet> pendingResultSet;
 
-    public GremlinResponseHandler(final AtomicReference<ResultQueue> pending) {
-        this.pending = pending;
+    public GremlinResponseHandler(final AtomicReference<ResultSet> pending) {
+        this.pendingResultSet = pending;
     }
 
     @Override
@@ -61,7 +60,7 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
         // should fire off a close message which will properly release the 
driver.
         super.channelInactive(ctx);
 
-        final ResultQueue current = pending.getAndSet(null);
+        final ResultSet current = pendingResultSet.getAndSet(null);
         if (current != null) {
             current.markError(new IllegalStateException("Connection to server 
is no longer active"));
         }
@@ -70,28 +69,28 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
     @Override
     protected void channelRead0(final ChannelHandlerContext 
channelHandlerContext, final ResponseMessage response) {
         final HttpResponseStatus statusCode = response.getStatus() == null ? 
null : response.getStatus().getCode();
-        final ResultQueue queue = pending.get();
+        final ResultSet resultSet = pendingResultSet.get();
 
         if ((null == statusCode) || (statusCode == HttpResponseStatus.OK)) {
             final List<Object> data = response.getResult().getData();
             final boolean bulked = 
channelHandlerContext.channel().attr(HttpGremlinResponseStreamDecoder.IS_BULKED).get();
-            // unrolls the collection into individual results to be handled by 
the queue.
+            // unrolls the collection into individual results to be handled by 
the ResultSet.
             if (bulked) {
                 for (Iterator<Object> iter = data.iterator(); iter.hasNext(); 
) {
                     final Object obj = iter.next();
                     final long bulk = (long) iter.next();
                     DefaultRemoteTraverser<Object> item = new 
DefaultRemoteTraverser<>(obj, bulk);
-                    queue.add(new Result(item));
+                    resultSet.add(new Result(item));
                 }
             } else {
-                data.forEach(item -> queue.add(new Result(item)));
+                data.forEach(item -> resultSet.add(new Result(item)));
             }
 
         } else {
             // this is a "success" but represents no results otherwise it is 
an error
             if (statusCode != HttpResponseStatus.NO_CONTENT) {
                 // Save the error because there could be a subsequent 
HttpContent coming (probably just trailers). All
-                // content should be read first before marking the queue or 
else this channel might get reused too early.
+                // content should be read first before marking the ResultSet 
or else this channel might get reused too early.
                 channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).set(
                         new ResponseException(response.getStatus().getCode(), 
response.getStatus().getMessage(),
                                 response.getStatus().getException())
@@ -101,12 +100,12 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
 
         // Stream is done when the last content signaling response message is 
read.
         if (LAST_CONTENT_READ_RESPONSE == response) {
-            final ResultQueue resultQueue = pending.getAndSet(null);
-            if (resultQueue != null) {
+            final ResultSet rs = pendingResultSet.getAndSet(null);
+            if (rs != null) {
                 if (null == 
channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
-                    resultQueue.markComplete();
+                    rs.markComplete();
                 } else {
-                    
resultQueue.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
+                    
rs.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
                 }
             }
         }
@@ -119,8 +118,8 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
         // there are that many failures someone would take notice and 
hopefully stop the client.
         logger.error("Could not process the response", cause);
 
-        final ResultQueue pendingQueue = pending.getAndSet(null);
-        if (pendingQueue != null) pendingQueue.markError(cause);
+        final ResultSet resultSet = this.pendingResultSet.getAndSet(null);
+        if (resultSet != null) resultSet.markError(cause);
 
         if (ExceptionHelper.getRootCause(cause) instanceof SSLException) {
             // inbound ssl error can happen with tls 1.3 because client 
certification auth can fail after the handshake completes
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultQueueTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultSetTest.java
similarity index 78%
rename from 
gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultQueueTest.java
rename to 
gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultSetTest.java
index 33b9287999..c2054f2761 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultQueueTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/AbstractResultSetTest.java
@@ -18,31 +18,26 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
 import org.junit.After;
 import org.junit.Before;
 
-import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public abstract class AbstractResultQueueTest {
+public abstract class AbstractResultSetTest {
     protected ExecutorService pool;
-    protected ResultQueue resultQueue;
-    protected CompletableFuture<Void> readCompleted;
+    protected ResultSet resultSet;
 
     @Before
     public void setup() {
-        final LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new 
LinkedBlockingQueue<>();
-        readCompleted = new CompletableFuture<>();
-        resultQueue = new ResultQueue(resultLinkedBlockingQueue, 
readCompleted);
-        pool =  Executors.newCachedThreadPool();
+        pool = Executors.newCachedThreadPool();
+        resultSet = new ResultSet(pool, 
RequestMessage.build("traversal").create(), null);
     }
 
     @After
@@ -63,15 +58,15 @@ public abstract class AbstractResultQueueTest {
     }
 
     /**
-     * Adds some test items to the {@link ResultQueue}. This method has the 
potential to block if the
+     * Adds some test items to the {@link ResultSet}. This method has the 
potential to block if the
      * {@code itemsToWaitFor > 0}.
      *
      * @param numberOfItemsToAdd the number of items to add in total
-     * @param pauseBetweenItemsInMillis amount of time to wait between 
additions to the {@link ResultQueue}
+     * @param pauseBetweenItemsInMillis amount of time to wait between 
additions to the {@link ResultSet}
      * @param start set to {@code true} to start the thread
-     * @param markDone force mark the queue as complete
-     * @param itemsToWaitFor block until this many items have been added to 
the {@link ResultQueue}
-     * @return the thread that is doing the work of adding to the queue
+     * @param markDone force mark the result set as complete
+     * @param itemsToWaitFor block until this many items have been added to 
the {@link ResultSet}
+     * @return the thread that is doing the work of adding to the result set
      * @throws Exception
      */
     protected Thread addToQueue(final int numberOfItemsToAdd, final long 
pauseBetweenItemsInMillis,
@@ -85,7 +80,7 @@ public abstract class AbstractResultQueueTest {
                     final int currentIndex = ix;
                     pool.submit(() -> {
                         final Result result = new Result("test-" + 
currentIndex);
-                        resultQueue.add(result);
+                        resultSet.add(result);
                         latch.countDown();
                     }).get();
                     TimeUnit.MILLISECONDS.sleep(pauseBetweenItemsInMillis);
@@ -94,13 +89,13 @@ public abstract class AbstractResultQueueTest {
                 }
             }
 
-            if (markDone) resultQueue.markComplete();
+            if (markDone) resultSet.markComplete();
 
-        }, "ResultQueueTest-job-submitter");
+        }, "ResultSetTest-job-submitter");
 
         if (start) t.start();
 
-        // wait for the number of items requested to be added to the queue
+        // wait for the number of items requested to be added to the result set
         latch.await();
 
         return t;
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterTest.java
index 0edc12cd0a..71a4fba07a 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterTest.java
@@ -19,17 +19,28 @@
 package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import 
org.apache.tinkerpop.gremlin.driver.interceptor.PayloadSerializingInterceptor;
+import org.apache.tinkerpop.gremlin.driver.remote.HttpRemoteTransaction;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.tinkerpop.gremlin.driver.Cluster.SERIALIZER_INTERCEPTOR_NAME;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Test Cluster and Cluster.Builder.
@@ -140,4 +151,110 @@ public class ClusterTest {
         final Cluster testCluster = 
Cluster.build().removeInterceptor(SERIALIZER_INTERCEPTOR_NAME).create();
         assertEquals(0, testCluster.getRequestInterceptors().size());
     }
+
+    // ===== Tests for refactored connection pool ownership =====
+
+    /**
+     * borrowConnection() should delegate to the pool's borrowConnection with 
the configured timeout.
+     */
+    @Test
+    public void shouldBorrowConnectionFromPool() throws Exception {
+        final Host host = mock(Host.class);
+        final Connection expectedConn = mock(Connection.class);
+        final ConnectionPool pool = mock(ConnectionPool.class);
+        when(pool.borrowConnection(anyLong(), 
any(TimeUnit.class))).thenReturn(expectedConn);
+
+        final Settings.ConnectionPoolSettings poolSettings = new 
Settings.ConnectionPoolSettings();
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.isClosing()).thenReturn(false);
+        when(cluster.getPoolFor(host)).thenReturn(pool);
+        when(cluster.connectionPoolSettings()).thenReturn(poolSettings);
+        when(cluster.borrowConnection(host)).thenCallRealMethod();
+
+        final Connection conn = cluster.borrowConnection(host);
+        assertSame(expectedConn, conn);
+        verify(pool).borrowConnection(poolSettings.maxWaitForConnection, 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * borrowConnection() should throw NoHostAvailableException when no pool 
exists for the host.
+     */
+    @Test(expected = NoHostAvailableException.class)
+    public void shouldThrowNoHostAvailableWhenBorrowingWithNoPool() throws 
Exception {
+        final Host host = mock(Host.class);
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.isClosing()).thenReturn(false);
+        when(cluster.getPoolFor(host)).thenReturn(null);
+        when(cluster.borrowConnection(host)).thenCallRealMethod();
+
+        cluster.borrowConnection(host);
+    }
+
+    /**
+     * borrowConnection() should throw IllegalStateException when the cluster 
is closing.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowWhenBorrowingWhileClosing() throws Exception {
+        final Host host = mock(Host.class);
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.isClosing()).thenReturn(true);
+        when(cluster.borrowConnection(host)).thenCallRealMethod();
+
+        cluster.borrowConnection(host);
+    }
+
+    /**
+     * randomHost() should return the first host from the load balancing 
strategy when available.
+     */
+    @Test
+    public void shouldSelectRandomHostFromLoadBalancer() {
+        final Host expectedHost = mock(Host.class);
+        final LoadBalancingStrategy strategy = 
mock(LoadBalancingStrategy.class);
+        
when(strategy.select(any())).thenReturn(List.of(expectedHost).iterator());
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.loadBalancingStrategy()).thenReturn(strategy);
+        when(cluster.randomHost()).thenCallRealMethod();
+
+        final Host selected = cluster.randomHost();
+        assertSame(expectedHost, selected);
+    }
+
+    /**
+     * randomHost() should fall back to allHosts() when the load balancer 
returns an empty iterator.
+     */
+    @Test
+    public void shouldFallbackToRandomHostWhenLoadBalancerReturnsEmpty() {
+        final Host host = mock(Host.class);
+        final LoadBalancingStrategy strategy = 
mock(LoadBalancingStrategy.class);
+        when(strategy.select(any())).thenReturn(Collections.emptyIterator());
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.loadBalancingStrategy()).thenReturn(strategy);
+        when(cluster.allHosts()).thenReturn(List.of(host));
+        when(cluster.randomHost()).thenCallRealMethod();
+
+        // with only one host in allHosts(), randomHost() must return it
+        final Host selected = cluster.randomHost();
+        assertSame(host, selected);
+    }
+
+    /**
+     * randomHost() should throw NoHostAvailableException when there are no 
hosts at all.
+     */
+    @Test(expected = NoHostAvailableException.class)
+    public void shouldThrowNoHostAvailableWhenNoHostsExist() {
+        final LoadBalancingStrategy strategy = 
mock(LoadBalancingStrategy.class);
+        when(strategy.select(any())).thenReturn(Collections.emptyIterator());
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.loadBalancingStrategy()).thenReturn(strategy);
+        when(cluster.allHosts()).thenReturn(Collections.emptyList());
+        when(cluster.randomHost()).thenCallRealMethod();
+
+        cluster.randomHost();
+    }
 }
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java
index 5c3bbb3e61..99b43de45a 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java
@@ -61,10 +61,8 @@ public class ConnectionPoolTest {
 
         final Host host = mock(Host.class);
 
-        final Client client = new Client.ClusteredClient(cluster);
-
         // create pool - starts with 1 connection
-        final ConnectionPool connectionPool = new ConnectionPool(host, client, 
Optional.of(2), connectionFactory);
+        final ConnectionPool connectionPool = new ConnectionPool(host, 
cluster, Optional.of(2), connectionFactory);
         // try to borrow a connection.
         final Connection conn0 = connectionPool.borrowConnection(100, 
TimeUnit.MILLISECONDS);
 
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetQueuingTest.java
similarity index 67%
rename from 
gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
rename to 
gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetQueuingTest.java
index f540ddc123..4bc0f5b5b8 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetQueuingTest.java
@@ -22,9 +22,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -37,23 +35,25 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
+ * Tests for the internal queuing functionality of {@link ResultSet}.
+ * 
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public class ResultQueueTest extends AbstractResultQueueTest {
+public class ResultSetQueuingTest extends AbstractResultSetTest {
 
     @Test
     public void shouldGetSizeUntilError() throws Exception {
         final Thread t = addToQueue(100, 10, true, false, 1);
         try {
-            assertThat(resultQueue.size(), is(greaterThan(0)));
-            assertThat(readCompleted.isDone(), is(false));
+            assertThat(resultSet.getAvailableItemCount(), is(greaterThan(0)));
+            assertThat(resultSet.allItemsAvailable(), is(false));
 
             final Exception theProblem = new Exception();
-            resultQueue.markError(theProblem);
-            assertThat(readCompleted.isDone(), is(true));
+            resultSet.markError(theProblem);
+            assertThat(resultSet.allItemsAvailable(), is(true));
 
             try {
-                resultQueue.size();
+                resultSet.getAvailableItemCount();
                 fail("Should have thrown an exception");
             } catch (Exception ex) {
                 assertEquals(theProblem, ex.getCause());
@@ -65,24 +65,24 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
 
     @Test
     public void shouldBeEmptyThenNotEmpty() {
-        assertThat(resultQueue.isEmpty(), is(true));
-        resultQueue.add(new Result("test"));
-        assertThat(resultQueue.isEmpty(), is(false));
+        assertThat(resultSet.isEmpty(), is(true));
+        resultSet.add(new Result("test"));
+        assertThat(resultSet.isEmpty(), is(false));
     }
 
     @Test
     public void shouldNotBeEmptyUntilError() throws Exception {
         final Thread t = addToQueue(100, 10, true, false, 1);
         try {
-            assertThat(resultQueue.isEmpty(), is(false));
-            assertThat(readCompleted.isDone(), is(false));
+            assertThat(resultSet.isEmpty(), is(false));
+            assertThat(resultSet.allItemsAvailable(), is(false));
 
             final Exception theProblem = new Exception();
-            resultQueue.markError(theProblem);
-            assertThat(readCompleted.isDone(), is(true));
+            resultSet.markError(theProblem);
+            assertThat(resultSet.allItemsAvailable(), is(true));
 
             try {
-                resultQueue.isEmpty();
+                resultSet.isEmpty();
                 fail("Should have thrown an exception");
             } catch (Exception ex) {
                 assertEquals(theProblem, ex.getCause());
@@ -96,24 +96,24 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
     public void shouldDrainUntilError() throws Exception {
         final Thread t = addToQueue(100, 10, true, false, 1);
         try {
-            assertThat(resultQueue.isEmpty(), is(false));
+            assertThat(resultSet.isEmpty(), is(false));
             final List<Result> drain = new ArrayList<>();
-            resultQueue.drainTo(drain);
+            resultSet.drainTo(drain);
             assertThat(drain.size(), is(greaterThan(0)));
-            assertThat(readCompleted.isDone(), is(false));
+            assertThat(resultSet.allItemsAvailable(), is(false));
 
-            // make sure some more items get added to the queue before assert
+            // make sure some more items get added to the result set before assert
             TimeUnit.MILLISECONDS.sleep(100);
 
-            assertThat(resultQueue.isEmpty(), is(false));
-            assertThat(readCompleted.isDone(), is(false));
+            assertThat(resultSet.isEmpty(), is(false));
+            assertThat(resultSet.allItemsAvailable(), is(false));
 
             final Exception theProblem = new Exception();
-            resultQueue.markError(theProblem);
-            assertThat(readCompleted.isDone(), is(true));
+            resultSet.markError(theProblem);
+            assertThat(resultSet.allItemsAvailable(), is(true));
 
             try {
-                resultQueue.drainTo(new ArrayList<>());
+                resultSet.drainTo(new ArrayList<>());
                 fail("Should have thrown an exception");
             } catch (Exception ex) {
                 assertEquals(theProblem, ex.getCause());
@@ -125,13 +125,13 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
 
     @Test
     public void shouldAwaitEverythingAndFlushOnMarkCompleted() throws 
Exception {
-        final CompletableFuture<List<Result>> future = resultQueue.await(4);
-        resultQueue.add(new Result("test1"));
-        resultQueue.add(new Result("test2"));
-        resultQueue.add(new Result("test3"));
+        final CompletableFuture<List<Result>> future = resultSet.some(4);
+        resultSet.add(new Result("test1"));
+        resultSet.add(new Result("test2"));
+        resultSet.add(new Result("test3"));
 
         assertThat(future.isDone(), is(false));
-        resultQueue.markComplete();
+        resultSet.markComplete();
         assertThat(future.isDone(), is(true));
 
         final List<Result> results = future.get();
@@ -140,18 +140,18 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
         assertEquals("test3", results.get(2).getString());
         assertEquals(3, results.size());
 
-        assertThat(resultQueue.isEmpty(), is(true));
+        assertThat(resultSet.isEmpty(), is(true));
     }
 
     @Test
     public void shouldAwaitFailTheFutureOnMarkError() {
-        final CompletableFuture<List<Result>> future = resultQueue.await(4);
-        resultQueue.add(new Result("test1"));
-        resultQueue.add(new Result("test2"));
-        resultQueue.add(new Result("test3"));
+        final CompletableFuture<List<Result>> future = resultSet.some(4);
+        resultSet.add(new Result("test1"));
+        resultSet.add(new Result("test2"));
+        resultSet.add(new Result("test3"));
 
         assertThat(future.isDone(), is(false));
-        resultQueue.markError(new Exception("no worky"));
+        resultSet.markError(new Exception("no worky"));
         assertThat(future.isDone(), is(true));
 
         try {
@@ -164,14 +164,14 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
 
     @Test
     public void shouldAwaitToExpectedValueAndDrainOnAdd() throws Exception {
-        final CompletableFuture<List<Result>> future = resultQueue.await(3);
-        resultQueue.add(new Result("test1"));
-        resultQueue.add(new Result("test2"));
+        final CompletableFuture<List<Result>> future = resultSet.some(3);
+        resultSet.add(new Result("test1"));
+        resultSet.add(new Result("test2"));
 
         // shouldn't complete until the third item is in play
         assertThat(future.isDone(), is(false));
 
-        resultQueue.add(new Result("test3"));
+        resultSet.add(new Result("test3"));
 
         final List<Result> results = future.get();
         assertEquals("test1", results.get(0).getString());
@@ -179,21 +179,21 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
         assertEquals("test3", results.get(2).getString());
         assertEquals(3, results.size());
 
-        assertThat(resultQueue.isEmpty(), is(true));
+        assertThat(resultSet.isEmpty(), is(true));
     }
 
     @Test
     public void shouldAwaitMultipleToExpectedValueAndDrainOnAdd() throws 
Exception {
-        final CompletableFuture<List<Result>> future1 = resultQueue.await(3);
-        final CompletableFuture<List<Result>> future2 = resultQueue.await(1);
-        resultQueue.add(new Result("test1"));
-        resultQueue.add(new Result("test2"));
+        final CompletableFuture<List<Result>> future1 = resultSet.some(3);
+        final CompletableFuture<List<Result>> future2 = resultSet.some(1);
+        resultSet.add(new Result("test1"));
+        resultSet.add(new Result("test2"));
 
         // shouldn't complete the first future until the third item is in play
         assertThat(future1.isDone(), is(false));
         assertThat(future2.isDone(), is(false));
 
-        resultQueue.add(new Result("test3"));
+        resultSet.add(new Result("test3"));
 
         final List<Result> results1 = future1.get();
         assertEquals("test1", results1.get(0).getString());
@@ -203,7 +203,7 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
         assertThat(future1.isDone(), is(true));
         assertThat(future2.isDone(), is(false));
 
-        resultQueue.add(new Result("test4"));
+        resultSet.add(new Result("test4"));
         assertThat(future1.isDone(), is(true));
         assertThat(future2.isDone(), is(true));
 
@@ -211,16 +211,16 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
         assertEquals("test4", results2.get(0).getString());
         assertEquals(1, results2.size());
 
-        assertThat(resultQueue.isEmpty(), is(true));
+        assertThat(resultSet.isEmpty(), is(true));
     }
 
     @Test
     public void shouldAwaitToExpectedValueAndDrainOnAwait() throws Exception {
-        resultQueue.add(new Result("test1"));
-        resultQueue.add(new Result("test2"));
-        resultQueue.add(new Result("test3"));
+        resultSet.add(new Result("test1"));
+        resultSet.add(new Result("test2"));
+        resultSet.add(new Result("test3"));
 
-        final CompletableFuture<List<Result>> future = resultQueue.await(3);
+        final CompletableFuture<List<Result>> future = resultSet.some(3);
         assertThat(future.isDone(), is(true));
 
         final List<Result> results = future.get();
@@ -229,19 +229,19 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
         assertEquals("test3", results.get(2).getString());
         assertEquals(3, results.size());
 
-        assertThat(resultQueue.isEmpty(), is(true));
+        assertThat(resultSet.isEmpty(), is(true));
     }
 
     @Test
     public void shouldAwaitToReadCompletedAndDrainOnAwait() throws Exception {
-        resultQueue.add(new Result("test1"));
-        resultQueue.add(new Result("test2"));
-        resultQueue.add(new Result("test3"));
+        resultSet.add(new Result("test1"));
+        resultSet.add(new Result("test2"));
+        resultSet.add(new Result("test3"));
 
-        resultQueue.markComplete();
+        resultSet.markComplete();
 
         // you might want 30 but there are only three
-        final CompletableFuture<List<Result>> future = resultQueue.await(30);
+        final CompletableFuture<List<Result>> future = resultSet.some(30);
         assertThat(future.isDone(), is(true));
 
         final List<Result> results = future.get();
@@ -250,7 +250,7 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
         assertEquals("test3", results.get(2).getString());
         assertEquals(3, results.size());
 
-        assertThat(resultQueue.isEmpty(), is(true));
+        assertThat(resultSet.isEmpty(), is(true));
     }
 
     @Test
@@ -262,17 +262,17 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
             final AtomicInteger count3 = new AtomicInteger(0);
             final CountDownLatch latch = new CountDownLatch(3);
 
-            resultQueue.await(500).thenAcceptAsync(r -> {
+            resultSet.some(500).thenAcceptAsync(r -> {
                 count1.set(r.size());
                 latch.countDown();
             });
 
-            resultQueue.await(150).thenAcceptAsync(r -> {
+            resultSet.some(150).thenAcceptAsync(r -> {
                 count2.set(r.size());
                 latch.countDown();
             });
 
-            resultQueue.await(350).thenAcceptAsync(r -> {
+            resultSet.some(350).thenAcceptAsync(r -> {
                 count3.set(r.size());
                 latch.countDown();
             });
@@ -283,7 +283,7 @@ public class ResultQueueTest extends 
AbstractResultQueueTest {
             assertEquals(150, count2.get());
             assertEquals(350, count3.get());
 
-            assertThat(resultQueue.isEmpty(), is(true));
+            assertThat(resultSet.isEmpty(), is(true));
         } finally {
             t.interrupt();
         }
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
index 855d9466a9..4bf3b8c6b7 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Iterator;
@@ -39,20 +37,13 @@ import static org.junit.Assert.fail;
 /**
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public class ResultSetTest extends AbstractResultQueueTest {
-
-    private ResultSet resultSet;
-
-    @Before
-    public void setupThis() {
-        resultSet = new ResultSet(resultQueue, pool, readCompleted, 
RequestMessage.build("traversal").create(), null);
-    }
+public class ResultSetTest extends AbstractResultSetTest {
 
     @Test
     public void shouldHaveAllItemsAvailableAsynchronouslyOnReadComplete() 
throws InterruptedException {
         final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync();
         assertThat(all.isDone(), is(false));
-        readCompleted.complete(null);
+        resultSet.markComplete();
         // flush all tasks in pool
         pool.awaitTermination(2, TimeUnit.SECONDS);
         assertThat(all.isDone(), is(true));
@@ -62,7 +53,7 @@ public class ResultSetTest extends AbstractResultQueueTest {
     public void 
shouldHaveAllItemsAvailableAsynchronouslyOnReadCompleteExceptionally() throws 
InterruptedException {
         final CompletableFuture<Void> all = resultSet.allItemsAvailableAsync();
         assertThat(all.isDone(), is(false));
-        readCompleted.completeExceptionally(new RuntimeException());
+        resultSet.markError(new RuntimeException());
         // flush all tasks in pool
         pool.awaitTermination(2, TimeUnit.SECONDS);
         assertThat(all.isDone(), is(true));
@@ -72,7 +63,7 @@ public class ResultSetTest extends AbstractResultQueueTest {
     @Test
     public void shouldHaveAllItemsAvailableOnReadComplete() throws 
InterruptedException {
         assertThat(resultSet.allItemsAvailable(), is(false));
-        readCompleted.complete(null);
+        resultSet.markComplete();
         // flush all tasks in pool
         pool.awaitTermination(2, TimeUnit.SECONDS);
         assertThat(resultSet.allItemsAvailable(), is(true));
@@ -85,7 +76,7 @@ public class ResultSetTest extends AbstractResultQueueTest {
 
         final AtomicBoolean atLeastOnce = new AtomicBoolean(false);
         addToQueue(1000, 1, true, true);
-        while (!readCompleted.isDone()) {
+        while (!resultSet.allItemsAvailable()) {
             atLeastOnce.set(true);
             if (!atLeastOnce.get())
                 assertThat(all.isDone(), is(false));
@@ -104,7 +95,7 @@ public class ResultSetTest extends AbstractResultQueueTest {
 
         final AtomicBoolean atLeastOnce = new AtomicBoolean(false);
         addToQueue(1000, 1, true, true);
-        while (!readCompleted.isDone()) {
+        while (!resultSet.allItemsAvailable()) {
             atLeastOnce.set(true);
             if (!atLeastOnce.get())
                 assertThat(resultSet.allItemsAvailable(), is(false));
@@ -117,12 +108,12 @@ public class ResultSetTest extends 
AbstractResultQueueTest {
     @Test
     public void shouldAwaitEverythingAndFlushOnMarkCompleted() throws 
Exception {
         final CompletableFuture<List<Result>> future = resultSet.some(4);
-        resultQueue.add(new Result("test1"));
-        resultQueue.add(new Result("test2"));
-        resultQueue.add(new Result("test3"));
+        resultSet.add(new Result("test1"));
+        resultSet.add(new Result("test2"));
+        resultSet.add(new Result("test3"));
 
         assertThat(future.isDone(), is(false));
-        resultQueue.markComplete();
+        resultSet.markComplete();
         assertThat(future.isDone(), is(true));
 
         final List<Result> results = future.get();
@@ -138,12 +129,12 @@ public class ResultSetTest extends 
AbstractResultQueueTest {
     @Test
     public void shouldGetAllOnlyOnComplete() throws Exception {
         final CompletableFuture<List<Result>> future = resultSet.all();
-        resultQueue.add(new Result("test1"));
-        resultQueue.add(new Result("test2"));
-        resultQueue.add(new Result("test3"));
+        resultSet.add(new Result("test1"));
+        resultSet.add(new Result("test2"));
+        resultSet.add(new Result("test3"));
 
         assertThat(future.isDone(), is(false));
-        resultQueue.markComplete();
+        resultSet.markComplete();
 
         final List<Result> results = future.get();
         assertEquals("test1", results.get(0).getString());
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
index ba48bf6d56..a307028ee1 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
@@ -22,7 +22,6 @@ import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.Logger;
 import io.netty.handler.codec.TooLongFrameException;
 import nl.altindag.log.LogCaptor;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import 
org.apache.tinkerpop.gremlin.server.AbstractGremlinServerIntegrationTest;
@@ -34,7 +33,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.Request;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
@@ -129,9 +127,8 @@ public class ClientConnectionIntegrateTest extends 
AbstractGremlinServerIntegrat
             assertEquals(1, cluster.availableHosts().size());
 
             // Assert that there is no connection leak and all connections have been closed
-            assertEquals(0, client.hostConnectionPools.values().stream()
-                                                             .findFirst().get()
-                                                             .numConnectionsWaitingToCleanup());
+            assertEquals(0, 
cluster.getPoolFor(cluster.allHosts().iterator().next())
+                                   .numConnectionsWaitingToCleanup());
         } finally {
             cluster.close();
         }
@@ -143,51 +140,6 @@ public class ClientConnectionIntegrateTest extends 
AbstractGremlinServerIntegrat
 
     }
 
-    @Test
-    public void shouldBalanceConcurrentRequestsAcrossConnections() throws 
InterruptedException {
-        final int connPoolSize = 16;
-        final Cluster cluster = TestClientFactory.build()
-                .maxConnectionPoolSize(connPoolSize)
-                .create();
-        final Client.ClusteredClient client = cluster.connect();
-        client.init();
-        final ExecutorService executorServiceForTesting = cluster.executor();
-
-        try {
-            final RequestMessage.Builder request = 
client.buildMessage(RequestMessage.build("Thread.sleep(5000)"));
-            final Callable<Connection> sendQueryCallable = () -> 
client.chooseConnection(request.create());
-            final List<Callable<Connection>> listOfTasks = new ArrayList<>();
-            for (int i = 0; i < connPoolSize; i++) {
-                listOfTasks.add(sendQueryCallable);
-            }
-
-            HashMap<String, Integer> channelsSize = new HashMap<>();
-
-            final List<Future<Connection>> executorSubmitFutures = 
executorServiceForTesting.invokeAll(listOfTasks);
-            executorSubmitFutures.parallelStream().map(fut -> {
-                try {
-                    return fut.get();
-                } catch (InterruptedException | ExecutionException e) {
-                    fail(e.getMessage());
-                    return null;
-                }
-            }).forEach(conn -> {
-                String id = conn.getChannelId();
-                channelsSize.put(id, channelsSize.getOrDefault(id, 0) + 1);
-            });
-
-            assertNotEquals(channelsSize.entrySet().size(), 0);
-            channelsSize.entrySet().forEach(entry -> {
-                assertEquals(1, (entry.getValue()).intValue());
-            });
-
-        } finally {
-            executorServiceForTesting.shutdown();
-            client.close();
-            cluster.close();
-        }
-    }
-
     @Test
     public void shouldCreateNewHttpConnectionPerRequestAsNeeded() throws 
InterruptedException {
         final int operations = 6;
@@ -199,8 +151,7 @@ public class ClientConnectionIntegrateTest extends 
AbstractGremlinServerIntegrat
         final ExecutorService executorServiceForTesting = cluster.executor();
 
         try {
-            final RequestMessage.Builder request = 
client.buildMessage(RequestMessage.build("Thread.sleep(5000)"));
-            final Callable<Connection> sendQueryCallable = () -> 
client.chooseConnection(request.create());
+            final Callable<Connection> sendQueryCallable = () -> 
cluster.borrowConnection(cluster.allHosts().iterator().next());
             final List<Callable<Connection>> listOfTasks = new ArrayList<>();
             for (int i = 0; i < operations; i++) {
                 listOfTasks.add(sendQueryCallable);
@@ -250,7 +201,7 @@ public class ClientConnectionIntegrateTest extends 
AbstractGremlinServerIntegrat
 
         // every 10 connections let's have some problems
         final JitteryConnectionFactory connectionFactory = new 
JitteryConnectionFactory(3);
-        client.hostConnectionPools.forEach((h, pool) -> pool.connectionFactory 
= connectionFactory);
+        cluster.allHosts().forEach(h -> 
cluster.getPoolFor(h).connectionFactory = connectionFactory);
 
         // get an initial connection which marks the host as available
         assertEquals(2, 
client.submit("g.inject(2)").all().join().get(0).getInt());
@@ -313,9 +264,9 @@ public class ClientConnectionIntegrateTest extends 
AbstractGremlinServerIntegrat
 
             // simulate downtime where there is no traffic so that all 
connections become idle
             TimeUnit.MILLISECONDS.sleep(idleMillis * 3);
-            assertEquals(1, client.hostConnectionPools.size());
+            assertEquals(1, cluster.allHosts().size());
             // all connections should have been closed due to idle timeout
-            assertTrue(client.hostConnectionPools.values().iterator().next().getPoolInfo().contains("no connections in pool"));
+            assertTrue(cluster.getPoolFor(cluster.allHosts().iterator().next()).getPoolInfo().contains("no connections in pool"));
 
             // create or reuse some more connections
             chooseConnections(4, client, executorServiceForTesting);
@@ -348,11 +299,11 @@ public class ClientConnectionIntegrateTest extends 
AbstractGremlinServerIntegrat
     }
 
     private static void chooseConnections(int operations, 
Client.ClusteredClient client, ExecutorService executorServiceForTesting) 
throws InterruptedException {
-        final RequestMessage.Builder request = 
client.buildMessage(RequestMessage.build("g.inject(1)"));
-        final Callable<Connection> sendQueryCallable = () -> 
client.chooseConnection(request.create());
+        final Cluster cluster = client.getCluster();
+        final Callable<Connection> borrowConnectionCallable = () -> 
cluster.borrowConnection(cluster.allHosts().iterator().next());
         final List<Callable<Connection>> listOfTasks = new ArrayList<>();
         for (int i = 0; i < operations; i++) {
-            listOfTasks.add(sendQueryCallable);
+            listOfTasks.add(borrowConnectionCallable);
         }
 
         final List<Future<Connection>> executorSubmitFutures = 
executorServiceForTesting.invokeAll(listOfTasks);
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 7b051812c7..89ad207135 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -1094,13 +1094,11 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
     public void shouldCloseAllClientsOnCloseOfCluster() throws Exception {
         final Cluster cluster = TestClientFactory.open();
         final Client sessionlessOne = cluster.connect();
-        final Client session = cluster.connect("session");
         final Client sessionlessTwo = cluster.connect();
         final Client sessionlessThree = cluster.connect();
         final Client sessionlessFour = cluster.connect();
 
         assertEquals(2, 
sessionlessOne.submit("g.inject(2)").all().get().get(0).getInt());
-        assertEquals(2, 
session.submit("g.inject(2)").all().get().get(0).getInt());
         assertEquals(2, 
sessionlessTwo.submit("g.inject(2)").all().get().get(0).getInt());
         assertEquals(2, 
sessionlessThree.submit("g.inject(2)").all().get().get(0).getInt());
         // dont' send anything on the 4th client
@@ -1119,7 +1117,6 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
         }
 
         try {
-            session.submit("g.inject(2)").all().get();
             fail("Should have tossed an exception because cluster was closed");
         } catch (Exception ex) {
             final Throwable root = ExceptionHelper.getRootCause(ex);
@@ -1156,7 +1153,6 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
 
         // allow call to close() even though closed through cluster
         sessionlessOne.close();
-        session.close();
         sessionlessTwo.close();
 
         cluster.close();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
index 17563cdd8b..f6452abd09 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
@@ -209,22 +209,6 @@ public class HttpDriverIntegrateTest extends 
AbstractGremlinServerIntegrationTes
         }
     }
 
-    @Test
-    public void shouldFailToUseSession() {
-        final Cluster cluster = TestClientFactory.build().create();
-        try {
-            final Client client = cluster.connect("shouldFailToUseSession");
-            client.submit("g.inject(2)").all().get();
-            fail("Can't use session with HTTP");
-        } catch (Exception ex) {
-            final Throwable t = ExceptionUtils.getRootCause(ex);
-            // assertEquals("Cannot use sessions or tx() with HttpChannelizer", t.getMessage());
-            assertEquals("not implemented", t.getMessage());
-        } finally {
-            cluster.close();
-        }
-    }
-
     @Test
     public void shouldDeserializeErrorWithGraphBinary() {
         final Cluster cluster = TestClientFactory.build().create();

Reply via email to