Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157548033
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -52,6 +54,10 @@
        /** Flag indicating whether the subpartition has been released. */
        private volatile boolean isReleased;
     
    +   /** The number of non-event buffers currently in this subpartition */
    +   @GuardedBy("buffers")
    +   private volatile int buffersInBacklog;
    --- End diff --
    
    I briefly considered relying on `buffers.size()` here to reduce 
complexity and code, but calling `ArrayDeque#size()` without synchronisation 
(for `getBuffersInBacklog()`) would be racy, since `ArrayDeque` is not 
thread-safe. However, if we picked up again the idea of returning the backlog 
size together with the buffer itself (which is retrieved under the lock), i.e. 
similar to the `BufferAndAvailability` returned by the 
`SequenceNumberingViewReader`, this would work and we would not need the 
`volatile` here. Since you split the implementations into 
`PipelinedSubpartition` and `SpillableSubpartition` anyway, this would be a 
viable approach again.
    What do you think? What would you prefer?
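
    For reference, the second option could look roughly like the sketch below. It is only an illustration under assumptions: `SubpartitionSketch`, `BufferAndBacklog`, `Entry`, `add` and `pollNext` are hypothetical names, and plain strings stand in for the real buffers. The point is that the counter is read and written only while holding the `buffers` lock, and the reader gets the backlog handed out together with the buffer.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for a subpartition; names and types are illustrative only.
final class SubpartitionSketch {

    // Hypothetical result type: the polled payload plus the backlog seen under the same lock.
    static final class BufferAndBacklog {
        final String payload;
        final int buffersInBacklog;

        BufferAndBacklog(String payload, int buffersInBacklog) {
            this.payload = payload;
            this.buffersInBacklog = buffersInBacklog;
        }
    }

    // Queue entry; isBuffer is false for events, which do not count towards the backlog.
    private static final class Entry {
        final String payload;
        final boolean isBuffer;

        Entry(String payload, boolean isBuffer) {
            this.payload = payload;
            this.isBuffer = isBuffer;
        }
    }

    private final ArrayDeque<Entry> buffers = new ArrayDeque<>();

    // Only read and written while holding the "buffers" lock, so no volatile is needed.
    private int buffersInBacklog;

    void add(String payload, boolean isBuffer) {
        synchronized (buffers) {
            buffers.add(new Entry(payload, isBuffer));
            if (isBuffer) {
                buffersInBacklog++;
            }
        }
    }

    // Returns null if the queue is empty; otherwise the entry and the remaining backlog.
    BufferAndBacklog pollNext() {
        synchronized (buffers) {
            Entry entry = buffers.poll();
            if (entry == null) {
                return null;
            }
            if (entry.isBuffer) {
                buffersInBacklog--;
            }
            return new BufferAndBacklog(entry.payload, buffersInBacklog);
        }
    }
}
```

    With this shape there is no separate unsynchronised getter: the consumer only ever sees a backlog value that was consistent with the queue at the moment the buffer was polled.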

