azagrebin commented on a change in pull request #8310: [FLINK-12331][network] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8310#discussion_r279426797
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ########## @@ -290,27 +304,38 @@ public String getOwningTaskName() { // Setup/Life-cycle // ------------------------------------------------------------------------ - public void setBufferPool(BufferPool bufferPool) { + @Override + public void setup() throws IOException { checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool has" + "already been set for this input gate."); - this.bufferPool = checkNotNull(bufferPool); + int maxNumberOfMemorySegments; + try { + if (isCreditBased) { + // assign exclusive buffers to input channels directly and use the rest for floating buffers + assignExclusiveSegments(); + + maxNumberOfMemorySegments = consumedPartitionType.isBounded() ? floatingNetworkBuffersPerGate : Integer.MAX_VALUE; + bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments); + } else { + maxNumberOfMemorySegments = consumedPartitionType.isBounded() ? + numberOfInputChannels * networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE; + bufferPool = networkBufferPool.createBufferPool(numberOfInputChannels, maxNumberOfMemorySegments); + } + } catch (Throwable t) { + if (bufferPool != null) { + bufferPool.lazyDestroy(); + } + + ExceptionUtils.rethrowIOException(t); + } } /** * Assign the exclusive buffers to all remote input channels directly for credit-based mode. 
- * - * @param networkBufferPool The global pool to request and recycle exclusive buffers - * @param networkBuffersPerChannel The number of exclusive buffers for each channel */ - public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException { - checkState(this.isCreditBased, "Bug in input gate setup logic: exclusive buffers only exist with credit-based flow control."); - checkState(this.networkBufferPool == null, "Bug in input gate setup logic: global buffer pool has" + Review comment: Maybe we could keep this check `networkBufferPool == null` here, where it is really relevant, and allow nulls in the constructor? I would also keep the existing methods used in tests as they are for now, to avoid so many changes in tests at the moment; that looks like another refactoring, which could be done at least in another commit. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services