Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4485#discussion_r138879343
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
    @@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
     
                        this.numTotalRequiredBuffers += numRequiredBuffers;
     
    -                   final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    -                   for (int i = 0 ; i < numRequiredBuffers ; i++) {
    -                           segments.add(availableMemorySegments.poll());
    -                   }
    +                   redistributeBuffers();
    +           }
     
    +           final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
    +           for (int i = 0 ; i < numRequiredBuffers ; i++) {
                        try {
    -                           redistributeBuffers();
    -                   } catch (IOException e) {
    -                           if (segments.size() > 0) {
    -                                   recycleMemorySegments(segments);
    -                           }
    -
    +                           segments.add(availableMemorySegments.take());
    --- End diff --
    
    I know I was the one who suggested it, but after thinking about the
    blocking `take()` a bit more, and with some more background I acquired
    over the last weeks, I get the feeling that we should do the request
    similarly to `LocalBufferPool#requestBuffer()`: if (for some reason) we end
    up waiting forever, we can at least be stopped by `destroy()` being called.
    What do you think? I'm thinking about something like this:
    ```
                final ArrayList<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
                try {
                        while (segments.size() < numRequiredBuffers) {
                                // re-checked after every poll timeout so that destroy() can abort the wait
                                if (isDestroyed) {
                                        throw new IllegalStateException("Buffer pool is destroyed.");
                                }

                                // bounded wait instead of the indefinitely blocking take()
                                final MemorySegment segment = availableMemorySegments.poll(2, TimeUnit.SECONDS);
                                if (segment != null) {
                                        segments.add(segment);
                                }
                        }
                } catch (Throwable e) {
                        // give back whatever we already collected before propagating the error
                        recycleMemorySegments(segments);
                        ExceptionUtils.rethrowIOException(e);
                }
    ```
    (using the same timeout of 2s as in `LocalBufferPool#requestBuffer()`)
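
For anyone who wants to try the polling idea in isolation, here is a minimal standalone sketch. It is not Flink code: `DestroyablePool`, its `byte[]` segments, and the configurable poll timeout are hypothetical stand-ins for `NetworkBufferPool`, `MemorySegment`, and the 2s timeout, and it rethrows the original exception instead of going through `ExceptionUtils`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a buffer pool; NOT Flink's NetworkBufferPool. */
class DestroyablePool {
    private final BlockingQueue<byte[]> available;
    private final long pollTimeoutMillis;
    private volatile boolean isDestroyed;

    DestroyablePool(int numSegments, int segmentSize, long pollTimeoutMillis) {
        this.available = new ArrayBlockingQueue<>(numSegments);
        for (int i = 0; i < numSegments; i++) {
            available.add(new byte[segmentSize]);
        }
        this.pollTimeoutMillis = pollTimeoutMillis;
    }

    /** Collects numRequired segments, waking up periodically to notice destroy(). */
    List<byte[]> request(int numRequired) throws InterruptedException {
        final List<byte[]> segments = new ArrayList<>(numRequired);
        try {
            while (segments.size() < numRequired) {
                if (isDestroyed) {
                    throw new IllegalStateException("Buffer pool is destroyed.");
                }
                // Bounded wait: returns null on timeout, so the loop re-checks the flag.
                final byte[] segment = available.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                if (segment != null) {
                    segments.add(segment);
                }
            }
        } catch (Throwable t) {
            // Hand back everything collected so far, then propagate the failure.
            available.addAll(segments);
            throw t;
        }
        return segments;
    }

    void destroy() {
        isDestroyed = true;
    }

    int numAvailable() {
        return available.size();
    }
}
```

With a plain `take()`, a request for more segments than will ever become available has no way of observing `destroy()`. With the bounded `poll`, it notices within roughly one timeout interval and returns the segments it already holds.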
    
    The following test (for `NetworkBufferPoolTest`) could verify this behaviour:
    ```
        @Rule
        public ExpectedException expectedException = ExpectedException.none();
    
        /**
         * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be
         * aborted in case of a concurrent {@link NetworkBufferPool#destroy()} call.
         */
        @Test
        public void testRequestMemorySegmentsInterruptable() throws Exception {
                final int numBuffers = 10;
    
                NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
                MemorySegment segment = globalPool.requestMemorySegment();
                assertNotNull(segment);
    
                final OneShotLatch isRunning = new OneShotLatch();
                CheckedThread asyncRequest = new CheckedThread() {
                        @Override
                        public void go() throws Exception {
                                isRunning.trigger();
                                globalPool.requestMemorySegments(10);
                        }
                };
                asyncRequest.start();
    
                // We want the destroy call inside the blocking part of the
                // globalPool.requestMemorySegments() call above. We cannot
                // guarantee this, but we can make it highly probable:
                isRunning.await();
                Thread.sleep(10);
                globalPool.destroy();
    
                segment.free();
    
                expectedException.expect(IllegalStateException.class);
                expectedException.expectMessage("destroyed");
                asyncRequest.sync();
        }
    ```

