[ 
https://issues.apache.org/jira/browse/TINKERPOP-2486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17421588#comment-17421588
 ] 

ASF GitHub Bot commented on TINKERPOP-2486:
-------------------------------------------

simonz-bq commented on a change in pull request #1465:
URL: https://github.com/apache/tinkerpop/pull/1465#discussion_r717844478



##########
File path: gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
##########
@@ -107,4 +115,103 @@ public void shouldCloseConnectionDeadDueToUnRecoverableError() throws Exception
         assertThat(recordingAppender.logContainsAny("^(?!.*(isDead=false)).*isDead=true.*destroyed successfully.$"), is(true));
 
     }
+
+    @Test
+    public void shouldBalanceConcurrentRequestsAcrossConnections() throws InterruptedException {
+        final int connPoolSize = 16;
+        final Cluster cluster = TestClientFactory.build()
+                .minConnectionPoolSize(connPoolSize)
+                .maxConnectionPoolSize(connPoolSize)
+                .create();
+        final Client.ClusteredClient client = cluster.connect();
+        client.init();
+        final ExecutorService executorServiceForTesting = cluster.executor();
+
+        try {
+            final RequestMessage.Builder request = client.buildMessage(RequestMessage.build(Tokens.OPS_EVAL))
+                    .add(Tokens.ARGS_GREMLIN, "Thread.sleep(5000)");
+            final Callable<Connection> sendQueryCallable = () -> client.chooseConnection(request.create());
+            final List<Callable<Connection>> listOfTasks = new ArrayList<>();
+            for (int i = 0; i < connPoolSize; i++) {
+                listOfTasks.add(sendQueryCallable);
+            }
+
+            HashMap<String, Integer> channelsSize = new HashMap<>();
+
+            final List<Future<Connection>> executorSubmitFutures = executorServiceForTesting.invokeAll(listOfTasks);
+            executorSubmitFutures.parallelStream().map(fut -> {
+                try {
+                    return fut.get();
+                } catch (InterruptedException | ExecutionException e) {
+                    fail(e.getMessage());
+                    return null;
+                }
+            }).forEach(conn -> {
+                String id = conn.getChannelId();
+                channelsSize.put(id, channelsSize.getOrDefault(id, 0) + 1);
+            });
+
+            assertNotEquals(channelsSize.entrySet().size(), 0);
+            channelsSize.entrySet().forEach(entry -> {
+                assertEquals(1, (entry.getValue()).intValue());
+            });
+
+        } finally {
+            executorServiceForTesting.shutdown();
+            client.close();
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void overLimitOperationsShouldDelegateToSingleNewConnection() throws InterruptedException {

Review comment:
       I think that, for now at least, the two tests are fairly self-contained and 
already complex. Keeping them as they are makes them easier to understand, and I 
don't think refactoring them right now would reduce the complexity or even the 
number of lines of code.
   
   






> Client does not load balance requests across available connections
> ------------------------------------------------------------------
>
>                 Key: TINKERPOP-2486
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-2486
>             Project: TinkerPop
>          Issue Type: Bug
>          Components: driver
>    Affects Versions: 3.4.8, 3.4.9
>            Reporter: Divij Vaidya
>            Priority: Major
>
> The client does not load balance requests across the connections in the pool, 
> which causes a request to fail with a timeout even when a connection is 
> available. The following test reproduces the problem:
> {code:java}
> @Test
> public void shouldBalanceConcurrentRequestsAcrossConnections() throws InterruptedException {
>     final int connPoolSize = 16;
>     final Cluster cluster = TestClientFactory.build()
>             .minConnectionPoolSize(connPoolSize)
>             .maxConnectionPoolSize(connPoolSize)
>             .create();
>     final Client.ClusteredClient client = cluster.connect();
>     client.init();
>     try {
>         final RequestMessage.Builder request = client.buildMessage(RequestMessage.build(Tokens.OPS_EVAL))
>                 .add(Tokens.ARGS_GREMLIN, "Thread.sleep(5000)");
>         final Callable<Connection> sendQueryCallable = () -> client.chooseConnection(request.create());
>         final List<Callable<Connection>> listOfTasks = new ArrayList<>();
>         for (int i=0; i<connPoolSize; i++) {
>             listOfTasks.add(sendQueryCallable);
>         }
>         Set<String> channels = new HashSet<>();
>         final List<Future<Connection>> executorSubmitFutures = executorServiceForTesting.invokeAll(listOfTasks);
>         executorSubmitFutures.parallelStream().map(fut -> {
>             try {
>                 return fut.get();
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             } catch (ExecutionException e) {
>                 e.printStackTrace();
>             }
>             return null;
>         }).forEach(conn -> channels.add(conn.getChannelId()));
>         
>         System.out.println(channels.size());
>     } finally {
>         cluster.close();
>     }
> }
> {code}
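
The property the test above checks (when the number of concurrent requests equals the pool size, each connection should be picked exactly once) can be illustrated outside the driver. What follows is a minimal sketch and not TinkerPop's implementation: `FakeConnection`, `FakePool`, `borrow` and `tallyConcurrentBorrows` are hypothetical names invented for this example, and the "least in-flight requests" rule is an assumed selection strategy rather than the one the driver necessarily uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LeastBusyPoolSketch {

    /** Hypothetical stand-in for a driver connection; not TinkerPop's Connection class. */
    static final class FakeConnection {
        final String channelId;
        int inFlight; // guarded by the owning pool's monitor

        FakeConnection(String channelId) {
            this.channelId = channelId;
        }
    }

    /** Hypothetical pool that hands out the connection with the fewest in-flight requests. */
    static final class FakePool {
        private final List<FakeConnection> connections = new ArrayList<>();

        FakePool(int size) {
            for (int i = 0; i < size; i++) {
                connections.add(new FakeConnection("channel-" + i));
            }
        }

        // Selection and the in-flight increment happen under one lock, so two
        // concurrent callers cannot both observe the same connection as idle.
        synchronized FakeConnection borrow() {
            FakeConnection best = connections.get(0);
            for (FakeConnection c : connections) {
                if (c.inFlight < best.inFlight) {
                    best = c;
                }
            }
            best.inFlight++;
            return best;
        }
    }

    /** Borrows poolSize connections concurrently and counts how often each channel was chosen. */
    static Map<String, Integer> tallyConcurrentBorrows(int poolSize) throws Exception {
        FakePool pool = new FakePool(poolSize);
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<FakeConnection>> tasks = new ArrayList<>();
            for (int i = 0; i < poolSize; i++) {
                tasks.add(pool::borrow);
            }

            // ConcurrentHashMap.merge is atomic per key, so this tally would stay
            // correct even if it were updated from several threads at once.
            Map<String, Integer> perChannel = new ConcurrentHashMap<>();
            for (Future<FakeConnection> future : executor.invokeAll(tasks)) {
                perChannel.merge(future.get().channelId, 1, Integer::sum);
            }
            return perChannel;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Integer> perChannel = tallyConcurrentBorrows(16);
        System.out.println(perChannel.size() + " distinct channels used");
    }
}
```

Because no borrowed connection is released in this sketch, every borrow sees at least one connection with zero in-flight requests until all of them have been handed out, which is why the tally ends with one entry per channel. A real pool would also decrement the counter when a response arrives.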


