This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4db7f4c502ba6428bf4f3f7d52b00a8cbada29fa
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Sep 9 09:40:20 2021 +0200

    [FLINK-24213][qs] Use single lock in ServerConnection
---
 .../flink/queryablestate/network/ServerConnection.java     | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
index 861dcc3..c24e4b3 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
@@ -50,7 +50,7 @@ import java.util.function.Function;
 final class ServerConnection<REQ extends MessageBody, RESP extends 
MessageBody> {
     private static final Logger LOG = 
LoggerFactory.getLogger(ServerConnection.class);
 
-    private final Object connectionLock = new Object();
+    private final Object connectionLock;
 
     @GuardedBy("connectionLock")
     private InternalConnection<REQ, RESP> internalConnection;
@@ -60,7 +60,8 @@ final class ServerConnection<REQ extends MessageBody, RESP 
extends MessageBody>
 
     private final CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
 
-    private ServerConnection(InternalConnection<REQ, RESP> internalConnection) 
{
+    private ServerConnection(Object lock, InternalConnection<REQ, RESP> 
internalConnection) {
+        this.connectionLock = lock;
         this.internalConnection = internalConnection;
         forwardCloseFuture();
     }
@@ -119,11 +120,14 @@ final class ServerConnection<REQ extends MessageBody, 
RESP extends MessageBody>
                     final String clientName,
                     final MessageSerializer<REQ, RESP> serializer,
                     final KvStateRequestStats stats) {
+        final Object lock = new Object();
+
         return new ServerConnection<>(
+                lock,
                 new PendingConnection<>(
                         channel ->
                                 new EstablishedConnection<>(
-                                        clientName, serializer, channel, 
stats)));
+                                        lock, clientName, serializer, channel, 
stats)));
     }
 
     interface InternalConnection<REQ, RESP> {
@@ -288,7 +292,7 @@ final class ServerConnection<REQ extends MessageBody, RESP 
extends MessageBody>
     private static class EstablishedConnection<REQ extends MessageBody, RESP 
extends MessageBody>
             implements ClientHandlerCallback<RESP>, InternalConnection<REQ, 
RESP> {
 
-        private final Object lock = new Object();
+        private final Object lock;
 
         /** The actual TCP channel. */
         private final Channel channel;
@@ -315,11 +319,13 @@ final class ServerConnection<REQ extends MessageBody, 
RESP extends MessageBody>
          * @param channel The actual TCP channel
          */
         EstablishedConnection(
+                final Object lock,
                 final String clientName,
                 final MessageSerializer<REQ, RESP> serializer,
                 final Channel channel,
                 final KvStateRequestStats stats) {
 
+            this.lock = lock;
             this.channel = Preconditions.checkNotNull(channel);
 
             // Add the client handler with the callback

Reply via email to