GitHub user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r135481583
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -259,17 +267,72 @@ public int getNumberOfQueuedBuffers() {
     
        public void setBufferPool(BufferPool bufferPool) {
                // Sanity checks
    -           checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(),
    +           if (!getConsumedPartitionType().isCreditBased()) {
    +                   checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(),
                                "Bug in input gate setup logic: buffer pool has not enough guaranteed buffers " +
    -                                           "for this input gate. Input gates require at least as many buffers as " +
    +                                   "for this input gate. Input gates require at least as many buffers as " +
                                                "there are input channels.");
    +           }
     
                checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool has" +
    -                           "already been set for this input gate.");
    +                   "already been set for this input gate.");
     
                this.bufferPool = checkNotNull(bufferPool);
        }
     
    +   /**
    +    * Assign the exclusive buffers to all remote input channels directly for credit-based mode.
    +    *
    +    * @param networkBufferPool The global pool to request and recycle exclusive buffers
    +    * @param networkBuffersPerChannel The number of exclusive buffers for each channel
    +    */
    +   public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException {
    +           this.networkBufferPool = checkNotNull(networkBufferPool);
    --- End diff --
    
    Please add a sanity check that guards against this method being called more than once (as in `setBufferPool`).
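
A minimal sketch of the kind of guard meant here, not the actual patch. `InputGateSketch` and `PoolStub` are hypothetical stand-ins for `SingleInputGate` and `NetworkBufferPool`. A plain `IllegalStateException` stands in for Flink's `checkState` so the snippet compiles on its own, and the error message wording is only a suggestion.

```java
import java.util.Objects;

/** Hypothetical stand-in for SingleInputGate; only the guard logic is shown. */
final class InputGateSketch {

    /** Placeholder for NetworkBufferPool, which is not reproduced here. */
    static final class PoolStub { }

    private PoolStub networkBufferPool;
    private int networkBuffersPerChannel;

    void assignExclusiveSegments(PoolStub pool, int buffersPerChannel) {
        // Same idea as checkState(this.bufferPool == null, ...) in setBufferPool:
        // a second call means the setup logic is broken, so fail fast.
        if (this.networkBufferPool != null) {
            throw new IllegalStateException(
                "Bug in input gate setup logic: exclusive buffers have " +
                "already been assigned for this input gate.");
        }

        // Equivalent of checkNotNull(networkBufferPool).
        this.networkBufferPool = Objects.requireNonNull(pool);
        this.networkBuffersPerChannel = buffersPerChannel;
    }

    int getNetworkBuffersPerChannel() {
        return networkBuffersPerChannel;
    }
}
```

With this in place, the first call stores the pool and any later call throws, so an accidental double assignment surfaces during setup rather than as a buffer leak later.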

