1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082043383


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -107,49 +119,78 @@ void run() {
             try {
                 closeAll(
                         this::cleanupRequests,
-                        () ->
-                                dispatcher.fail(
-                                        thrown == null ? new 
CancellationException() : thrown));
+                        () -> {
+                            Throwable cause;
+                            synchronized (lock) {
+                                cause = thrown == null ? new 
CancellationException() : thrown;
+                            }
+                            dispatcher.fail(cause);
+                        });
             } catch (Exception e) {
-                //noinspection NonAtomicOperationOnVolatileField
-                thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                synchronized (lock) {
+                    //noinspection NonAtomicOperationOnVolatileField
+                    thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                }
             }
             FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
         }
         LOG.debug("loop terminated");
     }
 
     private void loop() throws Exception {
-        while (!wasClosed) {
+        while (true) {
             try {
-                ChannelStateWriteRequest request = deque.take();
-                // The executor will end the registration, when the start 
request comes.
-                // Because the checkpoint can be started after all tasks are 
initiated.
-                if (request instanceof CheckpointStartRequest && 
isRegistering()) {
-                    checkState(
-                            isRegistering.compareAndSet(true, false),
-                            "Transition isRegistering failed.");
+                ChannelStateWriteRequest request;
+                boolean completeRegister = false;
+                synchronized (lock) {
+                    if (wasClosed) {
+                        return;
+                    }
+                    request = waitAndTakeUnsafe();
+                    // The executor will end the registration, when the start 
request comes.
+                    // Because the checkpoint can be started after all tasks 
are initiated.
+                    if (request instanceof CheckpointStartRequest) {
+                        completeRegister = completeRegister();
+                    }
+                }
+                if (completeRegister) {
                     onRegistered.accept(this);
                 }
                 dispatcher.dispatch(request);
             } catch (InterruptedException e) {
-                if (!wasClosed) {
-                    LOG.debug(
-                            "Channel state executor is interrupted while 
waiting for a request (continue waiting)",
-                            e);
-                } else {
-                    Thread.currentThread().interrupt();
+                synchronized (lock) {
+                    if (!wasClosed) {
+                        LOG.debug(
+                                "Channel state executor is interrupted while 
waiting for a request (continue waiting)",
+                                e);
+                    } else {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
         }
     }
 
+    private ChannelStateWriteRequest waitAndTakeUnsafe() throws 
InterruptedException {
+        ChannelStateWriteRequest request;
+        while (true) {

Review Comment:
   Thanks for your review; I've updated the PR.
   
   I didn't squash the commits; instead I added a new fixup commit, so it should be easy to review. Once you think it's OK, I can squash them and rebase onto master.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to