This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4db7f4c502ba6428bf4f3f7d52b00a8cbada29fa Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu Sep 9 09:40:20 2021 +0200 [FLINK-24213][qs] Use single lock in ServerConnection --- .../flink/queryablestate/network/ServerConnection.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java index 861dcc3..c24e4b3 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java @@ -50,7 +50,7 @@ import java.util.function.Function; final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> { private static final Logger LOG = LoggerFactory.getLogger(ServerConnection.class); - private final Object connectionLock = new Object(); + private final Object connectionLock; @GuardedBy("connectionLock") private InternalConnection<REQ, RESP> internalConnection; @@ -60,7 +60,8 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> private final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); - private ServerConnection(InternalConnection<REQ, RESP> internalConnection) { + private ServerConnection(Object lock, InternalConnection<REQ, RESP> internalConnection) { + this.connectionLock = lock; this.internalConnection = internalConnection; forwardCloseFuture(); } @@ -119,11 +120,14 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> final String clientName, final MessageSerializer<REQ, RESP> serializer, final KvStateRequestStats stats) { + final Object lock = new Object(); + return new ServerConnection<>( + lock, new PendingConnection<>( channel -> new EstablishedConnection<>( - clientName, serializer, channel, stats))); + lock, clientName, serializer, channel, stats))); } interface InternalConnection<REQ, RESP> { @@ -288,7 +292,7 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> private static class EstablishedConnection<REQ extends MessageBody, RESP extends MessageBody> implements ClientHandlerCallback<RESP>, InternalConnection<REQ, RESP> { - private final Object lock = new Object(); + private final Object lock; /** The actual TCP channel. */ private final Channel channel; @@ -315,11 +319,13 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> * @param channel The actual TCP channel */ EstablishedConnection( + final Object lock, final String clientName, final MessageSerializer<REQ, RESP> serializer, final Channel channel, final KvStateRequestStats stats) { + this.lock = lock; this.channel = Preconditions.checkNotNull(channel); // Add the client handler with the callback