[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366948#comment-16366948 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4552

> Implement Netty sender incoming pipeline for credit-based
> -
>
>                 Key: FLINK-7456
>                 URL: https://issues.apache.org/jira/browse/FLINK-7456
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>             Fix For: 1.5.0
>
>
> This is part of the work for credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer
> {{currentCredit}} received from the receiver. On receiving a
> {{PartitionRequest}} or {{AddCredit}} message, {{currentCredit}} is
> increased by the credit that message carries.
> Each view also maintains an atomic boolean field that marks it as registered
> as available for transfer, to make sure it is enqueued in the handler only
> once. If {{currentCredit}} increases from zero and there are available
> buffers in the subpartition, the corresponding view is enqueued for
> transferring data.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
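The sender-side bookkeeping described in the issue text above can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: the class `CreditedReader`, its methods, and the plain `Queue` standing in for the handler's queue of available readers are all hypothetical names, and the sketch assumes the queue is only touched from a single thread (as it would be on one event loop).

```java
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a sender-side subpartition view reader.
class CreditedReader {
    // Credit announced by the receiver (initial PartitionRequest plus AddCredit deltas).
    private final AtomicInteger currentCredit = new AtomicInteger();
    // Number of buffers currently waiting in the subpartition (assumed counter).
    private final AtomicInteger buffersAvailable = new AtomicInteger();
    // Marks the reader as already registered in the queue of available readers.
    private final AtomicBoolean registeredAvailable = new AtomicBoolean(false);
    private final Queue<CreditedReader> availableReaders;

    CreditedReader(int initialCredit, Queue<CreditedReader> availableReaders) {
        this.currentCredit.set(initialCredit);
        this.availableReaders = availableReaders;
    }

    // Called when an AddCredit-style message arrives.
    void addCredit(int delta) {
        currentCredit.addAndGet(delta);
        enqueueIfAvailable();
    }

    // Called when the subpartition reports new buffers.
    void notifyBuffersAvailable(int count) {
        buffersAvailable.addAndGet(count);
        enqueueIfAvailable();
    }

    // Enqueue only if there is credit and data, and only once:
    // compareAndSet lets exactly one caller flip the flag from false to true.
    private void enqueueIfAvailable() {
        if (currentCredit.get() > 0
                && buffersAvailable.get() > 0
                && registeredAvailable.compareAndSet(false, true)) {
            availableReaders.add(this);
        }
    }

    int getCurrentCredit() {
        return currentCredit.get();
    }

    boolean isRegisteredAvailable() {
        return registeredAvailable.get();
    }
}
```

The sketch checks "credit and buffers both positive" rather than tracking the exact zero-to-positive transition mentioned in the issue; the boolean flag is what keeps a reader from being enqueued twice in either formulation.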
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332062#comment-16332062 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4552

    looks good - Let's start some cluster tests and then we're ready to merge
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330427#comment-16330427 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162323707

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;

     import javax.annotation.Nullable;
    -import javax.annotation.concurrent.GuardedBy;
    --- End diff --

    no, you don't need to - I can create a separate PR for FLINK-8225
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330186#comment-16330186 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162266243

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;

     import javax.annotation.Nullable;
    -import javax.annotation.concurrent.GuardedBy;
    --- End diff --

    Should I pick the FLINK-8225 in this PR?
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330155#comment-16330155 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552

    1. Thanks for your FLINK-8425.

    2. I thought the tests for `ResultSubpartition#nextBufferIsEvent` were already covered before. The test for `BufferAndBacklog#nextBufferIsEvent()` was not included before, so thanks for providing the patch for it.

    3. I will create a separate JIRA for the switch and also include the commit in this PR. I will also check the Travis failure.

    4. I will add the comment for the document you mentioned later.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330126#comment-16330126 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162259781

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java ---
    @@ -164,11 +165,13 @@ private boolean dispose() {
             private void handInChannel(Channel channel) {
                 synchronized (connectLock) {
                     try {
    -                    PartitionRequestClientHandler requestHandler = channel.pipeline()
    -                        .get(PartitionRequestClientHandler.class);
    +                    NetworkClientHandler clientHandler = channel.pipeline().get(PartitionRequestClientHandler.class);
    +                    if (clientHandler == null) {
    +                        clientHandler = channel.pipeline().get(CreditBasedPartitionRequestClientHandler.class);
    +                    }
    --- End diff --

    good idea!
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330123#comment-16330123 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162259728

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java ---
    @@ -82,10 +83,17 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws
                     LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);

                     try {
    -                    SequenceNumberingViewReader reader = new SequenceNumberingViewReader(
    -                        request.receiverId,
    -                        request.credit,
    -                        outboundQueue);
    +                    NetworkSequenceViewReader reader;
    +                    if (request.credit > 0) {
    +                        reader = new CreditBasedSequenceNumberingViewReader(
    +                            request.receiverId,
    +                            request.credit,
    +                            outboundQueue);
    +                    } else {
    +                        reader = new SequenceNumberingViewReader(
    +                            request.receiverId,
    +                            outboundQueue);
    +                    }
    --- End diff --

    Yes, I actually took a hacky way to implement it, for simplicity. :)
    I will consider whether it is feasible to get the config here. If so, I will take the config way; otherwise I may add a comment clarifying this.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330125#comment-16330125 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162259766

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -125,11 +126,11 @@ private void enqueueAvailableReader(final SequenceNumberingViewReader reader) th
          * @return readers which are enqueued available for transferring data
          */
         @VisibleForTesting
    -    ArrayDeque getAvailableReaders() {
    +    ArrayDeque getAvailableReaders() {
             return availableReaders;
         }

    -    void notifyReaderCreated(final SequenceNumberingViewReader reader) {
    +    public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
    --- End diff --

    yes, that was careless of me
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328972#comment-16328972 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162104968

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -125,11 +126,11 @@ private void enqueueAvailableReader(final SequenceNumberingViewReader reader) th
          * @return readers which are enqueued available for transferring data
          */
         @VisibleForTesting
    -    ArrayDeque getAvailableReaders() {
    +    ArrayDeque getAvailableReaders() {
             return availableReaders;
         }

    -    void notifyReaderCreated(final SequenceNumberingViewReader reader) {
    +    public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
    --- End diff --

    this could stay `package-private`
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328971#comment-16328971 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162106371

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java ---
    @@ -82,10 +83,17 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws
                     LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);

                     try {
    -                    SequenceNumberingViewReader reader = new SequenceNumberingViewReader(
    -                        request.receiverId,
    -                        request.credit,
    -                        outboundQueue);
    +                    NetworkSequenceViewReader reader;
    +                    if (request.credit > 0) {
    +                        reader = new CreditBasedSequenceNumberingViewReader(
    +                            request.receiverId,
    +                            request.credit,
    +                            outboundQueue);
    +                    } else {
    +                        reader = new SequenceNumberingViewReader(
    +                            request.receiverId,
    +                            outboundQueue);
    +                    }
    --- End diff --

    This seems a bit hacky since it does not rely on the configuration parameter directly but rather on the effect it has, i.e. that a `PartitionRequest` in credit-based flow control always has a non-zero credit (due to `NetworkBufferPool#requestMemorySegments()`). Relying on the configuration parameter would be nicer, but alternatively, you could add a comment clarifying this.
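The configuration-driven alternative raised in this review comment might look roughly like the sketch below. It is only an illustration under stated assumptions: `ViewReader`, `CreditBasedReader`, `LegacyReader`, `ReaderFactory`, and the boolean `creditBasedEnabled` are hypothetical stand-ins, not the Flink classes or the actual configuration option, and how the flag would reach the server handler is left open.

```java
// Hypothetical common type for both reader flavours.
interface ViewReader {
    String describe();
}

// Hypothetical credit-aware reader; remembers the credit sent with the request.
class CreditBasedReader implements ViewReader {
    private final int initialCredit;

    CreditBasedReader(int initialCredit) {
        this.initialCredit = initialCredit;
    }

    @Override
    public String describe() {
        return "credit-based(initialCredit=" + initialCredit + ")";
    }
}

// Hypothetical reader for the old mode without credit accounting.
class LegacyReader implements ViewReader {
    @Override
    public String describe() {
        return "legacy";
    }
}

// Chooses the reader from an explicit flag instead of inferring the mode
// from whether the request happens to carry a positive credit.
class ReaderFactory {
    private final boolean creditBasedEnabled;

    ReaderFactory(boolean creditBasedEnabled) {
        this.creditBasedEnabled = creditBasedEnabled;
    }

    ViewReader create(int requestCredit) {
        if (creditBasedEnabled) {
            return new CreditBasedReader(requestCredit);
        }
        return new LegacyReader();
    }
}
```

With an explicit flag, a credit-based request that arrived with zero credit would still get a credit-based reader, which is the case the `request.credit > 0` check cannot distinguish from the old mode.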
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328973#comment-16328973 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162107348

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;

     import javax.annotation.Nullable;
    -import javax.annotation.concurrent.GuardedBy;
    --- End diff --

    also found that relic of FLINK-8225 (removed in my patch for the previous commit, and it should be inside FLINK-7456, not the hotfix which we will revert later)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328974#comment-16328974 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162103769

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java ---
    @@ -164,11 +165,13 @@ private boolean dispose() {
             private void handInChannel(Channel channel) {
                 synchronized (connectLock) {
                     try {
    -                    PartitionRequestClientHandler requestHandler = channel.pipeline()
    -                        .get(PartitionRequestClientHandler.class);
    +                    NetworkClientHandler clientHandler = channel.pipeline().get(PartitionRequestClientHandler.class);
    +                    if (clientHandler == null) {
    +                        clientHandler = channel.pipeline().get(CreditBasedPartitionRequestClientHandler.class);
    +                    }
    --- End diff --

    if you let `NetworkClientHandler` extend from `ChannelHandler`, then this can be simplified to
    ```
    NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
    ```
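Why a lookup by the shared interface can replace the two-step lookup is easier to see with a small stand-alone model. The sketch below does not use Netty: `Handler`, `ClientHandler`, `LegacyClientHandler`, `CreditBasedClientHandler`, and `MiniPipeline` are hypothetical names, and it assumes a pipeline whose `get(Class)` returns the first handler assignable to the requested type, which is how this simplification is expected to work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base type for everything that can sit in the pipeline.
interface Handler {
}

// Hypothetical shared interface for both client handler implementations;
// extending Handler is what makes a lookup by this type possible.
interface ClientHandler extends Handler {
}

class LegacyClientHandler implements ClientHandler {
}

class CreditBasedClientHandler implements ClientHandler {
}

// Minimal model of a pipeline with a type-based lookup.
class MiniPipeline {
    private final List<Handler> handlers = new ArrayList<>();

    MiniPipeline add(Handler handler) {
        handlers.add(handler);
        return this;
    }

    // Returns the first handler that is an instance of the given type, or null.
    <T extends Handler> T get(Class<T> type) {
        for (Handler handler : handlers) {
            if (type.isInstance(handler)) {
                return type.cast(handler);
            }
        }
        return null;
    }
}
```

Whichever implementation was installed, a single `get(ClientHandler.class)` finds it, so the caller no longer needs a null check followed by a second lookup for the other concrete class.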
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328747#comment-16328747 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162047076

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---
    @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {
                 config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                 config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
                 config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
    -            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
    +            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
    --- End diff --

    Do you, by any chance, know why 800 worked and is not enough here anymore? I mean, why/how does this test need 800 buffers?
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328393#comment-16328393 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552

    @NicoK , I have submitted the switch for keeping the old mode and the new credit-based mode.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326834#comment-16326834 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552

    @NicoK , thanks for your reviews!

    I have submitted all the patches you provided offline to address the above issues.

    1. Remove `FLINK-8425` from this PR.

    2. Do you think I should add more tests for `nextBufferIsEvent`? Because I already verified that in previous related tests.

    3. For adding the switch issue, I found some difficulties to leave messages for you offline. We can further confirm that.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326793#comment-16326793 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161667346

    --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---
    @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {
                 config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                 config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
                 config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
    -            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
    +            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
    --- End diff --

    yes, the same reason as above
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326792#comment-16326792 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161667234

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java ---
    @@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception {
                 config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                 config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
                 config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
    -            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
    +            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
    --- End diff --

    yes, I will set it to 9.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326451#comment-16326451 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4552

    one thing which we talked about offline: as a precaution, we should keep the old implementation around and allow the users to basically turn the credit-based flow control algorithm on/off (the accounting for the credits would mostly stay in that case but will simply not be used by the old non-existing flow control)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326442#comment-16326442 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559926 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = spy(new PartitionRequestQueue()); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + verify(queue, times(1)).triggerEnqueueAvailableReader(reader); + // The reader is enqueued in the 
pipeline because the next buffer is event, even though no available credits + verify(queue, times(1)).enqueueAvailableReader(reader); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. +*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); --- End diff -- let's remove that mock ... > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
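The two javadoc comments in the diff above describe one rule: a reader with pending data is enqueued if it has credit, or if its next buffer is an event (events go out even at zero credit). A minimal sketch of that predicate, assuming nothing about the real `PartitionRequestQueue` internals; `ReaderAvailabilitySketch` and `shouldEnqueue` are hypothetical names:

```java
final class ReaderAvailabilitySketch {

    /**
     * Hypothetical predicate mirroring the rule stated in the test javadocs:
     * something must be pending, and it must be either an event or covered by credit.
     */
    static boolean shouldEnqueue(int credits, int buffersAvailable, boolean nextBufferIsEvent) {
        if (buffersAvailable <= 0) {
            return false; // nothing to send, credit or not
        }
        return nextBufferIsEvent || credits > 0;
    }

    public static void main(String[] args) {
        // An event is pending and there is no credit: still enqueued.
        System.out.println(shouldEnqueue(0, 1, true));
        // A data buffer is pending and there is no credit: not enqueued.
        System.out.println(shouldEnqueue(0, 1, false));
        // A data buffer is pending and credit is available: enqueued.
        System.out.println(shouldEnqueue(2, 1, false));
    }
}
```

The three calls correspond to the cases the two tests exercise: the event case, and both directions of the "iff it has both available credits and buffers" case.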
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326430#comment-16326430 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161485403 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -434,6 +443,29 @@ private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) t UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); } + /** +* Creates and returns a remote input channel for the specific input gate with specific partition request client. +* +* @param inputGate The input gate owns the created input channel. +* @param client The client is used to send partition request. +* @return The new created remote input channel. +*/ + private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, PartitionRequestClient client) throws Exception { --- End diff -- could you modify `PartitionRequestClientHandlerTest#createRemoteInputChannel(SingleInputGate)` to rely on this method, i.e. `return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class));`? > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. 
> Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326439#comment-16326439 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161565896 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + 
reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + + // Flush the buffer to make the channel writable again and see the final results + channel.flush(); + assertSame(channelBlockingBuffer, channel.readOutbound()); + + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. +*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + queue.notifyReaderCreated(reader); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify available
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326435#comment-16326435 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559690 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); --- End diff -- let's remove that mock ... > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. 
> Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326448#comment-16326448 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161578518 --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java --- @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024); --- End diff -- Is that also the reason here? I see that otherwise we get into `Insufficient number of network buffers` but it does not look as if it was configured as tightly... (just want to rule out some memory leak with the new code) > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326443#comment-16326443 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161568012 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java --- @@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception { verify(listener, times(1)).notifyBuffersAvailable(eq(1L)); // ...and one available result + assertFalse(view.nextBufferIsEvent()); --- End diff -- we should test this everywhere we access `getNextBuffer()` or add buffers via `add()` - also if `getNextBuffer()` is `null` or before even requesting anything > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326433#comment-16326433 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559642 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); --- End diff -- let's remove that mock ... > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. 
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326436#comment-16326436 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161547041 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even --- End diff -- nit: `is an event` > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326440#comment-16326440 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559913 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = spy(new PartitionRequestQueue()); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + verify(queue, times(1)).triggerEnqueueAvailableReader(reader); + // The reader is enqueued in the 
pipeline because the next buffer is event, even though no available credits + verify(queue, times(1)).enqueueAvailableReader(reader); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. +*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2)); --- End diff -- let's remove that mock ... > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326437#comment-16326437 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161565565 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + 
reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); --- End diff -- actually, let's use `assertThat(queue.getAvailableReaders(), contains(reader));` here which gives much nicer output in case something is wrong > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326444#comment-16326444 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161569135 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java --- @@ -134,13 +135,22 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception { assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); verify(listener, times(2)).notifyBuffersAvailable(eq(1L)); + assertFalse(view.nextBufferIsEvent()); + read = view.getNextBuffer(); + assertNotNull(read); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); + assertNull(view.getNextBuffer()); + assertEquals(0, subpartition.getBuffersInBacklog()); + // Add event to the queue... Buffer event = createBuffer(); event.tagAsEvent(); subpartition.add(event); + assertTrue(view.nextBufferIsEvent()); assertEquals(3, subpartition.getTotalNumberOfBuffers()); - assertEquals(1, subpartition.getBuffersInBacklog()); + assertEquals(0, subpartition.getBuffersInBacklog()); assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); verify(listener, times(3)).notifyBuffersAvailable(eq(1L)); } --- End diff -- maybe verify that `nextBufferIsEvent()` returns the right thing after adding a real buffer now (with the event being next) > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. 
> On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
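The assertions in the `PipelinedSubpartitionTest` diff above suggest that the backlog counts only data buffers (it stays at 0 after an event is added) and that `nextBufferIsEvent()` looks at the head of the queue. A small self-contained model of that behaviour, under those assumptions; `BacklogSketch`, `Kind` and `Subpartition` here are hypothetical and not Flink classes:

```java
import java.util.ArrayDeque;

final class BacklogSketch {

    enum Kind { BUFFER, EVENT }

    /** Hypothetical, single-threaded model of a subpartition queue. */
    static final class Subpartition {
        private final ArrayDeque<Kind> queue = new ArrayDeque<>();
        private int buffersInBacklog;

        void add(Kind kind) {
            queue.add(kind);
            if (kind == Kind.BUFFER) {
                buffersInBacklog++; // assumption: events never count towards the backlog
            }
        }

        /** Returns the next entry, or null if the queue is empty. */
        Kind getNextBuffer() {
            Kind next = queue.poll();
            if (next == Kind.BUFFER) {
                buffersInBacklog--;
            }
            return next;
        }

        /** False on an empty queue, otherwise true only if the head is an event. */
        boolean nextBufferIsEvent() {
            return queue.peek() == Kind.EVENT;
        }

        int getBuffersInBacklog() {
            return buffersInBacklog;
        }
    }

    public static void main(String[] args) {
        Subpartition subpartition = new Subpartition();
        System.out.println(subpartition.nextBufferIsEvent()); // empty queue

        subpartition.add(Kind.EVENT);
        subpartition.add(Kind.BUFFER); // real buffer queued behind the event
        System.out.println(subpartition.nextBufferIsEvent() + " backlog="
                + subpartition.getBuffersInBacklog());

        subpartition.getNextBuffer(); // consume the event
        System.out.println(subpartition.nextBufferIsEvent() + " backlog="
                + subpartition.getBuffersInBacklog());
    }
}
```

The second step in `main` is the case raised in the review comment: a real buffer is added while the event is still next, so `nextBufferIsEvent()` should stay true until the event is consumed.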
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326445#comment-16326445 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161573976 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -238,6 +238,7 @@ public void testConsumeSpilledPartition() throws Exception { verify(listener, times(1)).notifyBuffersAvailable(eq(4L)); + assertFalse(reader.nextBufferIsEvent()); --- End diff -- also test `read.buffer().isBuffer()`? > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326431#comment-16326431 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161545040

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
    @@ -277,23 +277,31 @@ public void testNotifyCreditAvailable() throws Exception {
                 handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse1);
                 handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse2);

    -            // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we
    -            // have to notify credit available in CreditBasedClientHandler explicitly
    -            handler.notifyCreditAvailable(inputChannel1);
    -            handler.notifyCreditAvailable(inputChannel2);
    -
                 assertEquals(2, inputChannel1.getUnannouncedCredit());
                 assertEquals(2, inputChannel2.getUnannouncedCredit());

                 channel.runPendingTasks();

    -            // The two input channels should notify credits via writable channel
    +            // The two input channels should send partition requests and then notify credits via writable channel
                 assertTrue(channel.isWritable());
                 Object readFromOutbound = channel.readOutbound();
    +            assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
    +            assertEquals(inputChannel1.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId);
    +            assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
    +
    +            readFromOutbound = channel.readOutbound();
    +            assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
    +            assertEquals(inputChannel2.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId);
    +            assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
    --- End diff --

    Let's verify those two `PartitionRequest` messages above, since `inputChannel1.getUnannouncedCredit()` kind of relies on those being sent (if we change the `initialCredit` to be included in the `unannouncedCredit`).
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326447#comment-16326447 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161576253

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java ---
    @@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception {
                 config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                 config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
                 config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
    -            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
    +            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
    --- End diff --

    just a note for the curious: this test can cope with a higher number of network buffers and waits for all of them to be blocked. Increasing this to `9` would have been enough here, though (we now require 2 exclusive buffers per incoming channel by default, while 1 was the minimum before)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326446#comment-16326446 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161570121

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---
    @@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
             verify(listener, times(1)).notifyBuffersAvailable(eq(1L));

             // ...and one available result
    +        assertFalse(view.nextBufferIsEvent());
    --- End diff --

    also test `read.buffer().isBuffer()` then?
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326432#comment-16326432 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161546199

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
    @@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception {
                 assertEquals(2, inputChannel.getUnannouncedCredit());

    -            // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we
    -            // have to notify credit available in CreditBasedClientHandler explicitly
    -            handler.notifyCreditAvailable(inputChannel);
    -
                 // Release the input channel
                 inputGate.releaseAllResources();

                 channel.runPendingTasks();

    -            // It will not notify credits for released input channel
    +            // It should send partition request first, and send close request after releasing input channel,
    +            // but will not notify credits for released input channel.
    +            Object readFromOutbound = channel.readOutbound();
    +            assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
    +            assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
    +            readFromOutbound = channel.readOutbound();
    +            assertThat(readFromOutbound, instanceOf(CloseRequest.class));
    --- End diff --

    put these after `inputGate.releaseAllResources()`?
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326438#comment-16326438 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161567331

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
             NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
             assertTrue(err.cause instanceof CancelTaskException);
         }
    +
    +    /**
    +     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +     * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +     * though it has no available credits.
    +     */
    +    @Test
    +    public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +        // setup
    +        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +        when(view.nextBufferIsEvent()).thenReturn(true);
    +
    +        final ResultPartitionID partitionId = new ResultPartitionID();
    +        final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +        when(partitionProvider.createSubpartitionView(
    +            eq(partitionId),
    +            eq(0),
    +            any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +        final InputChannelID receiverId = new InputChannelID();
    +        final PartitionRequestQueue queue = new PartitionRequestQueue();
    +        final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +        final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +        reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +
    +        // block the channel so that we see an intermediate state in the test
    +        ByteBuf channelBlockingBuffer = blockChannel(channel);
    +        assertNull(channel.readOutbound());
    +
    +        // Notify an available event buffer to trigger enqueue the reader
    +        reader.notifyBuffersAvailable(1);
    +
    +        channel.runPendingTasks();
    +
    +        // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
    +        assertEquals(1, queue.getAvailableReaders().size());
    +        assertEquals(0, reader.getNumCreditsAvailable());
    +
    +        // Flush the buffer to make the channel writable again and see the final results
    +        channel.flush();
    +        assertSame(channelBlockingBuffer, channel.readOutbound());
    +
    +        assertEquals(0, queue.getAvailableReaders().size());
    +        assertEquals(0, reader.getNumCreditsAvailable());
    +    }
    +
    +    /**
    +     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +     * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
    +     */
    +    @Test
    +    public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
    +        // setup
    +        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +        when(view.nextBufferIsEvent()).thenReturn(false);
    +        when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
    +
    +        final ResultPartitionID partitionId = new ResultPartitionID();
    +        final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +        when(partitionProvider.createSubpartitionView(
    +            eq(partitionId),
    +            eq(0),
    +            any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +        final InputChannelID receiverId = new InputChannelID();
    +        final PartitionRequestQueue queue = new PartitionRequestQueue();
    +        final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +        final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +        reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +        queue.notifyReaderCreated(reader);
    +
    +        // block the channel so that we see an intermediate state in the test
    +        ByteBuf channelBlockingBuffer = blockChannel(channel);
    +        assertNull(channel.readOutbound());
    +
    +        // Notify available
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326434#comment-16326434 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161546145

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
    @@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception {
                 assertEquals(2, inputChannel.getUnannouncedCredit());

    -            // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we
    -            // have to notify credit available in CreditBasedClientHandler explicitly
    -            handler.notifyCreditAvailable(inputChannel);
    -
                 // Release the input channel
                 inputGate.releaseAllResources();

                 channel.runPendingTasks();

    -            // It will not notify credits for released input channel
    +            // It should send partition request first, and send close request after releasing input channel,
    +            // but will not notify credits for released input channel.
    +            Object readFromOutbound = channel.readOutbound();
    +            assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
    +            assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
    --- End diff --

    similar here: verify `PartitionRequest` after `inputChannel.requestSubpartition`
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326441#comment-16326441 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161567305

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
             NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
             assertTrue(err.cause instanceof CancelTaskException);
         }
    +
    +    /**
    +     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +     * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +     * though it has no available credits.
    +     */
    +    @Test
    +    public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +        // setup
    +        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +        when(view.nextBufferIsEvent()).thenReturn(true);
    +
    +        final ResultPartitionID partitionId = new ResultPartitionID();
    +        final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +        when(partitionProvider.createSubpartitionView(
    +            eq(partitionId),
    +            eq(0),
    +            any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +        final InputChannelID receiverId = new InputChannelID();
    +        final PartitionRequestQueue queue = new PartitionRequestQueue();
    +        final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +        final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +        reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +
    +        // block the channel so that we see an intermediate state in the test
    +        ByteBuf channelBlockingBuffer = blockChannel(channel);
    +        assertNull(channel.readOutbound());
    +
    +        // Notify an available event buffer to trigger enqueue the reader
    +        reader.notifyBuffersAvailable(1);
    +
    +        channel.runPendingTasks();
    +
    +        // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
    +        assertEquals(1, queue.getAvailableReaders().size());
    +        assertEquals(0, reader.getNumCreditsAvailable());
    +
    +        // Flush the buffer to make the channel writable again and see the final results
    +        channel.flush();
    +        assertSame(channelBlockingBuffer, channel.readOutbound());
    +
    +        assertEquals(0, queue.getAvailableReaders().size());
    +        assertEquals(0, reader.getNumCreditsAvailable());
    --- End diff --

    let's end with `assertNull(channel.readOutbound());`
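The enqueue rule that `testEnqueueReaderByNotifyingEventBuffer` exercises can be sketched in isolation. This is only an illustration: `CreditSketch`, `Reader`, and `tryEnqueue` are hypothetical names, the subpartition view is reduced to a `BooleanSupplier`, and the plain (non-atomic) credit and flag fields assume all calls come from a single thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Illustrative only: none of these types are Flink classes. */
class CreditSketch {

    /** Hypothetical reader tracking buffers and credit for one subpartition view. */
    static final class Reader {
        // Stand-in for the view's "next buffer is an event" query (assumption).
        private final BooleanSupplier nextBufferIsEvent;
        private final AtomicInteger numBuffersAvailable = new AtomicInteger();
        // Assumed to be touched by a single thread only.
        private int numCreditsAvailable;
        private boolean registeredAvailable;

        Reader(BooleanSupplier nextBufferIsEvent) {
            this.nextBufferIsEvent = nextBufferIsEvent;
        }

        void notifyBuffersAvailable(int numBuffers) {
            numBuffersAvailable.addAndGet(numBuffers);
        }

        /** Credit arrives from the receiver as deltas and is accumulated. */
        void addCredit(int creditDelta) {
            numCreditsAvailable += creditDelta;
        }

        /** Available if there is a buffer and either credit or a pending event. */
        boolean isAvailable() {
            if (numBuffersAvailable.get() <= 0) {
                return false;
            }
            // Cheap field check first; the view is only asked when credit is zero.
            return numCreditsAvailable > 0 || nextBufferIsEvent.getAsBoolean();
        }

        /** Returns true only on the transition into the queue, so a reader is enqueued once. */
        boolean tryEnqueue() {
            if (registeredAvailable || !isAvailable()) {
                return false;
            }
            registeredAvailable = true;
            return true;
        }
    }

    public static void main(String[] args) {
        // Next buffer is an event: enqueued although no credit was granted.
        Reader eventReader = new Reader(() -> true);
        eventReader.notifyBuffersAvailable(1);
        System.out.println("event, no credit: " + eventReader.tryEnqueue());

        // Next buffer is data: credit is required first.
        Reader dataReader = new Reader(() -> false);
        dataReader.notifyBuffersAvailable(1);
        System.out.println("data, no credit: " + dataReader.tryEnqueue());
        dataReader.addCredit(2);
        System.out.println("data, with credit: " + dataReader.tryEnqueue());
    }
}
```

Events bypass the credit check in this sketch because, as the test comment in the diff says, a reader is enqueued when the next buffer is an event even though no credits are available.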
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326275#comment-16326275 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552

    @NicoK, I have submitted all the modifications based on the patches you provided.
    The tests for `nextBufferIsEvent` will be added in a new commit tomorrow.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16325989#comment-16325989 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161450876

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -199,6 +199,19 @@ public boolean isReleased() {
             }
         }
    +    @Override
    +    public boolean nextBufferIsEvent() {
    +        if (nextBuffer != null) {
    +            return !nextBuffer.isBuffer();
    +        }
    --- End diff --

    agree with integration
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324469#comment-16324469 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161248402

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java ---
    @@ -52,4 +52,9 @@ boolean isReleased();
         Throwable getFailureCause();
    +
    +    /**
    +     * Returns whether the next buffer is event or not.
    --- End diff --

    `is an event`
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324471#comment-16324471 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161296711

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -73,6 +73,9 @@
         /** Flag indicating whether all resources have been released. */
         private AtomicBoolean isReleased = new AtomicBoolean();

    +    /** The next buffer to hand out. */
    +    private Buffer nextBuffer;
    --- End diff --

    We need to protect this against race conditions with respect to `releaseAllResources()` as well.

    Actually, I'm surprised that nothing in this class is protected against concurrently releasing it. Although I have created a separate issue for this ([FLINK-8425](https://issues.apache.org/jira/browse/FLINK-8425)), I think we may need to solve this here for the new `nextBuffer` anyway.
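One way to read the race-condition concern is that `nextBuffer` and the released flag should only be touched under a common lock. The following is a minimal sketch under that assumption, not the fix that was merged: `ReleaseGuardSketch` and its nested `View` are hypothetical, and a `byte[]` plus a boolean stand in for Flink's `Buffer`.

```java
/** Illustrative only: a lock-guarded "next buffer" slot, not Flink's SpilledSubpartitionView. */
class ReleaseGuardSketch {

    static final class View {
        private final Object lock = new Object();
        // Guarded by lock. byte[] is a stand-in for Flink's Buffer type (assumption).
        private byte[] nextBuffer;
        private boolean nextIsEvent;
        private boolean released;

        /** Stores a prefetched buffer unless the view was already released. */
        boolean offerNext(byte[] data, boolean isEvent) {
            synchronized (lock) {
                if (released) {
                    return false; // nothing may be stored after release
                }
                nextBuffer = data;
                nextIsEvent = isEvent;
                return true;
            }
        }

        /** Reads the flag and the slot under the same lock that release takes. */
        boolean nextBufferIsEvent() {
            synchronized (lock) {
                return !released && nextBuffer != null && nextIsEvent;
            }
        }

        /** Drops the prefetched buffer and marks the view as released. */
        void releaseAllResources() {
            synchronized (lock) {
                released = true;
                nextBuffer = null;
            }
        }

        boolean isReleased() {
            synchronized (lock) {
                return released;
            }
        }
    }

    public static void main(String[] args) {
        View view = new View();
        view.offerNext(new byte[] {1}, true);
        System.out.println("before release: " + view.nextBufferIsEvent());
        view.releaseAllResources();
        System.out.println("after release: " + view.nextBufferIsEvent());
    }
}
```

Because both the query and the release synchronize on the same monitor, a reader can never observe a buffer that a concurrent release has already dropped.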
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324467#comment-16324467 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161295951

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -199,6 +199,19 @@ public boolean isReleased() {
             }
         }
    +    @Override
    +    public boolean nextBufferIsEvent() {
    +        if (nextBuffer != null) {
    +            return !nextBuffer.isBuffer();
    +        }
    +
    +        if (spilledView != null) {
    --- End diff --

    `checkState(spilledView != null, "No in-memory buffers available, but also nothing spilled.");` just like in `getNextBuffer()`?
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324472#comment-16324472 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161249082

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -199,6 +199,19 @@ public boolean isReleased() {
             }
         }
    +    @Override
    +    public boolean nextBufferIsEvent() {
    +        if (nextBuffer != null) {
    +            return !nextBuffer.isBuffer();
    +        }
    --- End diff --

    We probably need synchronization here to access `nextBuffer` and also check for `isReleased()` similar to `getNextBuffer`, since we are basically doing the same but without taking the buffer.

    Why not integrate this into `getNextBuffer` and the `BufferAndBacklog` returned there? Inside that method, gathering this additional info is basically for free and we may thus speed up some code paths.
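The suggestion to fold the event check into `getNextBuffer` can be sketched as follows. The types here are hypothetical stand-ins (`BacklogSketch`, `Entry`, and a local `BufferAndBacklog` holder), payloads are plain strings, and the backlog is simplified to the number of remaining queue entries; none of this is the real Flink class layout.

```java
import java.util.ArrayDeque;

/** Illustrative only: returns "next buffer is an event" together with the polled buffer. */
class BacklogSketch {

    /** Hypothetical queue entry; isEvent distinguishes events from data buffers. */
    static final class Entry {
        final String payload;
        final boolean isEvent;

        Entry(String payload, boolean isEvent) {
            this.payload = payload;
            this.isEvent = isEvent;
        }
    }

    /** Hypothetical result holder, loosely modelled on the name used in the thread. */
    static final class BufferAndBacklog {
        final String payload;
        final int backlog;               // simplified: remaining queue entries
        final boolean nextBufferIsEvent; // gathered while the lock is already held

        BufferAndBacklog(String payload, int backlog, boolean nextBufferIsEvent) {
            this.payload = payload;
            this.backlog = backlog;
            this.nextBufferIsEvent = nextBufferIsEvent;
        }
    }

    static final class View {
        private final ArrayDeque<Entry> queue = new ArrayDeque<>();

        synchronized void add(String payload, boolean isEvent) {
            queue.add(new Entry(payload, isEvent));
        }

        /** Polls the head and peeks at its successor inside one critical section. */
        synchronized BufferAndBacklog getNextBuffer() {
            Entry head = queue.poll();
            if (head == null) {
                return null;
            }
            Entry next = queue.peek();
            return new BufferAndBacklog(head.payload, queue.size(), next != null && next.isEvent);
        }
    }

    public static void main(String[] args) {
        View view = new View();
        view.add("data", false);
        view.add("barrier", true);
        BufferAndBacklog first = view.getNextBuffer();
        System.out.println(first.payload + ", next is event: " + first.nextBufferIsEvent);
    }
}
```

The caller gets the flag without a second locked call, which is the "basically for free" point made in the review comment.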
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324466#comment-16324466 ]

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161295428

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -163,6 +190,15 @@ public boolean isReleased() {
             return parent.isReleased() || isReleased.get();
         }

    +    @Override
    +    public boolean nextBufferIsEvent() {
    +        if (nextBuffer != null) {
    +            return !nextBuffer.isBuffer();
    +        }
    +
    +        return false;
    --- End diff --

    I'm afraid relying on `nextBuffer` won't be enough because it might not have been there during `getNextBuffer()` but may be there now. Please add a test for this case.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324465#comment-16324465 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161247589

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -77,6 +92,46 @@ void requestSubpartitionView(
             }
         }

    +    /**
    +     * The credits from consumer are added in incremental way.
    +     *
    +     * @param creditDeltas The credit deltas
    +     */
    +    public void addCredit(int creditDeltas) {
    +        numCreditsAvailable += creditDeltas;
    +    }
    +
    +    /**
    +     * Updates the value to indicate whether the reader is enqueued in the pipeline or not.
    +     *
    +     * @param isRegisteredAvailable True if this reader is already enqueued in the pipeline.
    +     */
    +    public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
    +        this.isRegisteredAvailable = isRegisteredAvailable;
    +    }
    +
    +    public boolean isRegisteredAvailable() {
    +        return isRegisteredAvailable;
    +    }
    +
    +    /**
    +     * Check whether this reader is available or not.
    +     *
    +     * Return true only if the next buffer is event or the reader has both available
    +     * credits and buffers.
    +     */
    +    public boolean isAvailable() {
    +        if (numBuffersAvailable.get() <= 0) {
    +            return false;
    +        }
    +
    +        if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable > 0) {
    --- End diff --

    how about checking `numCreditsAvailable` first since that's the cheaper check?
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324473#comment-16324473 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161287607

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -77,6 +92,46 @@ void requestSubpartitionView(
             }
         }

    +    /**
    +     * The credits from consumer are added in incremental way.
    +     *
    +     * @param creditDeltas The credit deltas
    +     */
    +    public void addCredit(int creditDeltas) {
    +        numCreditsAvailable += creditDeltas;
    +    }
    +
    +    /**
    +     * Updates the value to indicate whether the reader is enqueued in the pipeline or not.
    +     *
    +     * @param isRegisteredAvailable True if this reader is already enqueued in the pipeline.
    +     */
    +    public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
    +        this.isRegisteredAvailable = isRegisteredAvailable;
    +    }
    +
    +    public boolean isRegisteredAvailable() {
    +        return isRegisteredAvailable;
    +    }
    +
    +    /**
    +     * Check whether this reader is available or not.
    +     *
    +     * Return true only if the next buffer is event or the reader has both available
    +     * credits and buffers.
    +     */
    +    public boolean isAvailable() {
    +        if (numBuffersAvailable.get() <= 0) {
    +            return false;
    +        }
    +
    +        if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable > 0) {
    --- End diff --

    one more thing: in the special case of using it inside `getNextBuffer()`, we already retrieved the `remaining` number of buffers so we can spare one lookup into an atomic integer here by passing this one in
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324470#comment-16324470 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161282684

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -77,6 +92,46 @@ void requestSubpartitionView(
             }
         }

    +    /**
    +     * The credits from consumer are added in incremental way.
    +     *
    +     * @param creditDeltas The credit deltas
    +     */
    +    public void addCredit(int creditDeltas) {
    +        numCreditsAvailable += creditDeltas;
    +    }
    +
    +    /**
    +     * Updates the value to indicate whether the reader is enqueued in the pipeline or not.
    +     *
    +     * @param isRegisteredAvailable True if this reader is already enqueued in the pipeline.
    +     */
    +    public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
    +        this.isRegisteredAvailable = isRegisteredAvailable;
    +    }
    +
    +    public boolean isRegisteredAvailable() {
    +        return isRegisteredAvailable;
    +    }
    +
    +    /**
    +     * Check whether this reader is available or not.
    +     *
    +     * Return true only if the next buffer is event or the reader has both available
    +     * credits and buffers.
    +     */
    +    public boolean isAvailable() {
    +        if (numBuffersAvailable.get() <= 0) {
    +            return false;
    +        }
    +
    +        if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable > 0) {
    --- End diff --

    also, why not simply do the following for this whole method?
    ```
    return numBuffersAvailable.get() > 0 &&
        (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent());
    ```
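To illustrate why the operand order proposed in the review matters, here is a small self-contained sketch. It assumes a hypothetical `AvailabilitySketch` class in which the "next buffer is an event" check is passed in as a `BooleanSupplier`; none of these names come from Flink. Since `||` short-circuits, the potentially more expensive supplier is only consulted when no credit is available.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical reader that mirrors only the availability logic discussed in the review. */
final class AvailabilitySketch {
    private final AtomicInteger numBuffersAvailable = new AtomicInteger();
    private final BooleanSupplier nextBufferIsEvent;
    private int numCreditsAvailable;

    AvailabilitySketch(BooleanSupplier nextBufferIsEvent) {
        this.nextBufferIsEvent = nextBufferIsEvent;
    }

    void addCredit(int creditDeltas) {
        numCreditsAvailable += creditDeltas;
    }

    void addBuffers(int count) {
        numBuffersAvailable.addAndGet(count);
    }

    /**
     * Available if there is data and either credit exists or the next element is an event.
     * The plain int comparison comes first, so the supplier runs only when credit is zero.
     */
    boolean isAvailable() {
        return numBuffersAvailable.get() > 0
            && (numCreditsAvailable > 0 || nextBufferIsEvent.getAsBoolean());
    }
}
```

With no buffers the supplier is never called; with buffers but no credit it is called once per check; once credit arrives it is skipped again.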
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324464#comment-16324464 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161241148

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -49,10 +50,24 @@ private volatile ResultSubpartitionView subpartitionView;

    +    /**
    +     * The status indicating whether this reader is already enqueued in the pipeline for transferring
    +     * data or not. It is mainly used for avoid registering this reader to the pipeline repeatedly.
    +     */
    +    private boolean isRegisteredAvailable;
    +
    +    /** The number of available buffers for holding data on the consumer side. */
    +    private int numCreditsAvailable;
    --- End diff --

    Just a note since I was wondering whether we need synchronization here (not needed after verifying the things below):

    1) `numCreditsAvailable` is increased via `PartitionRequestServerHandler#channelRead0`, which is a different channel handler than `PartitionRequestQueue` (see `NettyProtocol#getServerChannelHandlers`). According to [Netty's thread model](https://netty.io/wiki/new-and-noteworthy-in-4.0.html#wiki-h2-34), we should be safe though:

    > A user can specify an EventExecutor when he or she adds a handler to a ChannelPipeline.
    > - If specified, the handler methods of the ChannelHandler are always invoked by the specified EventExecutor.
    > - If unspecified, the handler methods are always invoked by the EventLoop that its associated Channel is registered to.

    2) `numCreditsAvailable` is read from `PartitionRequestQueue#enqueueAvailableReader()` and `SequenceNumberingViewReader#getNextBuffer()`, which are both accessed by the channel's IO thread only.
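The reasoning in the note above is a form of thread confinement: a plain, non-volatile field is safe as long as every read and write happens on the same thread. The sketch below imitates this with a single-threaded `ExecutorService` from the JDK standing in for the channel's event loop. It does not use Netty at all, and `EventLoopConfinementSketch` and its methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration: one thread plays the role of the channel's event loop. */
final class EventLoopConfinementSketch {
    /** Plain field without volatile or locks; it is only touched on the loop thread. */
    private int numCreditsAvailable;

    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "sketch-event-loop");
        thread.setDaemon(true);
        return thread;
    });

    /** May be called from any thread; the update itself runs on the loop thread. */
    void addCredit(int delta) {
        eventLoop.execute(() -> numCreditsAvailable += delta);
    }

    /** Reads the field on the loop thread and hands the value back to the caller. */
    int readCredits() {
        try {
            return eventLoop.submit(() -> numCreditsAvailable).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        eventLoop.shutdown();
    }
}
```

Tasks submitted to a single-threaded executor run sequentially on that one thread, which is the property the review relies on when it concludes that both handlers touch `numCreditsAvailable` from the channel's IO thread only.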
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324468#comment-16324468 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161242036

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -49,10 +50,24 @@ private volatile ResultSubpartitionView subpartitionView;

    +    /**
    +     * The status indicating whether this reader is already enqueued in the pipeline for transferring
    +     * data or not. It is mainly used for avoid registering this reader to the pipeline repeatedly.
    +     */
    +    private boolean isRegisteredAvailable;
    --- End diff --

    I know it's default-initialized to false but let's make this explicit
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321558#comment-16321558 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160849525

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -250,10 +304,12 @@ private void handleException(Channel channel, Throwable cause) throws IOExceptio
         private void releaseAllResources() throws IOException {
             SequenceNumberingViewReader reader;

    -        while ((reader = nonEmptyReader.poll()) != null) {
    +        while ((reader = availableReaders.poll()) != null) {
    --- End diff --

    yes, it should release all readers here.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321557#comment-16321557 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160849478

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +94,37 @@ public void run() {
             });
         }

    +    /**
    +     * Try to enqueue the reader once receiving credit notification from the consumer or receiving
    +     * non-empty reader notification from the producer. Only one thread would trigger the actual
    +     * enqueue after checking the reader's availability, so there is no race condition here.
    +     */
    +    @VisibleForTesting
    +    void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    +        if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
    +            enqueueAvailableReader(reader);
    +        }
    +    }
    +
    +    @VisibleForTesting
    +    void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --

    agree
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320522#comment-16320522 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160695373

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -98,20 +135,35 @@ public void close() {
             }
         }

    +    /**
    +     * Adds to maintain the unannounced credits from the consumer and it may trigger
    +     * enqueue the corresponding reader for this consumer transferring data.
    +     *
    +     * @param receiverId The input channel id to identify consumer.
    +     * @param credit The unannounced credits of the consumer.
    +     */
    +    public void addCredit(InputChannelID receiverId, int credit) throws Exception {
    --- End diff --

    could be `package-private`
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320521#comment-16320521 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160694722

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +94,37 @@ public void run() {
             });
         }

    +    /**
    +     * Try to enqueue the reader once receiving credit notification from the consumer or receiving
    +     * non-empty reader notification from the producer. Only one thread would trigger the actual
    +     * enqueue after checking the reader's availability, so there is no race condition here.
    +     */
    +    @VisibleForTesting
    +    void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    +        if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
    +            enqueueAvailableReader(reader);
    +        }
    +    }
    +
    +    @VisibleForTesting
    +    void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --

    actually, we can inline this into the first one, make this `private`, and reduce `Mockito` use in the tests if we just made the `availableReaders` field accessible to the tests, e.g.
    ```
    /**
     * Accesses internal state to verify reader registration in the unit tests.
     *
     * Do not use anywhere else!
     *
     * @return readers which are enqueued available for transferring data
     */
    @VisibleForTesting
    ArrayDeque<SequenceNumberingViewReader> getAvailableReaders() {
        return availableReaders;
    }
    ```
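The enqueue-at-most-once behaviour discussed in this thread, combined with the proposed test accessor, can be sketched as follows. This is an illustrative model only: `ReaderSketch` and `RequestQueueSketch` are hypothetical classes, the availability rule is reduced to "has buffers and has credit", and the single-threaded access that the real handler relies on is simply assumed.

```java
import java.util.ArrayDeque;

/** Hypothetical reader holding only the state needed for the registration logic. */
final class ReaderSketch {
    private boolean registeredAvailable = false;
    private int credits;
    private int buffers;

    void addCredit(int delta) {
        credits += delta;
    }

    void addBuffer() {
        buffers++;
    }

    boolean isAvailable() {
        return buffers > 0 && credits > 0;
    }

    boolean isRegisteredAvailable() {
        return registeredAvailable;
    }

    void setRegisteredAvailable(boolean registeredAvailable) {
        this.registeredAvailable = registeredAvailable;
    }
}

/** Hypothetical queue; assumed to be used from a single thread, as in the review. */
final class RequestQueueSketch {
    private final ArrayDeque<ReaderSketch> availableReaders = new ArrayDeque<>();

    void addCredit(ReaderSketch reader, int credit) {
        reader.addCredit(credit);
        enqueueAvailableReader(reader);
    }

    void notifyDataAvailable(ReaderSketch reader) {
        reader.addBuffer();
        enqueueAvailableReader(reader);
    }

    /** Enqueues the reader only if it is available and not already in the queue. */
    private void enqueueAvailableReader(ReaderSketch reader) {
        if (reader.isRegisteredAvailable() || !reader.isAvailable()) {
            return;
        }
        availableReaders.add(reader);
        reader.setRegisteredAvailable(true);
    }

    /** Test-only accessor in the spirit of the review suggestion. */
    ArrayDeque<ReaderSketch> getAvailableReaders() {
        return availableReaders;
    }
}
```

A test can then assert on the queue contents directly, for example that a second credit notification does not enqueue the same reader twice, without mocking the enqueue method.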
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320520#comment-16320520 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160714424

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -250,10 +304,12 @@ private void handleException(Channel channel, Throwable cause) throws IOExceptio
         private void releaseAllResources() throws IOException {
             SequenceNumberingViewReader reader;

    -        while ((reader = nonEmptyReader.poll()) != null) {
    +        while ((reader = availableReaders.poll()) != null) {
    --- End diff --

    Previously, we did not have access to all views but only to those with data. Shouldn't we release all views instead? Especially now, since there are views with data but without enough credit.
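One way to act on the concern raised above is to track every registered reader separately from the queue of currently available ones, and to release from the full set. The following is a sketch under that assumption; `ReleasableReaderSketch`, `ReleaseAllSketch`, and the string receiver ids are hypothetical and do not reflect the actual change made in the pull request.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical reader that only records whether it was released. */
final class ReleasableReaderSketch {
    private boolean released;

    void releaseAllResources() {
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

/** Hypothetical handler state: all readers by id, plus the subset that is currently available. */
final class ReleaseAllSketch {
    private final Map<String, ReleasableReaderSketch> allReaders = new HashMap<>();
    private final ArrayDeque<ReleasableReaderSketch> availableReaders = new ArrayDeque<>();

    void register(String receiverId, ReleasableReaderSketch reader, boolean available) {
        allReaders.put(receiverId, reader);
        if (available) {
            availableReaders.add(reader);
        }
    }

    /** Releases every registered reader, including those waiting for credit. */
    void releaseAllResources() {
        for (ReleasableReaderSketch reader : allReaders.values()) {
            reader.releaseAllResources();
        }
        availableReaders.clear();
        allReaders.clear();
    }

    int numRegistered() {
        return allReaders.size();
    }
}
```

Draining only `availableReaders` would skip a reader that holds data but has no credit, which is the case the review points out.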
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318232#comment-16318232 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160371761

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +93,35 @@ public void run() {
             });
         }

    +    /**
    +     * Try to enqueue the reader once receiving credit notification form the consumer or receiving
    +     * non-empty reader notification from the producer. Only one thread would trigger the actual
    +     * enqueue after checking the reader's availability, so there is no race condition here.
    +     */
    +    void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --

    Thanks for the clarification. I misunderstood this tag from the comments; I will modify it together with the other changes once all the reviews are in.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318166#comment-16318166 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160361957

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +93,35 @@ public void run() {
             });
         }

    +    /**
    +     * Try to enqueue the reader once receiving credit notification form the consumer or receiving
    +     * non-empty reader notification from the producer. Only one thread would trigger the actual
    +     * enqueue after checking the reader's availability, so there is no race condition here.
    +     */
    +    void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --

    I interpret this tag actually not only for methods that are *only* used by testing, but for those where we had to increase visibility, e.g. from `private` to `package-private` as here, to be usable in tests.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317972#comment-16317972 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160333139

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +93,35 @@ public void run() {
             });
         }

    +    /**
    +     * Try to enqueue the reader once receiving credit notification form the consumer or receiving
    +     * non-empty reader notification from the producer. Only one thread would trigger the actual
    +     * enqueue after checking the reader's availability, so there is no race condition here.
    +     */
    +    void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    +        if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
    +            enqueueAvailableReader(reader);
    +        }
    +    }
    +
    +    void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --

    The same with `triggerEnqueueAvailableReader`
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317971#comment-16317971 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160333068

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +93,35 @@ public void run() {
             });
         }

    +    /**
    +     * Try to enqueue the reader once receiving credit notification form the consumer or receiving
    +     * non-empty reader notification from the producer. Only one thread would trigger the actual
    +     * enqueue after checking the reader's availability, so there is no race condition here.
    +     */
    +    void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --

    This method is not used only for testing; it may also be called via the `addCredit` method.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315833#comment-16315833 ]

ASF GitHub Bot commented on FLINK-7456:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r159400371

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -77,6 +83,37 @@ void requestSubpartitionView(
             }
         }

    +    public void addCredit(int credit) {
    +        numCreditsAvailable += credit;
    +    }
    +
    +    public void notifyAvailableChanged(boolean update) {
    --- End diff --

    `notifyAvailabilityChanged` or actually rather `setAvailability`?

    also, please add a comment on what this is about / what "availability" means here
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315838#comment-16315838 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r159399376 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +93,35 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification form the consumer or receiving --- End diff -- `from` > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315831#comment-16315831 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r157760366 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -41,30 +42,33 @@ import java.io.IOException; import java.net.SocketAddress; import java.util.ArrayDeque; -import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +/** + * Channel handler to read the messages of buffer response or error response from the + * producer, to write and flush the unannounced credits for the producer. + */ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientHandler.class); - private final ConcurrentMapinputChannels = new ConcurrentHashMap (); - - private final AtomicReference channelError = new AtomicReference(); + /** Channels, which already requested partitions from the producers. */ + private final ConcurrentMap inputChannels = new ConcurrentHashMap<>(); - private final BufferListenerTask bufferListener = new BufferListenerTask(); + /** Channels, which will notify the producers about unannounced credit. */ + private final ArrayDeque inputChannelsWithCredit = new ArrayDeque<>(); - private final Queue stagedMessages = new ArrayDeque(); + private final AtomicReference channelError = new AtomicReference<>(); - private final StagedMessagesHandlerTask stagedMessagesHandler = new StagedMessagesHandlerTask(); + private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener(); /** * Set of cancelled partition requests. 
A request is cancelled iff an input channel is cleared * while data is still coming in for this channel. */ - private final ConcurrentMap cancelled = Maps.newConcurrentMap(); + private final ConcurrentMap cancelled = new ConcurrentHashMap<>(); private volatile ChannelHandlerContext ctx; --- End diff -- It looks like you forgot to migrate some of the comments that were present in `CreditBasedClientHandler` but are missing here, e.g. for `ctx`.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315836#comment-16315836 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r159401061 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +93,35 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification form the consumer or receiving +* non-empty reader notification from the producer. Only one thread would trigger the actual +* enqueue after checking the reader's availability, so there is no race condition here. +*/ + void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { --- End diff -- `@VisibleForTesting`? > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315834#comment-16315834 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r157760560 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -158,32 +164,44 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { - if (!bufferListener.hasStagedBufferOrEvent() && stagedMessages.isEmpty()) { - decodeMsg(msg, false); - } - else { - stagedMessages.add(msg); - } - } - catch (Throwable t) { + decodeMsg(msg); + } catch (Throwable t) { notifyAllChannelsOfErrorAndClose(t); } } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof RemoteInputChannel) { + boolean triggerWrite = inputChannelsWithCredit.isEmpty(); --- End diff -- please re-add the comment here, too > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. 
If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
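The `triggerWrite = inputChannelsWithCredit.isEmpty()` check in the diff above can be read as follows: a write is started only when the queue was empty before the new channel was added, because a non-empty queue implies an earlier write is still in flight and its completion listener will pick up the next entry. The sketch below is a simplified, single-threaded model of that idea under this assumption. It does not use Netty, channels are represented by plain strings, and `CreditAnnouncementSketch`, `onCreditAvailable` and `drain` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the consumer-side credit announcement queue (no Netty involved). */
final class CreditAnnouncementSketch {
    /** Channels that still have to announce unannounced credit to the producer. */
    private final ArrayDeque<String> channelsWithCredit = new ArrayDeque<>();
    /** Order in which announcements were "written" in this simulation. */
    final List<String> written = new ArrayList<>();
    /** How many times a new write chain had to be started. */
    int writeChainsStarted;

    /** Models the user event raised when a channel has credit to announce. */
    void onCreditAvailable(String channelId) {
        // Only the first entry of an empty queue starts a write; later entries
        // are handled when the previous write completes.
        boolean triggerWrite = channelsWithCredit.isEmpty();
        channelsWithCredit.add(channelId);
        if (triggerWrite) {
            writeChainsStarted++;
        }
    }

    /** Models the write listener: each completed write sends the next queued entry. */
    void drain() {
        String next;
        while ((next = channelsWithCredit.poll()) != null) {
            written.add(next);
        }
    }
}
```

Queuing two channels back to back starts one write chain, and both announcements are sent in arrival order once the chain runs.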
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315837#comment-16315837 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r159400956 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -88,6 +93,35 @@ public void run() { }); } + /** +* Try to enqueue the reader once receiving credit notification form the consumer or receiving +* non-empty reader notification from the producer. Only one thread would trigger the actual +* enqueue after checking the reader's availability, so there is no race condition here. +*/ + void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { + if (!reader.isRegisteredAvailable() && reader.isAvailable()) { + enqueueAvailableReader(reader); + } + } + + void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception { --- End diff -- `@VisibleForTesting`? > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
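The guard in `triggerEnqueueAvailableReader` (`!reader.isRegisteredAvailable() && reader.isAvailable()`) is what keeps a reader from being queued twice. A minimal sketch of that enqueue-once pattern follows, assuming all calls happen on a single thread (in the PR this would be the Netty I/O thread). `EnqueueOnceSketch`, its nested `Reader` class and `tryEnqueue` are hypothetical names, and the availability rule (credit and buffers both positive) is taken from the issue description rather than from the actual implementation.

```java
import java.util.ArrayDeque;

/** Single-threaded model of enqueueing an available reader at most once. */
final class EnqueueOnceSketch {
    /** Hypothetical reader state; not Flink's SequenceNumberingViewReader. */
    static final class Reader {
        int credits;
        int buffers;
        boolean registeredAvailable;

        boolean isAvailable() {
            return credits > 0 && buffers > 0;
        }
    }

    private final ArrayDeque<Reader> availableReaders = new ArrayDeque<>();

    /** Enqueues the reader only if it is available and not already registered. */
    boolean tryEnqueue(Reader reader) {
        if (reader.registeredAvailable || !reader.isAvailable()) {
            return false;
        }
        reader.registeredAvailable = true;
        availableReaders.add(reader);
        return true;
    }

    /** Removes the next reader and clears its registration flag. */
    Reader poll() {
        Reader reader = availableReaders.poll();
        if (reader != null) {
            reader.registeredAvailable = false;
        }
        return reader;
    }

    int size() {
        return availableReaders.size();
    }
}
```

Both a credit notification and a data notification can call `tryEnqueue` for the same reader; the flag makes the second call a no-op until the reader has been polled again.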
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315832#comment-16315832 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r157760482 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -94,6 +98,15 @@ void cancelRequestFor(InputChannelID inputChannelId) { } } + void notifyCreditAvailable(final RemoteInputChannel inputChannel) { --- End diff -- please re-add the comment here, too > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315835#comment-16315835 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r157760652 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -318,192 +307,56 @@ else if (bufferProvider.isDestroyed()) { MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray); Buffer buffer = new Buffer(memSeg, FreeingBufferRecycler.INSTANCE, false); - inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1); - - return true; - } - } - finally { - if (releaseNettyBuffer) { - bufferOrEvent.releaseBuffer(); + inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog); } + } finally { + bufferOrEvent.releaseBuffer(); } } - /** -* This class would be replaced by CreditBasedClientHandler in the final, -* so we only implement this method in CreditBasedClientHandler. -*/ - void notifyCreditAvailable(RemoteInputChannel inputChannel) { - } - - private class AsyncErrorNotificationTask implements Runnable { - - private final Throwable error; - - public AsyncErrorNotificationTask(Throwable error) { - this.error = error; - } - - @Override - public void run() { - notifyAllChannelsOfErrorAndClose(error); - } - } - - /** -* A buffer availability listener, which subscribes/unsubscribes the NIO -* read event. -* -* If no buffer is available, the channel read event will be unsubscribed -* until one becomes available again. -* -* After a buffer becomes available again, the buffer is handed over by -* the thread calling {@link #notifyBufferAvailable(Buffer)} to the network I/O -* thread, which then continues the processing of the staged buffer. 
-*/ - private class BufferListenerTask implements BufferListener, Runnable { - - private final AtomicReference availableBuffer = new AtomicReference(); - - private NettyMessage.BufferResponse stagedBufferResponse; - - private boolean waitForBuffer(BufferProvider bufferProvider, NettyMessage.BufferResponse bufferResponse) { - - stagedBufferResponse = bufferResponse; - - if (bufferProvider.addBufferListener(this)) { - if (ctx.channel().config().isAutoRead()) { - ctx.channel().config().setAutoRead(false); - } - - return true; - } - else { - stagedBufferResponse = null; - - return false; - } - } - - private boolean hasStagedBufferOrEvent() { - return stagedBufferResponse != null; - } - - public void notifyBufferDestroyed() { - // The buffer pool has been destroyed - stagedBufferResponse = null; - - if (stagedMessages.isEmpty()) { - ctx.channel().config().setAutoRead(true); - ctx.channel().read(); - } - else { - ctx.channel().eventLoop().execute(stagedMessagesHandler); - } + private void writeAndFlushNextMessageIfPossible(Channel channel) { --- End diff -- please re-add the comment here, too > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16274235#comment-16274235 ] ASF GitHub Bot commented on FLINK-7456: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4552 @NicoK, I have rebased onto the latest code. Looking forward to your review!
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270509#comment-16270509 ] ASF GitHub Bot commented on FLINK-7456: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4552 Yes, I am planning to rebase this on the latest changes from the previous commits.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270469#comment-16270469 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4552 OK, I think I'll manage the review without the split. Since this is the last of the credit-based PRs, though, can you rebase on top of the latest changes (preferably after addressing the comments in the other PRs)? This way, I'll have the full picture of this crucial change.
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268222#comment-16268222 ] ASF GitHub Bot commented on FLINK-7456: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r153405989 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException { BufferPool bufferPool = null; try { - if (gate.getConsumedPartitionType().isCreditBased()) { - // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly - bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); - gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); - } else { - int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - gate.getNumberOfInputChannels() * networkBuffersPerChannel + - extraNetworkBuffersPerGate : Integer.MAX_VALUE; - bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), - maxNumberOfMemorySegments); - } + // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly + bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); + gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); --- End diff -- That makes sense, I will keep the previous behaviour for batch processing. > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. 
> On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268209#comment-16268209 ] ASF GitHub Bot commented on FLINK-7456: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4552 @NicoK, thanks for focusing on this last PR. I was supposed to split it into two separate PRs, as you said. However, I am afraid some existing tests would fail if I only modified and enabled the credit-based process on the sender side; otherwise I would need to create a temporary handler in parallel with `PartitionRequestQueue`. As you said, the second part mainly replaces `PartitionRequestClientHandler` with the contents of `CreditBasedClientHandler`. It also replaces the `CreditBasedClientHandler` class name in the previously added tests and removes the temporary code in `ResultPartitionType` so that the fixed buffer pool works. If this still makes the review difficult, I will try to split it. : )
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266984#comment-16266984 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r152985049 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException { BufferPool bufferPool = null; try { - if (gate.getConsumedPartitionType().isCreditBased()) { - // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly - bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); - gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); - } else { - int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - gate.getNumberOfInputChannels() * networkBuffersPerChannel + - extraNetworkBuffersPerGate : Integer.MAX_VALUE; - bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), - maxNumberOfMemorySegments); - } + // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly + bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate); + gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); --- End diff -- What about the non-bounded partition type that we use for batch processing? Shouldn't we use an unbounded number of floating buffers there, as previously? 
> Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
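The sizing rule from the removed `else` branch, which the review asks to keep for unbounded (batch) partition types, can be restated as a small function. This is a sketch only: `BufferPoolSizingSketch` and `maxMemorySegments` are hypothetical names, the `bounded` flag stands in for `gate.getConsumedPartitionType().isBounded()`, and the overflow checks are an addition that is not in the quoted diff.

```java
/** Restates the pre-change pool sizing rule quoted in the diff; names are hypothetical. */
final class BufferPoolSizingSketch {
    /**
     * Upper limit of memory segments for an input gate's buffer pool.
     *
     * @param bounded             whether the consumed partition type is bounded
     * @param numChannels         number of input channels of the gate
     * @param buffersPerChannel   corresponds to networkBuffersPerChannel
     * @param extraBuffersPerGate corresponds to extraNetworkBuffersPerGate
     */
    static int maxMemorySegments(
            boolean bounded, int numChannels, int buffersPerChannel, int extraBuffersPerGate) {
        if (!bounded) {
            // Unbounded partition types (batch) get no upper limit.
            return Integer.MAX_VALUE;
        }
        // Bounded: channels * buffers per channel + extra buffers per gate.
        return Math.addExact(
                Math.multiplyExact(numChannels, buffersPerChannel), extraBuffersPerGate);
    }
}
```

For example, a bounded gate with 4 channels, 2 buffers per channel and 8 extra buffers is limited to 16 segments, while an unbounded gate is limited only by `Integer.MAX_VALUE`.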
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203810#comment-16203810 ] ASF GitHub Bot commented on FLINK-7456: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4552 @pnowojski, this PR is ready for review. It covers almost all of the credit-based logic on the sender side. In addition, I replaced the current `PartitionRequestClientHandler` with `CreditBasedClientHandler` and removed the previous temporary code so that the feature works on both sides. One small piece of work is left open in this PR, related to `SpilledSubpartitionView#nextBufferIsEvent`, because the existing process in the spilled subpartition cannot get the next buffer directly. The current default value for `nextBufferIsEvent` does not affect the core process; it only results in wasting an unnecessary credit, so I will try to solve it in a lightweight way later. > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16129035#comment-16129035 ] ASF GitHub Bot commented on FLINK-7456: --- GitHub user zhijiangW opened a pull request: https://github.com/apache/flink/pull/4552 [FLINK-7456][network]Implement Netty sender incoming pipeline for credit-based ## What is the purpose of the change This PR is based on #4533 whose commits are also included for passing travis. Review the last commit for this PR change. On sender side, it maintains credit from receiver's `PartitionRequest` and `AddCredit` messages, then sends buffer based on credit and network capacity. This PR is mainly involved in incoming pipeline logic for credit-based. ## Brief change log - *Each subpartition view maintains current credit and a boolean field to mark whether it is already registered available for transfer* - *Update current credit in processing `PartitionRequest` and `AddCredit` messages* - *The mechanism of enqueue the subpartition view and update the registered status field* ## Verifying this change This change added tests and can be verified as follows: - *Added test to verify that current credit is updated correctly and subpartition view is enqueued when received `AddCredit` message* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhijiangW/flink FLINK-7456 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4552.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4552 commit e35c1ff8066bf44344495d132a1092b9db3ef182 Author: Zhijiang Date: 2017-08-07T09:31:17Z [FLINK-7378][core]Create a fix size (non rebalancing) buffer pool type for the floating buffers commit 969c24d3bf80c1ff89ada11e81b9bf4fea14066f Author: Zhijiang Date: 2017-08-14T06:30:47Z [FLINK-7394][core]Implement basic InputChannel with free buffers,credit and backlog commit 15fa828449d73f53042c57e9c5494d75ddee575f Author: Zhijiang Date: 2017-08-10T05:29:13Z [FLINK-7406][network]Implement Netty receiver incoming pipeline for credit-based commit d0674244f15701863a5dd3f68b7274b3bd49c64d Author: Zhijiang Date: 2017-08-12T14:13:25Z [FLINK-7416][network] Implement Netty receiver outgoing pipeline for credit-based commit 6eaff7877ad43eab674e184153365b50ec8e1559 Author: Zhijiang Date: 2017-08-16T13:24:53Z [FLINK-7456][network]Implement Netty sender incoming pipeline for credit-based > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. 
If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)