GitHub user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4485#discussion_r138879343

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---
@@ -159,23 +159,20 @@ public void recycle(MemorySegment segment) {
  this.numTotalRequiredBuffers += numRequiredBuffers;
- final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
- for (int i = 0 ; i < numRequiredBuffers ; i++) {
- segments.add(availableMemorySegments.poll());
- }
+ redistributeBuffers();
+ }
+ final List<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
+ for (int i = 0 ; i < numRequiredBuffers ; i++) {
  try {
- redistributeBuffers();
- } catch (IOException e) {
- if (segments.size() > 0) {
- recycleMemorySegments(segments);
- }
-
+ segments.add(availableMemorySegments.take());
--- End diff --

I know, I was the one who suggested it, but having thought about the blocking `take()` a bit more, and with some background I have picked up over the last few weeks, I get the feeling that we should do the request similarly to `LocalBufferPool#requestBuffer()`. That way, if (for some reason) we end up waiting forever, we can at least be stopped by a call to `destroy()`. What do you think?
I'm thinking about something like this:

```
final ArrayList<MemorySegment> segments = new ArrayList<>(numRequiredBuffers);
try {
	while (segments.size() < numRequiredBuffers) {
		if (isDestroyed) {
			throw new IllegalStateException("Buffer pool is destroyed.");
		}

		final MemorySegment segment = availableMemorySegments.poll(2, TimeUnit.SECONDS);
		if (segment != null) {
			segments.add(segment);
		}
	}
} catch (Throwable e) {
	recycleMemorySegments(segments);
	ExceptionUtils.rethrowIOException(e);
}
```

(using the same timeout of 2 s as in `LocalBufferPool#requestBuffer()`)

The following test (for `NetworkBufferPoolTest`) could verify this behaviour:

```
@Rule
public ExpectedException expectedException = ExpectedException.none();

/**
 * Tests {@link NetworkBufferPool#requestMemorySegments(int)}, verifying it may be aborted in
 * case of a concurrent {@link NetworkBufferPool#destroy()} call.
 */
@Test
public void testRequestMemorySegmentsInterruptable() throws Exception {
	final int numBuffers = 10;

	NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
	MemorySegment segment = globalPool.requestMemorySegment();
	assertNotNull(segment);

	final OneShotLatch isRunning = new OneShotLatch();
	CheckedThread asyncRequest = new CheckedThread() {
		@Override
		public void go() throws Exception {
			isRunning.trigger();
			globalPool.requestMemorySegments(10);
		}
	};
	asyncRequest.start();

	// We want the destroy call inside the blocking part of the globalPool.requestMemorySegments()
	// call above. We cannot guarantee this, but we can make it highly probable:
	isRunning.await();
	Thread.sleep(10);
	globalPool.destroy();

	segment.free();

	expectedException.expect(IllegalStateException.class);
	expectedException.expectMessage("destroyed");
	asyncRequest.sync();
}
```
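The same bounded-wait idea can be tried without a Flink checkout. Below is a minimal, self-contained sketch using only `java.util.concurrent`; `BoundedWaitPool`, `requestSegments`, `numAvailable` and the `byte[]` stand-ins for `MemorySegment` are hypothetical names for illustration, not Flink API. One deliberate difference from the snippet above: an interrupt is reported as an `IllegalStateException` (after restoring the thread's interrupt flag) instead of going through `ExceptionUtils.rethrowIOException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, Flink-free stand-in for NetworkBufferPool: byte[] arrays play the
 * role of MemorySegments and are handed out from a shared blocking queue.
 */
class BoundedWaitPool {
    private final BlockingQueue<byte[]> available;
    private final long pollTimeoutMillis;
    private volatile boolean isDestroyed;

    BoundedWaitPool(int numSegments, int segmentSize, long pollTimeoutMillis) {
        this.available = new ArrayBlockingQueue<>(numSegments);
        for (int i = 0; i < numSegments; i++) {
            available.add(new byte[segmentSize]);
        }
        this.pollTimeoutMillis = pollTimeoutMillis;
    }

    /**
     * Requests n segments. Instead of a single blocking take(), it waits in bounded
     * poll() steps and re-checks isDestroyed between them, so a concurrent destroy()
     * makes it fail within roughly one poll timeout.
     */
    List<byte[]> requestSegments(int n) {
        final List<byte[]> segments = new ArrayList<>(n);
        try {
            while (segments.size() < n) {
                if (isDestroyed) {
                    throw new IllegalStateException("Buffer pool is destroyed.");
                }
                // Returns null if no segment became free within the timeout.
                byte[] segment = available.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                if (segment != null) {
                    segments.add(segment);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            recycle(segments); // give back a partial allocation
            throw new IllegalStateException("Interrupted while requesting segments.", e);
        } catch (RuntimeException e) {
            recycle(segments); // give back a partial allocation before failing
            throw e;
        }
        return segments;
    }

    void recycle(List<byte[]> segments) {
        available.addAll(segments);
    }

    /** Only flags the pool; a real pool would also release its memory here. */
    void destroy() {
        isDestroyed = true;
    }

    int numAvailable() {
        return available.size();
    }
}
```

The poll timeout bounds how long a waiter can go without noticing `destroy()`: a shorter value reacts faster at the cost of more wake-ups, while the 2 s proposed above matches `LocalBufferPool#requestBuffer()`.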
---