This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fa2ab610c1c20ff828db0fc928b6328c2f440e9d Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu Sep 9 09:34:26 2021 +0200 [FLINK-24213][qs] Introduce factory for established connection --- .../queryablestate/network/ServerConnection.java | 24 +++++++++------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java index 5c2319e..861dcc3 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java @@ -39,6 +39,7 @@ import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; /** * Connection class used by the {@link Client}. @@ -118,7 +119,11 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> final String clientName, final MessageSerializer<REQ, RESP> serializer, final KvStateRequestStats stats) { - return new ServerConnection<>(new PendingConnection<>(clientName, serializer, stats)); + return new ServerConnection<>( + new PendingConnection<>( + channel -> + new EstablishedConnection<>( + clientName, serializer, channel, stats))); } interface InternalConnection<REQ, RESP> { @@ -137,12 +142,7 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> private static final class PendingConnection<REQ extends MessageBody, RESP extends MessageBody> implements InternalConnection<REQ, RESP> { - private final String clientName; - - private final MessageSerializer<REQ, RESP> serializer; - - private final KvStateRequestStats stats; - + private final Function<Channel, EstablishedConnection<REQ, RESP>> connectionFactory; private final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); /** Queue of requests while connecting. */ @@ -156,12 +156,8 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> /** Creates a pending connection to the given server. */ private PendingConnection( - final String clientName, - final MessageSerializer<REQ, RESP> serializer, - final KvStateRequestStats stats) { - this.clientName = clientName; - this.serializer = serializer; - this.stats = stats; + Function<Channel, EstablishedConnection<REQ, RESP>> connectionFactory) { + this.connectionFactory = connectionFactory; } /** @@ -221,7 +217,7 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> return this; } else { final EstablishedConnection<REQ, RESP> establishedConnection = - new EstablishedConnection<>(clientName, serializer, channel, stats); + connectionFactory.apply(channel); while (!queuedRequests.isEmpty()) { final PendingConnection.PendingRequest<REQ, RESP> pending =