Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r135481583 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java --- @@ -259,17 +267,72 @@ public int getNumberOfQueuedBuffers() { public void setBufferPool(BufferPool bufferPool) { // Sanity checks - checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(), + if (!getConsumedPartitionType().isCreditBased()) { + checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(), "Bug in input gate setup logic: buffer pool has not enough guaranteed buffers " + - "for this input gate. Input gates require at least as many buffers as " + + "for this input gate. Input gates require at least as many buffers as " + "there are input channels."); + } checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool has" + - "already been set for this input gate."); + "already been set for this input gate."); this.bufferPool = checkNotNull(bufferPool); } + /** + * Assign the exclusive buffers to all remote input channels directly for credit-based mode. + * + * @param networkBufferPool The global pool to request and recycle exclusive buffers + * @param networkBuffersPerChannel The number of exclusive buffers for each channel + */ + public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException { + this.networkBufferPool = checkNotNull(networkBufferPool); --- End diff -- please guard against using this method multiple times (like in `setBufferPool`) as a sanity check
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---