This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2185c0f9bef64b103a4e1ae2c33619d83d54921
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Sep 9 09:34:26 2021 +0200

    [FLINK-24213][qs] Introduce factory for established connection
---
 .../queryablestate/network/ServerConnection.java   | 24 +++++++++-------------
 1 file changed, 10 insertions(+), 14 deletions(-)

diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
index 2fdad01..ebc8454 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
@@ -39,6 +39,7 @@ import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 /**
  * Connection class used by the {@link Client}.
@@ -118,7 +119,11 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody>
                     final String clientName,
                     final MessageSerializer<REQ, RESP> serializer,
                     final KvStateRequestStats stats) {
-        return new ServerConnection<>(new PendingConnection<>(clientName, serializer, stats));
+        return new ServerConnection<>(
+                new PendingConnection<>(
+                        channel ->
+                                new EstablishedConnection<>(
+                                        clientName, serializer, channel, stats)));
     }
 
     interface InternalConnection<REQ, RESP> {
@@ -137,12 +142,7 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody>
     private static final class PendingConnection<REQ extends MessageBody, RESP extends MessageBody>
             implements InternalConnection<REQ, RESP> {
 
-        private final String clientName;
-
-        private final MessageSerializer<REQ, RESP> serializer;
-
-        private final KvStateRequestStats stats;
-
+        private final Function<Channel, EstablishedConnection<REQ, RESP>> connectionFactory;
         private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
         /** Queue of requests while connecting. */
@@ -156,12 +156,8 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody>
 
         /** Creates a pending connection to the given server. */
         private PendingConnection(
-                final String clientName,
-                final MessageSerializer<REQ, RESP> serializer,
-                final KvStateRequestStats stats) {
-            this.clientName = clientName;
-            this.serializer = serializer;
-            this.stats = stats;
+                Function<Channel, EstablishedConnection<REQ, RESP>> connectionFactory) {
+            this.connectionFactory = connectionFactory;
         }
 
         /**
@@ -221,7 +217,7 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody>
                 return this;
             } else {
                 final EstablishedConnection<REQ, RESP> establishedConnection =
-                        new EstablishedConnection<>(clientName, serializer, channel, stats);
+                        connectionFactory.apply(channel);
 
                 while (!queuedRequests.isEmpty()) {
                     final PendingConnection.PendingRequest<REQ, RESP> pending =

Reply via email to