azagrebin commented on a change in pull request #8310: [FLINK-12331][network] Introduce partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8310#discussion_r279426797
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##########
 @@ -290,27 +304,38 @@ public String getOwningTaskName() {
        // Setup/Life-cycle
        // ------------------------------------------------------------------------
 
-       public void setBufferPool(BufferPool bufferPool) {
+       @Override
+       public void setup() throws IOException {
                checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool has" +
                        "already been set for this input gate.");
 
-               this.bufferPool = checkNotNull(bufferPool);
+               int maxNumberOfMemorySegments;
+               try {
+                       if (isCreditBased) {
+                               // assign exclusive buffers to input channels directly and use the rest for floating buffers
+                               assignExclusiveSegments();
+
+                               maxNumberOfMemorySegments = consumedPartitionType.isBounded() ? floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
+                               bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
+                       } else {
+                               maxNumberOfMemorySegments = consumedPartitionType.isBounded() ?
+                                       numberOfInputChannels * networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
+                               bufferPool = networkBufferPool.createBufferPool(numberOfInputChannels, maxNumberOfMemorySegments);
+                       }
+               } catch (Throwable t) {
+                       if (bufferPool != null) {
+                               bufferPool.lazyDestroy();
+                       }
+
+                       ExceptionUtils.rethrowIOException(t);
+               }
        }
 
        /**
         * Assign the exclusive buffers to all remote input channels directly for credit-based mode.
-        *
-        * @param networkBufferPool The global pool to request and recycle exclusive buffers
-        * @param networkBuffersPerChannel The number of exclusive buffers for each channel
         */
-       public void assignExclusiveSegments(NetworkBufferPool networkBufferPool, int networkBuffersPerChannel) throws IOException {
-               checkState(this.isCreditBased, "Bug in input gate setup logic: exclusive buffers only exist with credit-based flow control.");
-               checkState(this.networkBufferPool == null, "Bug in input gate setup logic: global buffer pool has" +
 
 Review comment:
   Maybe we could keep the `networkBufferPool == null` check here, where it is really relevant, and allow nulls in the constructor? I would also keep the existing methods used in tests as they are for now, to avoid so many test changes at the moment. This looks like a separate refactoring, which could at least be done in another commit.
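
   To make the suggestion concrete, here is a minimal sketch of what it could look like. It is not the actual `SingleInputGate` code: `PoolSketch` and `GateSketch` are hypothetical stand-ins, the error message is illustrative, and the `buffersPerChannel` handling is simplified. The only point is that the constructor tolerates a null pool while the state check stays in `assignExclusiveSegments`.

```java
import java.util.Objects;

/** Hypothetical stand-in for a global buffer pool; not Flink's NetworkBufferPool. */
final class PoolSketch {
    private int requested;

    void requestSegments(int count) {
        requested += count;
    }

    int requested() {
        return requested;
    }
}

/** Simplified, hypothetical gate that only shows where the null check lives. */
final class GateSketch {
    // May be null after construction; set at most once by assignExclusiveSegments.
    private PoolSketch networkBufferPool;

    GateSketch(PoolSketch networkBufferPoolOrNull) {
        // Assumption: the constructor accepts null so existing tests need no pool.
        this.networkBufferPool = networkBufferPoolOrNull;
    }

    void assignExclusiveSegments(PoolSketch pool, int buffersPerChannel) {
        // The check stays here, where a second assignment would be a real bug.
        if (this.networkBufferPool != null) {
            throw new IllegalStateException("global buffer pool already set (illustrative message)");
        }
        this.networkBufferPool = Objects.requireNonNull(pool);
        pool.requestSegments(buffersPerChannel);
    }
}
```

   With this shape, a test can still construct the gate with `null` and call `assignExclusiveSegments(pool, n)` exactly as before, and a second call fails fast with an `IllegalStateException`.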

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
