[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4552 ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r162323707 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java --- @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; --- End diff -- no, you don't need to - I can create a separate PR for FLINK-8225 ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r162266243 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java --- @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; --- End diff -- Should I pick the FLINK-8225 in this PR? ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r162259781 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java --- @@ -164,11 +165,13 @@ private boolean dispose() { private void handInChannel(Channel channel) { synchronized (connectLock) { try { - PartitionRequestClientHandler requestHandler = channel.pipeline() - .get(PartitionRequestClientHandler.class); + NetworkClientHandler clientHandler = channel.pipeline().get(PartitionRequestClientHandler.class); + if (clientHandler == null) { + clientHandler = channel.pipeline().get(CreditBasedPartitionRequestClientHandler.class); + } --- End diff -- good idea! ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r162259728 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java --- @@ -82,10 +83,17 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request); try { - SequenceNumberingViewReader reader = new SequenceNumberingViewReader( - request.receiverId, - request.credit, - outboundQueue); + NetworkSequenceViewReader reader; + if (request.credit > 0) { + reader = new CreditBasedSequenceNumberingViewReader( + request.receiverId, + request.credit, + outboundQueue); + } else { + reader = new SequenceNumberingViewReader( + request.receiverId, + outboundQueue); + } --- End diff -- Yes, I actually took a hacky way to realize it for easy. :) I will consider whether it is feasible to get the config here. If so I will take the config way, otherwise I may add a comment clarifying this. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r162259766 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -125,11 +126,11 @@ private void enqueueAvailableReader(final SequenceNumberingViewReader reader) th * @return readers which are enqueued available for transferring data */ @VisibleForTesting - ArrayDeque getAvailableReaders() { + ArrayDeque getAvailableReaders() { return availableReaders; } - void notifyReaderCreated(final SequenceNumberingViewReader reader) { + public void notifyReaderCreated(final NetworkSequenceViewReader reader) { --- End diff -- yes, for my careless ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r162107348 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java --- @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; --- End diff -- also found that relic of FLINK-8225 (removed in my patch for the previous commit, and it should be inside FLINK-7456, not the hotfix which we will revert later) ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r162103769 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java --- @@ -164,11 +165,13 @@ private boolean dispose() { private void handInChannel(Channel channel) { synchronized (connectLock) { try { - PartitionRequestClientHandler requestHandler = channel.pipeline() - .get(PartitionRequestClientHandler.class); + NetworkClientHandler clientHandler = channel.pipeline().get(PartitionRequestClientHandler.class); + if (clientHandler == null) { + clientHandler = channel.pipeline().get(CreditBasedPartitionRequestClientHandler.class); + } --- End diff -- if you let `NetworkClientHandler` extend from `ChannelHandler`, then this can be simplified to ``` NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class); ``` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r162104968 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -125,11 +126,11 @@ private void enqueueAvailableReader(final SequenceNumberingViewReader reader) th * @return readers which are enqueued available for transferring data */ @VisibleForTesting - ArrayDeque getAvailableReaders() { + ArrayDeque getAvailableReaders() { return availableReaders; } - void notifyReaderCreated(final SequenceNumberingViewReader reader) { + public void notifyReaderCreated(final NetworkSequenceViewReader reader) { --- End diff -- this could stay `package-private` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r162106371 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java --- @@ -82,10 +83,17 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request); try { - SequenceNumberingViewReader reader = new SequenceNumberingViewReader( - request.receiverId, - request.credit, - outboundQueue); + NetworkSequenceViewReader reader; + if (request.credit > 0) { + reader = new CreditBasedSequenceNumberingViewReader( + request.receiverId, + request.credit, + outboundQueue); + } else { + reader = new SequenceNumberingViewReader( + request.receiverId, + outboundQueue); + } --- End diff -- This seems a bit hacky since it does not rely on the configuration parameter directly but rather on the affect it has, i.e. that a `PartitionRequest` in credit-based flow control always has a non-zero credit (due to `NetworkBufferPool#requestMemorySegments()`). Relying on the configuration parameter would be nicer, but alternatively, you could add a comment clarifying this. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r162047076 --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java --- @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024); --- End diff -- Do you, by any chance, know why 800 worked and is not enough here anymore? I mean, why/how does this test need 800 buffers? ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161667346 --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java --- @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024); --- End diff -- yes, the same reason as above ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161667234 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java --- @@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16); --- End diff -- yes, i will set 9 for it. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161568012 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java --- @@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception { verify(listener, times(1)).notifyBuffersAvailable(eq(1L)); // ...and one available result + assertFalse(view.nextBufferIsEvent()); --- End diff -- we should test this everywhere we access `getNextBuffer()` or add buffers via `add()` - also if `getNextBuffer()` is `null` or before even requesting anything ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161570121 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java --- @@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception { verify(listener, times(1)).notifyBuffersAvailable(eq(1L)); // ...and one available result + assertFalse(view.nextBufferIsEvent()); --- End diff -- also test `read.buffer().isBuffer()` then? ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161565565 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); --- End diff -- actually, let's use `assertThat(queue.getAvailableReaders(), contains(reader));` here which gives much nicer output in case something is wrong ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161576253 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java --- @@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16); --- End diff -- just a note for the curious: this test can cope with higher number of network buffers and is waiting for all of them to be blocked - increasing this to `9` would have been enough here though (we require 2 exclusive buffers now per default, while 1 was the minimum per incoming channel) ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161567305 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + + // Flush the buffer to make the channel writable again and see the final results + channel.flush(); + assertSame(channelBlockingBuffer, channel.readOutbound()); + + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); --- End diff -- let's end with `assertNull(channel.readOutbound());` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559926 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = spy(new PartitionRequestQueue()); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + verify(queue, times(1)).triggerEnqueueAvailableReader(reader); + // The reader is enqueued in the pipeline because the next buffer is event, even though no available credits + verify(queue, times(1)).enqueueAvailableReader(reader); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. +*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); --- End diff -- let's remove that mock ... ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161546145 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { assertEquals(2, inputChannel.getUnannouncedCredit()); - // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we - // have to notify credit available in CreditBasedClientHandler explicitly - handler.notifyCreditAvailable(inputChannel); - // Release the input channel inputGate.releaseAllResources(); channel.runPendingTasks(); - // It will not notify credits for released input channel + // It should send partition request first, and send close request after releasing input channel, + // but will not notify credits for released input channel. + Object readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertEquals(2, ((PartitionRequest) readFromOutbound).credit); --- End diff -- similar here: verify `PartitionRequest` after `inputChannel.requestSubpartition` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161578518 --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java --- @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024); --- End diff -- Is that also the reason here? I see that otherwise we get into `Insufficient number of network buffers` but it does not look as if it was configured as tightly... (just want to rule out some memory leak with the new code) ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161573976 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -238,6 +238,7 @@ public void testConsumeSpilledPartition() throws Exception { verify(listener, times(1)).notifyBuffersAvailable(eq(4L)); + assertFalse(reader.nextBufferIsEvent()); --- End diff -- also test `read.buffer().isBuffer()`? ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161569135 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java --- @@ -134,13 +135,22 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception { assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); verify(listener, times(2)).notifyBuffersAvailable(eq(1L)); + assertFalse(view.nextBufferIsEvent()); + read = view.getNextBuffer(); + assertNotNull(read); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); + assertNull(view.getNextBuffer()); + assertEquals(0, subpartition.getBuffersInBacklog()); + // Add event to the queue... Buffer event = createBuffer(); event.tagAsEvent(); subpartition.add(event); + assertTrue(view.nextBufferIsEvent()); assertEquals(3, subpartition.getTotalNumberOfBuffers()); - assertEquals(1, subpartition.getBuffersInBacklog()); + assertEquals(0, subpartition.getBuffersInBacklog()); assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); verify(listener, times(3)).notifyBuffersAvailable(eq(1L)); } --- End diff -- maybe verify that `nextBufferIsEvent()` returns the right thing after adding a real buffer now (with the event being next) ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161567331 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + + // Flush the buffer to make the channel writable again and see the final results + channel.flush(); + assertSame(channelBlockingBuffer, channel.readOutbound()); + + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. +*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + queue.notifyReaderCreated(reader); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify available buffers to trigger enqueue the reader + final int notifyNumBuffers = 5; + for (int i = 0; i < notifyNumBuffers; i++) { + reader.notifyBuffersAvailable(1); + } + +
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161547041 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even --- End diff -- nit: `is an event` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559690 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); --- End diff -- let's remove that mock ... ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559642 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); --- End diff -- let's remove that mock ... ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161546199 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { assertEquals(2, inputChannel.getUnannouncedCredit()); - // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we - // have to notify credit available in CreditBasedClientHandler explicitly - handler.notifyCreditAvailable(inputChannel); - // Release the input channel inputGate.releaseAllResources(); channel.runPendingTasks(); - // It will not notify credits for released input channel + // It should send partition request first, and send close request after releasing input channel, + // but will not notify credits for released input channel. + Object readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertEquals(2, ((PartitionRequest) readFromOutbound).credit); + readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(CloseRequest.class)); --- End diff -- put these after `inputGate.releaseAllResources()`? ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161485403 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -434,6 +443,29 @@ private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) t UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); } + /** +* Creates and returns a remote input channel for the specific input gate with specific partition request client. +* +* @param inputGate The input gate owns the created input channel. +* @param client The client is used to send partition request. +* @return The new created remote input channel. +*/ + private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, PartitionRequestClient client) throws Exception { --- End diff -- could you modify `PartitionRequestClientHandlerTest#createRemoteInputChannel(SingleInputGate)` to rely on this method, i.e. `return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class));`? ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559913 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = spy(new PartitionRequestQueue()); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + verify(queue, times(1)).triggerEnqueueAvailableReader(reader); + // The reader is enqueued in the pipeline because the next buffer is event, even though no available credits + verify(queue, times(1)).enqueueAvailableReader(reader); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. +*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2)); --- End diff -- let's remove that mock ... ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161565896 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + + // Flush the buffer to make the channel writable again and see the final results + channel.flush(); + assertSame(channelBlockingBuffer, channel.readOutbound()); + + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. +*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + queue.notifyReaderCreated(reader); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify available buffers to trigger enqueue the reader + final int notifyNumBuffers = 5; + for (int i = 0; i < notifyNumBuffers; i++) { + reader.notifyBuffersAvailable(1); + } + +
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161545040 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -277,23 +277,31 @@ public void testNotifyCreditAvailable() throws Exception { handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse1); handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse2); - // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we - // have to notify credit available in CreditBasedClientHandler explicitly - handler.notifyCreditAvailable(inputChannel1); - handler.notifyCreditAvailable(inputChannel2); - assertEquals(2, inputChannel1.getUnannouncedCredit()); assertEquals(2, inputChannel2.getUnannouncedCredit()); channel.runPendingTasks(); - // The two input channels should notify credits via writable channel + // The two input channels should send partition requests and then notify credits via writable channel assertTrue(channel.isWritable()); Object readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertEquals(inputChannel1.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId); + assertEquals(2, ((PartitionRequest) readFromOutbound).credit); + + readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertEquals(inputChannel2.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId); + assertEquals(2, ((PartitionRequest) readFromOutbound).credit); --- End diff -- Let's verify those two `PartitionRequest` messages above since `inputChannel1.getUnannouncedCredit());` kind of relies on those being send (if we change the `initialCredit` to be included in the `unannouncedCredit`). ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161450876 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -199,6 +199,19 @@ public boolean isReleased() { } } + @Override + public boolean nextBufferIsEvent() { + if (nextBuffer != null) { + return !nextBuffer.isBuffer(); + } --- End diff -- agree with integration ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161248402 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java --- @@ -52,4 +52,9 @@ boolean isReleased(); Throwable getFailureCause(); + + /** +* Returns whether the next buffer is event or not. --- End diff -- `is an event` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161247589 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java --- @@ -77,6 +92,46 @@ void requestSubpartitionView( } } + /** +* The credits from consumer are added in incremental way. +* +* @param creditDeltas The credit deltas +*/ + public void addCredit(int creditDeltas) { + numCreditsAvailable += creditDeltas; + } + + /** +* Updates the value to indicate whether the reader is enqueued in the pipeline or not. +* +* @param isRegisteredAvailable True if this reader is already enqueued in the pipeline. +*/ + public void notifyAvailabilityChanged(boolean isRegisteredAvailable) { + this.isRegisteredAvailable = isRegisteredAvailable; + } + + public boolean isRegisteredAvailable() { + return isRegisteredAvailable; + } + + /** +* Check whether this reader is available or not. +* +* Return true only if the next buffer is event or the reader has both available +* credits and buffers. +*/ + public boolean isAvailable() { + if (numBuffersAvailable.get() <= 0) { + return false; + } + + if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable > 0) { --- End diff -- how about checking `numCreditsAvailable` first since that's the cheaper check? ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161241148 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java --- @@ -49,10 +50,24 @@ private volatile ResultSubpartitionView subpartitionView; + /** +* The status indicating whether this reader is already enqueued in the pipeline for transferring +* data or not. It is mainly used for avoid registering this reader to the pipeline repeatedly. +*/ + private boolean isRegisteredAvailable; + + /** The number of available buffers for holding data on the consumer side. */ + private int numCreditsAvailable; --- End diff -- Just a note since I was wondering whether we need synchronization here (not needed after verifying the things below): 1) `numCreditsAvailable` is increased via `PartitionRequestServerHandler#channelRead0` which is a separate channel handler than `PartitionRequestQueue` (see `NettyProtocol#getServerChannelHandlers`). According to [Netty's thread model](https://netty.io/wiki/new-and-noteworthy-in-4.0.html#wiki-h2-34), we should be safe though: > A user can specify an EventExecutor when he or she adds a handler to a ChannelPipeline. > - If specified, the handler methods of the ChannelHandler are always invoked by the specified EventExecutor. > - If unspecified, the handler methods are always invoked by the EventLoop that its associated Channel is registered to. 2) `numCreditsAvailable` is read from `PartitionRequestQueue#enqueueAvailableReader()` and `SequenceNumberingViewReader#getNextBuffer()` which are both accessed by the channel's IO thread only. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161296711 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java --- @@ -73,6 +73,9 @@ /** Flag indicating whether all resources have been released. */ private AtomicBoolean isReleased = new AtomicBoolean(); + /** The next buffer to hand out. */ + private Buffer nextBuffer; --- End diff -- We need to protect this against race conditions with respect to `releaseAllResources()` as well. Actually, I'm surprised that nothing in this class is protected against concurrently releasing it. Although I have created a separate issue for this ([FLINK-8425](https://issues.apache.org/jira/browse/FLINK-8425)), I think we may need to solve this here for the new `nextBuffer` anyway. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161282684 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java --- @@ -77,6 +92,46 @@ void requestSubpartitionView( } } + /** +* The credits from consumer are added in incremental way. +* +* @param creditDeltas The credit deltas +*/ + public void addCredit(int creditDeltas) { + numCreditsAvailable += creditDeltas; + } + + /** +* Updates the value to indicate whether the reader is enqueued in the pipeline or not. +* +* @param isRegisteredAvailable True if this reader is already enqueued in the pipeline. +*/ + public void notifyAvailabilityChanged(boolean isRegisteredAvailable) { + this.isRegisteredAvailable = isRegisteredAvailable; + } + + public boolean isRegisteredAvailable() { + return isRegisteredAvailable; + } + + /** +* Check whether this reader is available or not. +* +* Return true only if the next buffer is event or the reader has both available +* credits and buffers. +*/ + public boolean isAvailable() { + if (numBuffersAvailable.get() <= 0) { + return false; + } + + if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable > 0) { --- End diff -- also, why not simply do the following for this whole method? ``` return numBuffersAvailable.get() > 0 && (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent()); ``` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161249082 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -199,6 +199,19 @@ public boolean isReleased() { } } + @Override + public boolean nextBufferIsEvent() { + if (nextBuffer != null) { + return !nextBuffer.isBuffer(); + } --- End diff -- We probably need synchronization here to access `nextBuffer` and also check for `isReleased()` similar to `getNextBuffer`, since we are basically doing the same but without taking the buffer. Why not integrate this into `getNextBuffer` and the `BufferAndBacklog` returned there? Inside that method, gathering this additional info is basically for free and we may thus speed up some code paths. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161295428 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java --- @@ -163,6 +190,15 @@ public boolean isReleased() { return parent.isReleased() || isReleased.get(); } + @Override + public boolean nextBufferIsEvent() { + if (nextBuffer != null) { + return !nextBuffer.isBuffer(); + } + + return false; --- End diff -- I'm afraid relying on `nextBuffer` won't be enough because it might not have been there during `getNextBuffer()` but may be there now. Please add a test for this case. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161242036 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java --- @@ -49,10 +50,24 @@ private volatile ResultSubpartitionView subpartitionView; + /** +* The status indicating whether this reader is already enqueued in the pipeline for transferring +* data or not. It is mainly used for avoid registering this reader to the pipeline repeatedly. +*/ + private boolean isRegisteredAvailable; --- End diff -- I know it's default-initialized to false but let's make this explicit ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161295951 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -199,6 +199,19 @@ public boolean isReleased() { } } + @Override + public boolean nextBufferIsEvent() { + if (nextBuffer != null) { + return !nextBuffer.isBuffer(); + } + + if (spilledView != null) { --- End diff -- `checkState(spilledView != null, "No in-memory buffers available, but also nothing spilled.");` just like in `getNextBuffer()`? ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161287607 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java --- @@ -77,6 +92,46 @@ void requestSubpartitionView( } } + /** +* The credits from consumer are added in incremental way. +* +* @param creditDeltas The credit deltas +*/ + public void addCredit(int creditDeltas) { + numCreditsAvailable += creditDeltas; + } + + /** +* Updates the value to indicate whether the reader is enqueued in the pipeline or not. +* +* @param isRegisteredAvailable True if this reader is already enqueued in the pipeline. +*/ + public void notifyAvailabilityChanged(boolean isRegisteredAvailable) { + this.isRegisteredAvailable = isRegisteredAvailable; + } + + public boolean isRegisteredAvailable() { + return isRegisteredAvailable; + } + + /** +* Check whether this reader is available or not. +* +* Return true only if the next buffer is event or the reader has both available +* credits and buffers. +*/ + public boolean isAvailable() { + if (numBuffersAvailable.get() <= 0) { + return false; + } + + if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable > 0) { --- End diff -- one more thing: in the special case of using it inside `getNextBuffer()`, we already retrieved the `remaining` number of buffers so we can spare one lookup into an atomic integer here by passing this one in ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r160849525 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -250,10 +304,12 @@ private void handleException(Channel channel, Throwable cause) throws IOExceptio private void releaseAllResources() throws IOException { SequenceNumberingViewReader reader; - while ((reader = nonEmptyReader.poll()) != null) { + while ((reader = availableReaders.poll()) != null) { --- End diff -- yes, it should release all readers here. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r160849478 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +94,37 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification from the consumer or receiving +* non-empty reader notification from the producer. Only one thread would trigger the actual +* enqueue after checking the reader's availability, so there is no race condition here. +*/ + @VisibleForTesting + void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { + if (!reader.isRegisteredAvailable() && reader.isAvailable()) { + enqueueAvailableReader(reader); + } + } + + @VisibleForTesting + void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { --- End diff -- agree ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r160695373 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -98,20 +135,35 @@ public void close() { } } + /** +* Adds to maintain the unannounced credits from the consumer and it may trigger +* enqueue the corresponding reader for this consumer transferring data. +* +* @param receiverId The input channel id to identify consumer. +* @param credit The unannounced credits of the consumer. +*/ + public void addCredit(InputChannelID receiverId, int credit) throws Exception { --- End diff -- could be `package-private` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r160694722 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +94,37 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification from the consumer or receiving +* non-empty reader notification from the producer. Only one thread would trigger the actual +* enqueue after checking the reader's availability, so there is no race condition here. +*/ + @VisibleForTesting + void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { + if (!reader.isRegisteredAvailable() && reader.isAvailable()) { + enqueueAvailableReader(reader); + } + } + + @VisibleForTesting + void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { --- End diff -- actually, we can inline this into the first one, make this `private`, and reduce `Mockito` use in the tests if we just made the `availableReaders` field accessible to the tests, e.g. ``` /** * Accesses internal state to verify reader registration in the unit tests. * * Do not use anywhere else! * * @return readers which are enqueued available for transferring data */ @VisibleForTesting ArrayDeque getAvailableReaders() { return availableReaders; } ``` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r160714424 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -250,10 +304,12 @@ private void handleException(Channel channel, Throwable cause) throws IOExceptio private void releaseAllResources() throws IOException { SequenceNumberingViewReader reader; - while ((reader = nonEmptyReader.poll()) != null) { + while ((reader = availableReaders.poll()) != null) { --- End diff -- Previously, we did not have access to all views but only those with data, but shouldn't we release all views instead? Especially now since there are views with data but with not enough credit. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r160371761 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +93,35 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification form the consumer or receiving +* non-empty reader notification from the producer. Only one thread would trigger the actual +* enqueue after checking the reader's availability, so there is no race condition here. +*/ + void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { --- End diff -- Thanks for clarification. I misunderstood this tag from the comments, I will modify it after all the reviews together. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r160361957 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +93,35 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification form the consumer or receiving +* non-empty reader notification from the producer. Only one thread would trigger the actual +* enqueue after checking the reader's availability, so there is no race condition here. +*/ + void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { --- End diff -- I interpret this tag actually not only for methods that are *only* used by testing, but for those where we had to increase visibility, e.g. from `private` to `package-private` as here, to be usable in tests. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r160333139 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +93,35 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification form the consumer or receiving +* non-empty reader notification from the producer. Only one thread would trigger the actual +* enqueue after checking the reader's availability, so there is no race condition here. +*/ + void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { + if (!reader.isRegisteredAvailable() && reader.isAvailable()) { + enqueueAvailableReader(reader); + } + } + + void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { --- End diff -- The same with `triggerEnqueueAvailableReader` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r160333068 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +93,35 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification form the consumer or receiving +* non-empty reader notification from the producer. Only one thread would trigger the actual +* enqueue after checking the reader's availability, so there is no race condition here. +*/ + void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { --- End diff -- This method is not used only for testing. It may be called via `addCredit` method. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r159399376 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +93,35 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification form the consumer or receiving --- End diff -- `from` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r159400956 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +93,35 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification form the consumer or receiving +* non-empty reader notification from the producer. Only one thread would trigger the actual +* enqueue after checking the reader's availability, so there is no race condition here. +*/ + void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { + if (!reader.isRegisteredAvailable() && reader.isAvailable()) { + enqueueAvailableReader(reader); + } + } + + void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { --- End diff -- `@VisibleForTesting`? ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r159400371 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java --- @@ -77,6 +83,37 @@ void requestSubpartitionView( } } + public void addCredit(int credit) { + numCreditsAvailable += credit; + } + + public void notifyAvailableChanged(boolean update) { --- End diff -- `notifyAvailabilityChanged` or actually rather `setAvailability`? also, please add a comment on what this is about / what "availability" means here ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r159401061 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +93,35 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification form the consumer or receiving +* non-empty reader notification from the producer. Only one thread would trigger the actual +* enqueue after checking the reader's availability, so there is no race condition here. +*/ + void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { --- End diff -- `@VisibleForTesting`? ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r157760560 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -158,32 +164,44 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { - if (!bufferListener.hasStagedBufferOrEvent() && stagedMessages.isEmpty()) { - decodeMsg(msg, false); - } - else { - stagedMessages.add(msg); - } - } - catch (Throwable t) { + decodeMsg(msg); + } catch (Throwable t) { notifyAllChannelsOfErrorAndClose(t); } } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof RemoteInputChannel) { + boolean triggerWrite = inputChannelsWithCredit.isEmpty(); --- End diff -- please re-add the comment here, too ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r157760652 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -318,192 +307,56 @@ else if (bufferProvider.isDestroyed()) { MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray); Buffer buffer = new Buffer(memSeg, FreeingBufferRecycler.INSTANCE, false); - inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1); - - return true; - } - } - finally { - if (releaseNettyBuffer) { - bufferOrEvent.releaseBuffer(); + inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog); } + } finally { + bufferOrEvent.releaseBuffer(); } } - /** -* This class would be replaced by CreditBasedClientHandler in the final, -* so we only implement this method in CreditBasedClientHandler. -*/ - void notifyCreditAvailable(RemoteInputChannel inputChannel) { - } - - private class AsyncErrorNotificationTask implements Runnable { - - private final Throwable error; - - public AsyncErrorNotificationTask(Throwable error) { - this.error = error; - } - - @Override - public void run() { - notifyAllChannelsOfErrorAndClose(error); - } - } - - /** -* A buffer availability listener, which subscribes/unsubscribes the NIO -* read event. -* -* If no buffer is available, the channel read event will be unsubscribed -* until one becomes available again. -* -* After a buffer becomes available again, the buffer is handed over by -* the thread calling {@link #notifyBufferAvailable(Buffer)} to the network I/O -* thread, which then continues the processing of the staged buffer. -*/ - private class BufferListenerTask implements BufferListener, Runnable { - - private final AtomicReference availableBuffer = new AtomicReference(); - - private NettyMessage.BufferResponse stagedBufferResponse; - - private boolean waitForBuffer(BufferProvider bufferProvider, NettyMessage.BufferResponse bufferResponse) { - - stagedBufferResponse = bufferResponse; - - if (bufferProvider.addBufferListener(this)) { - if (ctx.channel().config().isAutoRead()) { - ctx.channel().config().setAutoRead(false); - } - - return true; - } - else { - stagedBufferResponse = null; - - return false; - } - } - - private boolean hasStagedBufferOrEvent() { - return stagedBufferResponse != null; - } - - public void notifyBufferDestroyed() { - // The buffer pool has been destroyed - stagedBufferResponse = null; - - if (stagedMessages.isEmpty()) { - ctx.channel().config().setAutoRead(true); - ctx.channel().read(); - } - else { - ctx.channel().eventLoop().execute(stagedMessagesHandler); - } + private void writeAndFlushNextMessageIfPossible(Channel channel) { --- End diff -- please re-add the comment here, too ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r157760482 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -94,6 +98,15 @@ void cancelRequestFor(InputChannelID inputChannelId) { } } + void notifyCreditAvailable(final RemoteInputChannel inputChannel) { --- End diff -- please re-add the comment here, too ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r157760366 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -41,30 +42,33 @@ import java.io.IOException; import java.net.SocketAddress; import java.util.ArrayDeque; -import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +/** + * Channel handler to read the messages of buffer response or error response from the + * producer, to write and flush the unannounced credits for the producer. + */ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientHandler.class); - private final ConcurrentMap inputChannels = new ConcurrentHashMap(); - - private final AtomicReference channelError = new AtomicReference(); + /** Channels, which already requested partitions from the producers. */ + private final ConcurrentMap inputChannels = new ConcurrentHashMap<>(); - private final BufferListenerTask bufferListener = new BufferListenerTask(); + /** Channels, which will notify the producers about unannounced credit. */ + private final ArrayDeque inputChannelsWithCredit = new ArrayDeque<>(); - private final Queue stagedMessages = new ArrayDeque(); + private final AtomicReference channelError = new AtomicReference<>(); - private final StagedMessagesHandlerTask stagedMessagesHandler = new StagedMessagesHandlerTask(); + private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener(); /** * Set of cancelled partition requests. A request is cancelled iff an input channel is cleared * while data is still coming in for this channel. */ - private final ConcurrentMap cancelled = Maps.newConcurrentMap(); + private final ConcurrentMap cancelled = new ConcurrentHashMap<>(); private volatile ChannelHandlerContext ctx; --- End diff -- it looks like you missed to migrate some of the comments that were present in `CreditBasedClientHandler` but are not present here, e.g. for `ctx` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r153405989 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException { BufferPool bufferPool = null; try { - if (gate.getConsumedPartitionType().isCreditBased()) { - // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly - bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); - gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); - } else { - int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - gate.getNumberOfInputChannels() * networkBuffersPerChannel + - extraNetworkBuffersPerGate : Integer.MAX_VALUE; - bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), - maxNumberOfMemorySegments); - } + // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly + bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); + gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); --- End diff -- That makes sense, I will keep the previous behaviour for batch processing. ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r152985049 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException { BufferPool bufferPool = null; try { - if (gate.getConsumedPartitionType().isCreditBased()) { - // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly - bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); - gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); - } else { - int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - gate.getNumberOfInputChannels() * networkBuffersPerChannel + - extraNetworkBuffersPerGate : Integer.MAX_VALUE; - bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), - maxNumberOfMemorySegments); - } + // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly + bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); + gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); --- End diff -- What about the non-bounded partition type that we use for batch processing? Shouldn't we use an unbounded number of floating buffers there, as previously? ---