rkhachatryan commented on a change in pull request #11507: [FLINK-16587] Add 
basic CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#discussion_r408111888
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##########
 @@ -170,9 +173,14 @@
 
        private final CompletableFuture<Void> closeFuture;
 
+       @Nullable
+       protected BufferReceivedListener bufferReceivedListener;
 
 Review comment:
   Just adding a guard is not enough to protect against visibility issues.
   
   To clarify, I don't agree with @pnowojski:
   > I think currently there are no concurrency issues, as this field is being 
set very shortly after the construction, before passing it to different threads?
   
   IMO, it doesn't matter whether it is set during construction or not if there 
are no memory barriers.
   
   And since `final` is not an option, we are only left with `volatile`, 
which, as I said, can be cached. 
   
   By caching I mean:
   1. mark the field `volatile`
   1. the "constructing thread" writes the value before any subsequent reads
   1. the "reading thread" uses a non-volatile field, which it sets once from 
the `volatile` field on first use:
   ```
   // one volatile read on first use; afterwards only the plain field is read
   if (cachedBufferReceivedListener == null) {
     cachedBufferReceivedListener = bufferReceivedListener;
   }
   cachedBufferReceivedListener.notify(...);
   ```
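   
   A slightly fuller sketch of the three steps, in case it helps. `Gate`, 
   `Listener`, `registerListener` and `onBufferReceived` are made-up names for 
   illustration, not the actual Flink classes, and the sketch assumes a single 
   reading thread per gate:
   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   public class CachedListenerSketch {

       /** Hypothetical stand-in for BufferReceivedListener. */
       interface Listener {
           void onBuffer(int bufferId);
       }

       /** Hypothetical stand-in for the input gate; not Flink's real class. */
       static class Gate {
           // Step 1: volatile, so the write below happens-before any later read.
           private volatile Listener listener;

           // Plain field; assumed to be touched only by the single reading thread.
           private Listener cachedListener;

           // Step 2: called by the constructing thread before any reads.
           void registerListener(Listener listener) {
               this.listener = listener;
           }

           // Step 3: called only from the reading thread.
           void onBufferReceived(int bufferId) {
               if (cachedListener == null) {
                   // Volatile read; repeated only while the cache is still null.
                   cachedListener = listener;
               }
               // Throws NullPointerException if step 2 was skipped.
               cachedListener.onBuffer(bufferId);
           }
       }

       public static void main(String[] args) throws InterruptedException {
           Gate gate = new Gate();
           AtomicInteger delivered = new AtomicInteger();
           gate.registerListener(id -> delivered.incrementAndGet());

           Thread reader = new Thread(() -> {
               for (int i = 0; i < 3; i++) {
                   gate.onBufferReceived(i);
               }
           });
           reader.start();
           reader.join();
           System.out.println("delivered=" + delivered.get());
       }
   }
   ```
   If more than one thread could call `onBufferReceived`, the plain cached 
   field would itself be a data race, so this only holds under the 
   single-reader assumption.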
   
   I'm not sure whether it's better to place `cachedBufferReceivedListener` in 
the input channel or input gate.
   
   (please unresolve this thread)
