[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366948#comment-16366948
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4552


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Major
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} granted by the receiver. Whenever a {{PartitionRequest}} or 
> {{AddCredit}} message is received, {{currentCredit}} is increased by the 
> delta it carries.
> Each view also maintains an atomic boolean field that marks it as registered 
> as available for transfer, so that it is enqueued in the handler only once. If 
> {{currentCredit}} increases from zero and there are buffers available in 
> the subpartition, the corresponding view is enqueued for transferring 
> data.
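
The mechanism described above can be sketched roughly as follows. This is only a minimal illustration under stated assumptions, not the code from the pull request: the class names `CreditViewSketch` and `CreditQueueSketch`, the `buffersAvailable` counter, and all method names are hypothetical stand-ins for the subpartition view and the handler queue.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a sender-side subpartition view (not Flink's class). */
class CreditViewSketch {
    private final AtomicInteger currentCredit = new AtomicInteger(0);
    private final AtomicBoolean registeredAvailable = new AtomicBoolean(false);
    // Assumption: the number of buffers waiting in the subpartition is tracked here.
    private final AtomicInteger buffersAvailable;

    CreditViewSketch(int bufferedCount) {
        this.buffersAvailable = new AtomicInteger(bufferedCount);
    }

    /** Adds the delta carried by a request; returns true if credit rose from zero. */
    boolean addCredit(int delta) {
        int previous = currentCredit.getAndAdd(delta);
        return previous == 0 && delta > 0;
    }

    boolean hasBuffers() {
        return buffersAvailable.get() > 0;
    }

    /** Flips the flag from false to true at most once, so the view is enqueued only once. */
    boolean markRegistered() {
        return registeredAvailable.compareAndSet(false, true);
    }

    int credit() {
        return currentCredit.get();
    }
}

/** Hypothetical stand-in for the handler-side queue of views ready to send. */
class CreditQueueSketch {
    final ArrayDeque<CreditViewSketch> availableViews = new ArrayDeque<>();

    /** Called for both the initial request credit and later credit additions. */
    void onCredit(CreditViewSketch view, int delta) {
        if (view.addCredit(delta) && view.hasBuffers() && view.markRegistered()) {
            availableViews.add(view);
        }
    }

    public static void main(String[] args) {
        CreditQueueSketch queue = new CreditQueueSketch();
        CreditViewSketch view = new CreditViewSketch(3);
        queue.onCredit(view, 2); // credit rises from zero: view is enqueued
        queue.onCredit(view, 1); // credit was already positive: no second enqueue
        System.out.println("enqueued views: " + queue.availableViews.size());
    }
}
```

The `compareAndSet` on the boolean is what guards against double registration in this sketch; a real implementation would also have to reset the flag when the view is dequeued, which is left out here.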



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332062#comment-16332062
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4552
  
Looks good. Let's run some cluster tests, and then we're ready to merge.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330427#comment-16330427
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162323707
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -31,7 +31,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
--- End diff --

no, you don't need to - I can create a separate PR for FLINK-8225




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330186#comment-16330186
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162266243
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -31,7 +31,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
--- End diff --

Should I pick up FLINK-8225 in this PR?




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330155#comment-16330155
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
1. Thanks for your FLINK-8425.
2. I had thought the tests for `ResultSubpartition#nextBufferIsEvent` were 
already covered before. The test for `BufferAndBacklog#nextBufferIsEvent()` 
was not included before; thanks for providing the patch for it.
3. I will create a separate JIRA for the switch and also include the commit 
in this PR. I will also check the Travis failure.
4. I will later add the comment for the document you mentioned.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330126#comment-16330126
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162259781
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ---
@@ -164,11 +165,13 @@ private boolean dispose() {
     private void handInChannel(Channel channel) {
         synchronized (connectLock) {
             try {
-                PartitionRequestClientHandler requestHandler = channel.pipeline()
-                    .get(PartitionRequestClientHandler.class);
+                NetworkClientHandler clientHandler = channel.pipeline().get(PartitionRequestClientHandler.class);
+                if (clientHandler == null) {
+                    clientHandler = channel.pipeline().get(CreditBasedPartitionRequestClientHandler.class);
+                }
--- End diff --

good idea!




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330123#comment-16330123
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162259728
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
 ---
@@ -82,10 +83,17 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws
                 LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);
 
                 try {
-                    SequenceNumberingViewReader reader = new SequenceNumberingViewReader(
-                        request.receiverId,
-                        request.credit,
-                        outboundQueue);
+                    NetworkSequenceViewReader reader;
+                    if (request.credit > 0) {
+                        reader = new CreditBasedSequenceNumberingViewReader(
+                            request.receiverId,
+                            request.credit,
+                            outboundQueue);
+                    } else {
+                        reader = new SequenceNumberingViewReader(
+                            request.receiverId,
+                            outboundQueue);
+                    }
--- End diff --

Yes, I took a hacky shortcut to keep it simple. :)
I will check whether it is feasible to get the config here. If so, I will 
use the config; otherwise, I will add a comment clarifying this.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16330125#comment-16330125
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162259766
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -125,11 +126,11 @@ private void enqueueAvailableReader(final SequenceNumberingViewReader reader) th
      * @return readers which are enqueued available for transferring data
      */
     @VisibleForTesting
-    ArrayDeque<SequenceNumberingViewReader> getAvailableReaders() {
+    ArrayDeque<NetworkSequenceViewReader> getAvailableReaders() {
         return availableReaders;
     }
 
-    void notifyReaderCreated(final SequenceNumberingViewReader reader) {
+    public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
--- End diff --

Yes, that was careless of me.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328972#comment-16328972
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162104968
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -125,11 +126,11 @@ private void enqueueAvailableReader(final SequenceNumberingViewReader reader) th
      * @return readers which are enqueued available for transferring data
      */
     @VisibleForTesting
-    ArrayDeque<SequenceNumberingViewReader> getAvailableReaders() {
+    ArrayDeque<NetworkSequenceViewReader> getAvailableReaders() {
         return availableReaders;
     }
 
-    void notifyReaderCreated(final SequenceNumberingViewReader reader) {
+    public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
--- End diff --

this could stay `package-private`




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328971#comment-16328971
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162106371
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
 ---
@@ -82,10 +83,17 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws
                 LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);
 
                 try {
-                    SequenceNumberingViewReader reader = new SequenceNumberingViewReader(
-                        request.receiverId,
-                        request.credit,
-                        outboundQueue);
+                    NetworkSequenceViewReader reader;
+                    if (request.credit > 0) {
+                        reader = new CreditBasedSequenceNumberingViewReader(
+                            request.receiverId,
+                            request.credit,
+                            outboundQueue);
+                    } else {
+                        reader = new SequenceNumberingViewReader(
+                            request.receiverId,
+                            outboundQueue);
+                    }
--- End diff --

This seems a bit hacky since it does not rely on the configuration 
parameter directly but rather on the effect it has, i.e. that a 
`PartitionRequest` in credit-based flow control always has a non-zero credit 
(due to `NetworkBufferPool#requestMemorySegments()`).
Relying on the configuration parameter would be nicer, but alternatively, 
you could add a comment clarifying this.
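
To illustrate the suggestion above, here is a rough sketch of picking the reader from an explicit switch instead of inferring the mode from `credit > 0`. Everything in it is a hypothetical stand-in: `ReaderSketch`, `CreditBasedReaderSketch`, `LegacyReaderSketch`, `ReaderFactorySketch`, and the `creditBasedEnabled` flag are not names from the pull request, and how the handler would actually obtain the configuration value is left open.

```java
/** Hypothetical common type for both reader variants. */
interface ReaderSketch {
    String describe();
}

/** Hypothetical reader used when credit-based flow control is switched on. */
class CreditBasedReaderSketch implements ReaderSketch {
    private final int initialCredit;

    CreditBasedReaderSketch(int initialCredit) {
        this.initialCredit = initialCredit;
    }

    public String describe() {
        return "credit-based, initial credit " + initialCredit;
    }
}

/** Hypothetical reader for the old mode, which ignores credits. */
class LegacyReaderSketch implements ReaderSketch {
    public String describe() {
        return "legacy";
    }
}

/** Chooses the reader from an explicit flag rather than from the request's credit. */
class ReaderFactorySketch {
    // Assumption: this flag is read once from the configuration and passed in.
    private final boolean creditBasedEnabled;

    ReaderFactorySketch(boolean creditBasedEnabled) {
        this.creditBasedEnabled = creditBasedEnabled;
    }

    ReaderSketch create(int requestCredit) {
        return creditBasedEnabled
            ? new CreditBasedReaderSketch(requestCredit)
            : new LegacyReaderSketch();
    }

    public static void main(String[] args) {
        // The decision no longer depends on whether the request happened to carry credit.
        System.out.println(new ReaderFactorySketch(true).create(0).describe());
        System.out.println(new ReaderFactorySketch(false).create(5).describe());
    }
}
```

With this shape, a credit-based request that carried zero credit would still get the credit-based reader, which is the case the `credit > 0` check cannot distinguish.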




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328973#comment-16328973
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162107348
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -31,7 +31,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
--- End diff --

also found that relic of FLINK-8225 (removed in my patch for the previous 
commit, and it should be inside FLINK-7456, not the hotfix which we will revert 
later)




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328974#comment-16328974
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162103769
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ---
@@ -164,11 +165,13 @@ private boolean dispose() {
     private void handInChannel(Channel channel) {
         synchronized (connectLock) {
             try {
-                PartitionRequestClientHandler requestHandler = channel.pipeline()
-                    .get(PartitionRequestClientHandler.class);
+                NetworkClientHandler clientHandler = channel.pipeline().get(PartitionRequestClientHandler.class);
+                if (clientHandler == null) {
+                    clientHandler = channel.pipeline().get(CreditBasedPartitionRequestClientHandler.class);
+                }
--- End diff --

if you let `NetworkClientHandler` extend from `ChannelHandler`, then this 
can be simplified to
```
NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
```
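
Why a single lookup by the shared interface is enough can be shown with a small stand-alone sketch. It does not use Netty; `PipelineSketch` is a hypothetical miniature that assumes, as Netty's `ChannelPipeline#get(Class)` does, that handlers are matched by assignable type and that the requested type must be a subtype of the handler base type. All other names are illustrative as well.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base type, playing the role of ChannelHandler. */
interface HandlerSketch {}

/** Hypothetical shared interface, playing the role of NetworkClientHandler. */
interface ClientHandlerSketch extends HandlerSketch {
    String mode();
}

class LegacyClientHandlerSketch implements ClientHandlerSketch {
    public String mode() {
        return "legacy";
    }
}

class CreditClientHandlerSketch implements ClientHandlerSketch {
    public String mode() {
        return "credit-based";
    }
}

/** Miniature pipeline that returns the first handler assignable to the requested type. */
class PipelineSketch {
    private final List<HandlerSketch> handlers = new ArrayList<>();

    PipelineSketch add(HandlerSketch handler) {
        handlers.add(handler);
        return this;
    }

    // The bound on T is why the shared interface has to extend the base handler type.
    <T extends HandlerSketch> T get(Class<T> type) {
        for (HandlerSketch handler : handlers) {
            if (type.isInstance(handler)) {
                return type.cast(handler);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch().add(new CreditClientHandlerSketch());
        // One lookup covers both implementations; no null check and second lookup needed.
        System.out.println(pipeline.get(ClientHandlerSketch.class).mode());
    }
}
```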




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328747#comment-16328747
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162047076
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 ---
@@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {
             config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
             config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
             config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
--- End diff --

Do you, by any chance, know why 800 worked and is not enough here anymore? 
I mean, why/how does this test need 800 buffers?




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328393#comment-16328393
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK, I have submitted the switch for choosing between the old mode and 
the new credit-based mode.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326834#comment-16326834
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK, thanks for your reviews! 
I have submitted all the patches you provided offline to address the above 
issues.

1. Remove `FLINK-8425` from this PR.
2. Do you think I should add more tests for `nextBufferIsEvent`? I have 
already verified it in previous related tests.
3. For adding the switch issue, I found some difficulties to leave messages 
for you offline. We can further confirm that.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326793#comment-16326793
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161667346
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 ---
@@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {
             config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
             config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
             config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
--- End diff --

yes, the same reason as above




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326792#comment-16326792
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161667234
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 ---
@@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception {
             config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
             config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
             config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
-            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
+            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
--- End diff --

Yes, I will set it to 9.


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.
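
The sender-side bookkeeping described above can be sketched roughly as follows. This is a minimal illustration and not the code from the pull request: the class names `CreditedViewSketch` and `SenderQueueSketch` and all method names are hypothetical, and the sketch assumes the queue is only touched from a single thread (as a Netty handler would be from its event loop).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a sender-side subpartition view; not Flink's actual class. */
class CreditedViewSketch {
    // Credit announced by the receiver, increased by deltas.
    private final AtomicInteger currentCredit = new AtomicInteger(0);
    // Marks the view as already registered for transfer.
    private final AtomicBoolean registeredAvailable = new AtomicBoolean(false);
    // Number of buffers waiting in the subpartition (simplified to a counter).
    private final AtomicInteger buffersAvailable = new AtomicInteger(0);

    /** Adds the delta carried by a PartitionRequest or AddCredit message; returns the new total. */
    int addCredit(int delta) {
        return currentCredit.addAndGet(delta);
    }

    void notifyBuffersAvailable(int numBuffers) {
        buffersAvailable.addAndGet(numBuffers);
    }

    boolean isTransferable() {
        return currentCredit.get() > 0 && buffersAvailable.get() > 0;
    }

    /** Returns true only for the one caller that flips the flag from false to true. */
    boolean tryRegister() {
        return registeredAvailable.compareAndSet(false, true);
    }
}

/** Hypothetical stand-in for the handler-side queue of views that are ready to send. */
class SenderQueueSketch {
    // Assumed to be accessed from a single thread only.
    private final Queue<CreditedViewSketch> availableViews = new ArrayDeque<>();

    /** Enqueues the view at most once, and only if it has both credit and data. */
    boolean enqueueIfAvailable(CreditedViewSketch view) {
        if (view.isTransferable() && view.tryRegister()) {
            availableViews.add(view);
            return true;
        }
        return false;
    }

    int size() {
        return availableViews.size();
    }
}
```

The compare-and-set on the boolean is what keeps a view from being enqueued twice when credit and buffer notifications arrive close together.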



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326451#comment-16326451
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4552
  
One thing we talked about offline: as a precaution, we should keep 
the old implementation around and let users turn the credit-based flow 
control algorithm on or off. (The credit accounting would mostly stay in 
place in that case, but would simply not be used by the old code path, 
which has no flow control.)
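
A rough sketch of what such a switch could look like, purely as an illustration: the class `FlowControlToggleSketch` and its flag are hypothetical and do not correspond to any existing Flink option. The credit accounting always runs, but it only gates sending when the flag is on.

```java
/** Hypothetical toggle between credit-based sending and the old unconditional path. */
class FlowControlToggleSketch {
    private final boolean creditBasedEnabled;
    private int credit;

    FlowControlToggleSketch(boolean creditBasedEnabled) {
        this.creditBasedEnabled = creditBasedEnabled;
    }

    /** Credit accounting stays active in both modes. */
    void addCredit(int delta) {
        credit += delta;
    }

    int getCredit() {
        return credit;
    }

    /** With the flag off, credit is ignored and data may always be sent. */
    boolean maySend() {
        return !creditBasedEnabled || credit > 0;
    }
}
```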




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326442#comment-16326442
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559926
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
         NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
         assertTrue(err.cause instanceof CancelTaskException);
     }
+
+    /**
+     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+     * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+     * though it has no available credits.
+     */
+    @Test
+    public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+        // setup
+        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+        when(view.nextBufferIsEvent()).thenReturn(true);
+
+        final ResultPartitionID partitionId = new ResultPartitionID();
+        final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+        when(partitionProvider.createSubpartitionView(
+            eq(partitionId),
+            eq(0),
+            any(BufferAvailabilityListener.class))).thenReturn(view);
+
+        final InputChannelID receiverId = new InputChannelID();
+        final PartitionRequestQueue queue = spy(new PartitionRequestQueue());
+        final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
+        final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+        reader.requestSubpartitionView(partitionProvider, partitionId, 0);
+
+        // Notify an available event buffer to trigger enqueue the reader
+        reader.notifyBuffersAvailable(1);
+
+        channel.runPendingTasks();
+
+        verify(queue, times(1)).triggerEnqueueAvailableReader(reader);
+        // The reader is enqueued in the pipeline because the next buffer is event, even though no available credits
+        verify(queue, times(1)).enqueueAvailableReader(reader);
+        assertEquals(0, reader.getNumCreditsAvailable());
+    }
+
+    /**
+     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+     * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
+     */
+    @Test
+    public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
+        // setup
+        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+        when(view.nextBufferIsEvent()).thenReturn(false);
+        when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2));
+
+        final ResultPartitionID partitionId = new ResultPartitionID();
+        final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+        when(partitionProvider.createSubpartitionView(
+            eq(partitionId),
+            eq(0),
+            any(BufferAvailabilityListener.class))).thenReturn(view);
--- End diff --

let's remove that mock ...




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326430#comment-16326430
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161485403
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -434,6 +443,29 @@ private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) t
             UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
     }
 
+    /**
+     * Creates and returns a remote input channel for the specific input gate with specific partition request client.
+     *
+     * @param inputGate The input gate owns the created input channel.
+     * @param client The client is used to send partition request.
+     * @return The new created remote input channel.
+     */
+    private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, PartitionRequestClient client) throws Exception {
--- End diff --

could you modify 
`PartitionRequestClientHandlerTest#createRemoteInputChannel(SingleInputGate)` 
to rely on this method, i.e. `return createRemoteInputChannel(inputGate, 
mock(PartitionRequestClient.class));`?




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326439#comment-16326439
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161565896
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
         NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
         assertTrue(err.cause instanceof CancelTaskException);
     }
+
+    /**
+     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+     * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+     * though it has no available credits.
+     */
+    @Test
+    public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+        // setup
+        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+        when(view.nextBufferIsEvent()).thenReturn(true);
+
+        final ResultPartitionID partitionId = new ResultPartitionID();
+        final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+        when(partitionProvider.createSubpartitionView(
+            eq(partitionId),
+            eq(0),
+            any(BufferAvailabilityListener.class))).thenReturn(view);
+
+        final InputChannelID receiverId = new InputChannelID();
+        final PartitionRequestQueue queue = new PartitionRequestQueue();
+        final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
+        final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+        reader.requestSubpartitionView(partitionProvider, partitionId, 0);
+
+        // block the channel so that we see an intermediate state in the test
+        ByteBuf channelBlockingBuffer = blockChannel(channel);
+        assertNull(channel.readOutbound());
+
+        // Notify an available event buffer to trigger enqueue the reader
+        reader.notifyBuffersAvailable(1);
+
+        channel.runPendingTasks();
+
+        // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
+        assertEquals(1, queue.getAvailableReaders().size());
+        assertEquals(0, reader.getNumCreditsAvailable());
+
+        // Flush the buffer to make the channel writable again and see the final results
+        channel.flush();
+        assertSame(channelBlockingBuffer, channel.readOutbound());
+
+        assertEquals(0, queue.getAvailableReaders().size());
+        assertEquals(0, reader.getNumCreditsAvailable());
+    }
+
+    /**
+     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+     * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
+     */
+    @Test
+    public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
+        // setup
+        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+        when(view.nextBufferIsEvent()).thenReturn(false);
+        when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
+
+        final ResultPartitionID partitionId = new ResultPartitionID();
+        final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+        when(partitionProvider.createSubpartitionView(
+            eq(partitionId),
+            eq(0),
+            any(BufferAvailabilityListener.class))).thenReturn(view);
+
+        final InputChannelID receiverId = new InputChannelID();
+        final PartitionRequestQueue queue = new PartitionRequestQueue();
+        final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
+        final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+        reader.requestSubpartitionView(partitionProvider, partitionId, 0);
+        queue.notifyReaderCreated(reader);
+
+        // block the channel so that we see an intermediate state in the test
+        ByteBuf channelBlockingBuffer = blockChannel(channel);
+        assertNull(channel.readOutbound());
+
+        // Notify available 

[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326435#comment-16326435
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559690
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
         NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
         assertTrue(err.cause instanceof CancelTaskException);
     }
+
+    /**
+     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+     * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+     * though it has no available credits.
+     */
+    @Test
+    public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+        // setup
+        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+        when(view.nextBufferIsEvent()).thenReturn(true);
+
+        final ResultPartitionID partitionId = new ResultPartitionID();
+        final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+        when(partitionProvider.createSubpartitionView(
+            eq(partitionId),
+            eq(0),
+            any(BufferAvailabilityListener.class))).thenReturn(view);
--- End diff --

let's remove that mock ...




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326448#comment-16326448
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161578518
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---
@@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {
             config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
             config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
             config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
--- End diff --

Is that also the reason here? I see that we otherwise run into 
`Insufficient number of network buffers`, but it does not look as if it was 
configured as tightly...
(I just want to rule out a memory leak in the new code.)




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326443#comment-16326443
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161568012
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---
@@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
         verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
         // ...and one available result
+        assertFalse(view.nextBufferIsEvent());
--- End diff --

We should check this everywhere we access `getNextBuffer()` or add buffers 
via `add()`, including when `getNextBuffer()` returns `null` and before 
anything has been requested.
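
To make the states worth asserting concrete, here is a minimal sketch with a hypothetical `SubpartitionSketch` class (not Flink's `PipelinedSubpartition`). It assumes that `nextBufferIsEvent()` looks only at the head of the queue and returns `false` when the queue is empty; that empty-queue behaviour is an assumption of the sketch, not something confirmed in this thread.

```java
import java.util.ArrayDeque;

/** Hypothetical, simplified subpartition holding data buffers and events in FIFO order. */
class SubpartitionSketch {
    /** Minimal buffer stand-in that only records whether it is an event. */
    static final class Buf {
        final boolean isEvent;

        Buf(boolean isEvent) {
            this.isEvent = isEvent;
        }
    }

    private final ArrayDeque<Buf> buffers = new ArrayDeque<>();

    void add(Buf buffer) {
        buffers.add(buffer);
    }

    /** Returns the head of the queue, or null if nothing is queued. */
    Buf getNextBuffer() {
        return buffers.poll();
    }

    /** True only if there is a head element and it is an event (assumed semantics). */
    boolean nextBufferIsEvent() {
        Buf head = buffers.peek();
        return head != null && head.isEvent;
    }
}
```

Under these assumptions the interesting checkpoints are: before anything is added, after adding a data buffer, after adding an event behind a data buffer, after polling the data buffer, and after the queue is drained.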




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326433#comment-16326433
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559642
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
         NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
         assertTrue(err.cause instanceof CancelTaskException);
     }
+
+    /**
+     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+     * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+     * though it has no available credits.
+     */
+    @Test
+    public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+        // setup
+        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
--- End diff --

let's remove that mock ...




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326436#comment-16326436
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161547041
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
         NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
         assertTrue(err.cause instanceof CancelTaskException);
     }
+
+    /**
+     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+     * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
--- End diff --

nit: `is an event`




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326440#comment-16326440
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559913
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
         NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
         assertTrue(err.cause instanceof CancelTaskException);
     }
+
+    /**
+     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+     * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+     * though it has no available credits.
+     */
+    @Test
+    public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+        // setup
+        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+        when(view.nextBufferIsEvent()).thenReturn(true);
+
+        final ResultPartitionID partitionId = new ResultPartitionID();
+        final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+        when(partitionProvider.createSubpartitionView(
+            eq(partitionId),
+            eq(0),
+            any(BufferAvailabilityListener.class))).thenReturn(view);
+
+        final InputChannelID receiverId = new InputChannelID();
+        final PartitionRequestQueue queue = spy(new PartitionRequestQueue());
+        final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
+        final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+        reader.requestSubpartitionView(partitionProvider, partitionId, 0);
+
+        // Notify an available event buffer to trigger enqueue the reader
+        reader.notifyBuffersAvailable(1);
+
+        channel.runPendingTasks();
+
+        verify(queue, times(1)).triggerEnqueueAvailableReader(reader);
+        // The reader is enqueued in the pipeline because the next buffer is event, even though no available credits
+        verify(queue, times(1)).enqueueAvailableReader(reader);
+        assertEquals(0, reader.getNumCreditsAvailable());
+    }
+
+    /**
+     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+     * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
+     */
+    @Test
+    public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
+        // setup
+        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+        when(view.nextBufferIsEvent()).thenReturn(false);
+        when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2));
--- End diff --

let's remove that mock ...




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326437#comment-16326437
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161565565
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
         NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
         assertTrue(err.cause instanceof CancelTaskException);
     }
+
+    /**
+     * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+     * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+     * though it has no available credits.
+     */
+    @Test
+    public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+        // setup
+        final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+        when(view.nextBufferIsEvent()).thenReturn(true);
+
+        final ResultPartitionID partitionId = new ResultPartitionID();
+        final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+        when(partitionProvider.createSubpartitionView(
+            eq(partitionId),
+            eq(0),
+            any(BufferAvailabilityListener.class))).thenReturn(view);
+
+        final InputChannelID receiverId = new InputChannelID();
+        final PartitionRequestQueue queue = new PartitionRequestQueue();
+        final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
+        final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+        reader.requestSubpartitionView(partitionProvider, partitionId, 0);
+
+        // block the channel so that we see an intermediate state in the test
+        ByteBuf channelBlockingBuffer = blockChannel(channel);
+        assertNull(channel.readOutbound());
+
+        // Notify an available event buffer to trigger enqueue the reader
+        reader.notifyBuffersAvailable(1);
+
+        channel.runPendingTasks();
+
+        // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
+        assertEquals(1, queue.getAvailableReaders().size());
--- End diff --

Actually, let's use `assertThat(queue.getAvailableReaders(), contains(reader));` 
here, which gives much nicer output in case something is wrong.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326444#comment-16326444
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161569135
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---
@@ -134,13 +135,22 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
         assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
         verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
 
+        assertFalse(view.nextBufferIsEvent());
+        read = view.getNextBuffer();
+        assertNotNull(read);
+        assertEquals(0, subpartition.getBuffersInBacklog());
+        assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
+        assertNull(view.getNextBuffer());
+        assertEquals(0, subpartition.getBuffersInBacklog());
+
         // Add event to the queue...
         Buffer event = createBuffer();
         event.tagAsEvent();
         subpartition.add(event);
 
+        assertTrue(view.nextBufferIsEvent());
         assertEquals(3, subpartition.getTotalNumberOfBuffers());
-        assertEquals(1, subpartition.getBuffersInBacklog());
+        assertEquals(0, subpartition.getBuffersInBacklog());
         assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
         verify(listener, times(3)).notifyBuffersAvailable(eq(1L));
     }
--- End diff --

Maybe also verify that `nextBufferIsEvent()` returns the right thing after 
adding a real buffer (with the event still being next).
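
As a rough illustration of the case raised in this comment, the sketch below models a subpartition as a FIFO queue in which each entry only records whether it is an event. `ToySubpartition` and its methods are hypothetical and much simpler than `PipelinedSubpartition`; the point is only that peeking at the head should keep reporting an event while a real buffer waits behind it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Small driver: an event is queued first, then a real buffer behind it. */
final class NextBufferIsEventDemo {
    public static void main(String[] args) {
        ToySubpartition subpartition = new ToySubpartition();
        subpartition.add(true);   // event
        subpartition.add(false);  // real buffer queued behind the event

        // The event is still the head of the queue.
        System.out.println(subpartition.nextBufferIsEvent());

        subpartition.poll();      // consume the event
        // Now the real buffer is next.
        System.out.println(subpartition.nextBufferIsEvent());
    }
}

/** Hypothetical FIFO model; each entry records only "is this an event?". */
final class ToySubpartition {
    private final Deque<Boolean> queue = new ArrayDeque<>();

    void add(boolean isEvent) {
        queue.addLast(isEvent);
    }

    /** Removes the head entry, returning null when the queue is empty. */
    Boolean poll() {
        return queue.pollFirst();
    }

    /** Peeks at the head without consuming it; false when nothing is queued. */
    boolean nextBufferIsEvent() {
        Boolean head = queue.peekFirst();
        return head != null && head;
    }
}
```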




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326445#comment-16326445
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161573976
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -238,6 +238,7 @@ public void testConsumeSpilledPartition() throws 
Exception {
 
verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
 
+   assertFalse(reader.nextBufferIsEvent());
--- End diff --

also test `read.buffer().isBuffer()`?




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326431#comment-16326431
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161545040
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -277,23 +277,31 @@ public void testNotifyCreditAvailable() throws 
Exception {
handler.channelRead(mock(ChannelHandlerContext.class), 
bufferResponse1);
handler.channelRead(mock(ChannelHandlerContext.class), 
bufferResponse2);
 
-   // The PartitionRequestClient is tied to 
PartitionRequestClientHandler currently, so we
-   // have to notify credit available in 
CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel1);
-   handler.notifyCreditAvailable(inputChannel2);
-
assertEquals(2, inputChannel1.getUnannouncedCredit());
assertEquals(2, inputChannel2.getUnannouncedCredit());
 
channel.runPendingTasks();
 
-   // The two input channels should notify credits via 
writable channel
+   // The two input channels should send partition 
requests and then notify credits via writable channel
assertTrue(channel.isWritable());
Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(inputChannel1.getInputChannelId(), 
((PartitionRequest) readFromOutbound).receiverId);
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
+
+   readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(inputChannel2.getInputChannelId(), 
((PartitionRequest) readFromOutbound).receiverId);
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
--- End diff --

Let's verify those two `PartitionRequest` messages above, since 
`inputChannel1.getUnannouncedCredit()` kind of relies on those being sent (if 
we change the `initialCredit` to be included in the `unannouncedCredit`).




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326447#comment-16326447
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161576253
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 ---
@@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws 
Exception {

config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);

config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
-   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
+   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
--- End diff --

Just a note for the curious: this test can cope with a higher number of 
network buffers and waits for all of them to be blocked. Increasing this 
to `9` would have been enough here, though (we now require 2 exclusive buffers 
by default, while 1 was the minimum per incoming channel).




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326446#comment-16326446
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161570121
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ---
@@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() 
throws Exception {
verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
// ...and one available result
+   assertFalse(view.nextBufferIsEvent());
--- End diff --

also test `read.buffer().isBuffer()` then?




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326432#comment-16326432
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161546199
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() 
throws Exception {
 
assertEquals(2, inputChannel.getUnannouncedCredit());
 
-   // The PartitionRequestClient is tied to 
PartitionRequestClientHandler currently, so we
-   // have to notify credit available in 
CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel);
-
// Release the input channel
inputGate.releaseAllResources();
 
channel.runPendingTasks();
 
-   // It will not notify credits for released input channel
+   // It should send partition request first, and send 
close request after releasing input channel,
+   // but will not notify credits for released input 
channel.
+   Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
+   readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(CloseRequest.class));
--- End diff --

put these after `inputGate.releaseAllResources()`?




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326438#comment-16326438
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161567331
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next 
buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+
+   // Flush the buffer to make the channel writable again and see 
the final results
+   channel.flush();
+   assertSame(channelBlockingBuffer, channel.readOutbound());
+
+   assertEquals(0, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline iff it has 
both available credits and buffers.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws 
Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new 
BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+   queue.notifyReaderCreated(reader);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify available 

[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326434#comment-16326434
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161546145
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() 
throws Exception {
 
assertEquals(2, inputChannel.getUnannouncedCredit());
 
-   // The PartitionRequestClient is tied to 
PartitionRequestClientHandler currently, so we
-   // have to notify credit available in 
CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel);
-
// Release the input channel
inputGate.releaseAllResources();
 
channel.runPendingTasks();
 
-   // It will not notify credits for released input channel
+   // It should send partition request first, and send 
close request after releasing input channel,
+   // but will not notify credits for released input 
channel.
+   Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
--- End diff --

similar here: verify `PartitionRequest` after 
`inputChannel.requestSubpartition`




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326441#comment-16326441
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161567305
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next 
buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+
+   // Flush the buffer to make the channel writable again and see 
the final results
+   channel.flush();
+   assertSame(channelBlockingBuffer, channel.readOutbound());
+
+   assertEquals(0, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
--- End diff --

let's end with `assertNull(channel.readOutbound());`




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326275#comment-16326275
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK, I have submitted all the modifications based on the patches you 
provided.
The tests for `nextBufferIsEvent` will be added in a new commit tomorrow.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16325989#comment-16325989
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161450876
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -199,6 +199,19 @@ public boolean isReleased() {
}
}
 
+   @Override
+   public boolean nextBufferIsEvent() {
+   if (nextBuffer != null) {
+   return !nextBuffer.isBuffer();
+   }
--- End diff --

agree with integration




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324469#comment-16324469
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161248402
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -52,4 +52,9 @@
boolean isReleased();
 
Throwable getFailureCause();
+
+   /**
+* Returns whether the next buffer is event or not.
--- End diff --

`is an event`




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324471#comment-16324471
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161296711
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -73,6 +73,9 @@
/** Flag indicating whether all resources have been released. */
private AtomicBoolean isReleased = new AtomicBoolean();
 
+   /** The next buffer to hand out. */
+   private Buffer nextBuffer;
--- End diff --

We need to protect this against race conditions with respect to 
`releaseAllResources()` as well.

Actually, I'm surprised that nothing in this class is protected against 
concurrently releasing it. Although I have created a separate issue for this 
([FLINK-8425](https://issues.apache.org/jira/browse/FLINK-8425)), I think we 
may need to solve this here for the new `nextBuffer` anyway.
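
One possible shape for such protection, sketched under assumptions rather than taken from the pull request: guard the cached buffer with a lock that `releaseAllResources()` also takes, and check the released flag before handing anything out. `GuardedView` and `peekNextBuffer` are hypothetical names, and the buffer is modelled as a plain `String`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Small driver: the cached buffer is no longer visible after release. */
final class GuardedViewDemo {
    public static void main(String[] args) {
        GuardedView view = new GuardedView("event-0");
        System.out.println(view.peekNextBuffer());
        view.releaseAllResources();
        System.out.println(view.peekNextBuffer());
    }
}

/** Hypothetical view caching one prefetched buffer (modelled as a String). */
final class GuardedView {
    private final Object lock = new Object();
    private final AtomicBoolean isReleased = new AtomicBoolean();
    private String nextBuffer; // guarded by lock

    GuardedView(String firstBuffer) {
        this.nextBuffer = firstBuffer;
    }

    /** Returns the cached buffer, or null once the view has been released. */
    String peekNextBuffer() {
        synchronized (lock) {
            return isReleased.get() ? null : nextBuffer;
        }
    }

    /** Idempotent release; clears the cached buffer under the same lock. */
    void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            synchronized (lock) {
                nextBuffer = null;
            }
        }
    }

    boolean isReleased() {
        return isReleased.get();
    }
}
```

Whether a lock, a volatile field, or confinement to a single thread is the right tool depends on how the real view is accessed, which this sketch does not try to settle.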




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324467#comment-16324467
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161295951
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -199,6 +199,19 @@ public boolean isReleased() {
}
}
 
+   @Override
+   public boolean nextBufferIsEvent() {
+   if (nextBuffer != null) {
+   return !nextBuffer.isBuffer();
+   }
+
+   if (spilledView != null) {
--- End diff --

`checkState(spilledView != null, "No in-memory buffers available, but also 
nothing spilled.");` just like in `getNextBuffer()`?
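
For illustration, a sketch of what that could look like, assuming a local `checkState` helper with the same contract as Flink's `Preconditions.checkState(boolean, Object)`. The class `SpillableViewSketch`, the `String` buffers, and the `"event:"` prefix convention are hypothetical simplifications:

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the spillable view; not the Flink class. */
final class SpillableViewSketch {
    private final String nextBuffer;            // in-memory buffer, may be null
    private final BooleanSupplier spilledView;  // answers nextBufferIsEvent() for spilled data, may be null

    SpillableViewSketch(String nextBuffer, BooleanSupplier spilledView) {
        this.nextBuffer = nextBuffer;
        this.spilledView = spilledView;
    }

    /** Local helper mirroring the usual checkState contract. */
    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    boolean nextBufferIsEvent() {
        if (nextBuffer != null) {
            return nextBuffer.startsWith("event:");
        }
        // Fail loudly instead of silently returning false when neither source exists.
        checkState(spilledView != null, "No in-memory buffers available, but also nothing spilled.");
        return spilledView.getAsBoolean();
    }
}
```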




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324472#comment-16324472
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161249082
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -199,6 +199,19 @@ public boolean isReleased() {
}
}
 
+   @Override
+   public boolean nextBufferIsEvent() {
+   if (nextBuffer != null) {
+   return !nextBuffer.isBuffer();
+   }
--- End diff --

We probably need synchronization here to access `nextBuffer` and also check 
for `isReleased()` similar to `getNextBuffer`, since we are basically doing the 
same but without taking the buffer.

Why not integrate this into `getNextBuffer` and the `BufferAndBacklog` 
returned there? Inside that method, gathering this additional info is basically 
for free and we may thus speed up some code paths.
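
A rough sketch of that idea, under the assumption that the result type can simply carry one more flag. `BufferAndBacklogSketch` and `PeekingViewSketch` are hypothetical simplified types (with `String` entries and an `"event:"` prefix standing in for real buffers and events), not the actual `BufferAndBacklog`:

```java
import java.util.ArrayDeque;

/** Hypothetical, simplified result type; not Flink's actual BufferAndBacklog. */
final class BufferAndBacklogSketch {
    final String buffer;
    final int buffersInBacklog;
    final boolean nextBufferIsEvent;

    BufferAndBacklogSketch(String buffer, int buffersInBacklog, boolean nextBufferIsEvent) {
        this.buffer = buffer;
        this.buffersInBacklog = buffersInBacklog;
        this.nextBufferIsEvent = nextBufferIsEvent;
    }
}

/** Hypothetical view that gathers the "next is event" flag while polling. */
final class PeekingViewSketch {
    private final ArrayDeque<String> queue = new ArrayDeque<>();
    private boolean released;

    void add(String entry) {
        synchronized (queue) {
            queue.add(entry);
        }
    }

    void release() {
        synchronized (queue) {
            released = true;
            queue.clear();
        }
    }

    BufferAndBacklogSketch getNextBuffer() {
        synchronized (queue) {
            if (released || queue.isEmpty()) {
                return null;
            }
            String current = queue.poll();
            // Already inside the lock, so peeking at the successor costs almost nothing.
            String next = queue.peek();
            boolean nextIsEvent = next != null && next.startsWith("event:");
            return new BufferAndBacklogSketch(current, queue.size(), nextIsEvent);
        }
    }
}
```

The caller then reads the flag from the returned object and needs neither a second synchronized call nor a separate `isReleased()` check.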




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324466#comment-16324466
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161295428
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -163,6 +190,15 @@ public boolean isReleased() {
return parent.isReleased() || isReleased.get();
}
 
+   @Override
+   public boolean nextBufferIsEvent() {
+   if (nextBuffer != null) {
+   return !nextBuffer.isBuffer();
+   }
+
+   return false;
--- End diff --

I'm afraid relying on `nextBuffer` won't be enough because it might not 
have been there during `getNextBuffer()` but may be there now.

Please add a test for this case.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324465#comment-16324465
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161247589
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -77,6 +92,46 @@ void requestSubpartitionView(
}
}
 
+   /**
+* The credits from consumer are added in incremental way.
+*
+* @param creditDeltas The credit deltas
+*/
+   public void addCredit(int creditDeltas) {
+   numCreditsAvailable += creditDeltas;
+   }
+
+   /**
+* Updates the value to indicate whether the reader is enqueued in the 
pipeline or not.
+*
+* @param isRegisteredAvailable True if this reader is already enqueued 
in the pipeline.
+*/
+   public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
+   this.isRegisteredAvailable = isRegisteredAvailable;
+   }
+
+   public boolean isRegisteredAvailable() {
+   return isRegisteredAvailable;
+   }
+
+   /**
+* Check whether this reader is available or not.
+*
+* Return true only if the next buffer is event or the reader has 
both available
+* credits and buffers.
+*/
+   public boolean isAvailable() {
+   if (numBuffersAvailable.get() <= 0) {
+   return false;
+   }
+
+   if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable 
> 0) {
--- End diff --

how about checking `numCreditsAvailable` first since that's the cheaper 
check?




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324473#comment-16324473
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161287607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -77,6 +92,46 @@ void requestSubpartitionView(
}
}
 
+   /**
+* The credits from consumer are added in incremental way.
+*
+* @param creditDeltas The credit deltas
+*/
+   public void addCredit(int creditDeltas) {
+   numCreditsAvailable += creditDeltas;
+   }
+
+   /**
+* Updates the value to indicate whether the reader is enqueued in the 
pipeline or not.
+*
+* @param isRegisteredAvailable True if this reader is already enqueued 
in the pipeline.
+*/
+   public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
+   this.isRegisteredAvailable = isRegisteredAvailable;
+   }
+
+   public boolean isRegisteredAvailable() {
+   return isRegisteredAvailable;
+   }
+
+   /**
+* Check whether this reader is available or not.
+*
+* Return true only if the next buffer is event or the reader has 
both available
+* credits and buffers.
+*/
+   public boolean isAvailable() {
+   if (numBuffersAvailable.get() <= 0) {
+   return false;
+   }
+
+   if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable 
> 0) {
--- End diff --

one more thing: in the special case of using it inside `getNextBuffer()`, 
we already retrieved the `remaining` number of buffers so we can spare one 
lookup into an atomic integer here by passing this one in
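
A possible shape for this, sketched with a hypothetical class `RemainingAwareReaderSketch`. The event check is left out for brevity, and the assumption that every buffer costs one credit is a simplification. The point is only that `decrementAndGet()` already returns the remaining count, so the availability check can take it as a parameter:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reader; only the counters relevant to the suggestion are modelled. */
final class RemainingAwareReaderSketch {
    private final AtomicInteger numBuffersAvailable = new AtomicInteger();
    private int numCreditsAvailable;

    void addBuffers(int n) {
        numBuffersAvailable.addAndGet(n);
    }

    void addCredit(int delta) {
        numCreditsAvailable += delta;
    }

    /** General check: has to read the atomic counter itself. */
    boolean isAvailable() {
        return isAvailable(numBuffersAvailable.get());
    }

    /** Variant for callers that already know the remaining buffer count. */
    boolean isAvailable(int remaining) {
        return remaining > 0 && numCreditsAvailable > 0;
    }

    /** Simulates getNextBuffer(): the decrement already yields the remaining count. */
    boolean takeBufferAndCheckAvailable() {
        int remaining = numBuffersAvailable.decrementAndGet();
        numCreditsAvailable--; // assumption: every buffer here is a data buffer costing one credit
        return isAvailable(remaining);
    }
}
```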




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324470#comment-16324470
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161282684
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -77,6 +92,46 @@ void requestSubpartitionView(
}
}
 
+   /**
+* The credits from consumer are added in incremental way.
+*
+* @param creditDeltas The credit deltas
+*/
+   public void addCredit(int creditDeltas) {
+   numCreditsAvailable += creditDeltas;
+   }
+
+   /**
+* Updates the value to indicate whether the reader is enqueued in the 
pipeline or not.
+*
+* @param isRegisteredAvailable True if this reader is already enqueued 
in the pipeline.
+*/
+   public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
+   this.isRegisteredAvailable = isRegisteredAvailable;
+   }
+
+   public boolean isRegisteredAvailable() {
+   return isRegisteredAvailable;
+   }
+
+   /**
+* Check whether this reader is available or not.
+*
+* Return true only if the next buffer is event or the reader has 
both available
+* credits and buffers.
+*/
+   public boolean isAvailable() {
+   if (numBuffersAvailable.get() <= 0) {
+   return false;
+   }
+
+   if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable 
> 0) {
--- End diff --

also, why not simply do the following for this whole method?
```
return numBuffersAvailable.get() > 0 &&
    (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent());
```
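
To make the effect of the ordering concrete, here is a self-contained sketch. `AvailabilityCheckSketch` and its `eventLookups` counter are hypothetical and exist only to show that, with the cheap credit check first, `||` short-circuits and the view lookup is skipped whenever credit is available:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical reader used only to observe the short-circuit behaviour. */
final class AvailabilityCheckSketch {
    private final AtomicInteger numBuffersAvailable;
    private final int numCreditsAvailable;
    private final BooleanSupplier nextBufferIsEvent;

    /** Counts how often the more expensive view lookup actually ran. */
    int eventLookups;

    AvailabilityCheckSketch(int buffers, int credits, boolean nextIsEvent) {
        this.numBuffersAvailable = new AtomicInteger(buffers);
        this.numCreditsAvailable = credits;
        this.nextBufferIsEvent = () -> {
            eventLookups++;
            return nextIsEvent;
        };
    }

    boolean isAvailable() {
        return numBuffersAvailable.get() > 0
            && (numCreditsAvailable > 0 || nextBufferIsEvent.getAsBoolean());
    }
}
```

With one buffer and two credits, `isAvailable()` returns `true` and `eventLookups` stays at 0. With one buffer, no credit, and an event next, it also returns `true`, but only after one lookup.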




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324464#comment-16324464
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161241148
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -49,10 +50,24 @@
 
private volatile ResultSubpartitionView subpartitionView;
 
+   /**
+* The status indicating whether this reader is already enqueued in the 
pipeline for transferring
+* data or not. It is mainly used for avoid registering this reader to 
the pipeline repeatedly.
+*/
+   private boolean isRegisteredAvailable;
+
+   /** The number of available buffers for holding data on the consumer 
side. */
+   private int numCreditsAvailable;
--- End diff --

Just a note since I was wondering whether we need synchronization here (not 
needed after verifying the things below):

1) `numCreditsAvailable` is increased via 
`PartitionRequestServerHandler#channelRead0`, which is a different channel 
handler from `PartitionRequestQueue` (see 
`NettyProtocol#getServerChannelHandlers`). According to [Netty's thread 
model](https://netty.io/wiki/new-and-noteworthy-in-4.0.html#wiki-h2-34), we 
should be safe though:

> A user can specify an EventExecutor when he or she adds a handler to a ChannelPipeline.
> - If specified, the handler methods of the ChannelHandler are always invoked by the specified EventExecutor.
> - If unspecified, the handler methods are always invoked by the EventLoop that its associated Channel is registered to.

2) `numCreditsAvailable` is read from 
`PartitionRequestQueue#enqueueAvailableReader()` and 
`SequenceNumberingViewReader#getNextBuffer()` which are both accessed by the 
channel's IO thread only.
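
The reasoning in 1) and 2) amounts to thread confinement. As an analogy only, the sketch below uses a plain single-threaded `ExecutorService` as a stand-in for the channel's `EventLoop`; it does not use Netty, and `EventLoopConfinementSketch` is a hypothetical name. Because every read and write of the plain `int` runs on that one thread, no `volatile` or atomic is needed even though the requests come from several caller threads:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration of thread confinement; not Netty or Flink code. */
final class EventLoopConfinementSketch {
    // Stand-in for the channel's event loop: a single thread runs every task in order.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    // Plain int: only ever touched from the event-loop thread.
    private int numCreditsAvailable;

    void addCredit(int delta) {
        eventLoop.execute(() -> numCreditsAvailable += delta);
    }

    int readCredits() throws Exception {
        return eventLoop.submit(() -> numCreditsAvailable).get();
    }

    /** Lets several caller threads add credit, then reads the total on the loop thread. */
    static int runDemo(int callers, int creditsPerCaller) {
        EventLoopConfinementSketch sketch = new EventLoopConfinementSketch();
        try {
            Thread[] threads = new Thread[callers];
            for (int i = 0; i < callers; i++) {
                threads[i] = new Thread(() -> {
                    for (int j = 0; j < creditsPerCaller; j++) {
                        sketch.addCredit(1);
                    }
                });
                threads[i].start();
            }
            for (Thread t : threads) {
                t.join();
            }
            return sketch.readCredits();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            sketch.eventLoop.shutdown();
        }
    }
}
```

No increment is lost here because the executor serializes all tasks. The argument in this comment relies on the same guarantee: both handlers are invoked by the same `EventLoop`.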




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324468#comment-16324468
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161242036
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -49,10 +50,24 @@
 
private volatile ResultSubpartitionView subpartitionView;
 
+   /**
+* The status indicating whether this reader is already enqueued in the 
pipeline for transferring
+* data or not. It is mainly used for avoid registering this reader to 
the pipeline repeatedly.
+*/
+   private boolean isRegisteredAvailable;
--- End diff --



I know it's default-initialized to false but let's make this explicit





[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321558#comment-16321558
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160849525
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -250,10 +304,12 @@ private void handleException(Channel channel, 
Throwable cause) throws IOExceptio
 
private void releaseAllResources() throws IOException {
SequenceNumberingViewReader reader;
-   while ((reader = nonEmptyReader.poll()) != null) {
+   while ((reader = availableReaders.poll()) != null) {
--- End diff --

yes, it should release all readers here.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321557#comment-16321557
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160849478
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +94,37 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification from 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   @VisibleForTesting
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   @VisibleForTesting
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

agree




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320522#comment-16320522
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160695373
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -98,20 +135,35 @@ public void close() {
}
}
 
+   /**
+* Adds to maintain the unannounced credits from the consumer and it 
may trigger
+* enqueue the corresponding reader for this consumer transferring data.
+*
+* @param receiverId The input channel id to identify consumer.
+* @param credit The unannounced credits of the consumer.
+*/
+   public void addCredit(InputChannelID receiverId, int credit) throws 
Exception {
--- End diff --

could be `package-private`




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320521#comment-16320521
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160694722
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +94,37 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification from 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   @VisibleForTesting
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   @VisibleForTesting
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

actually, we can inline this into the first one, make this `private`, and 
reduce `Mockito` use in the tests if we just made the `availableReaders` field 
accessible to the tests, e.g.
```
/**
 * Accesses internal state to verify reader registration in the unit tests.
 *
 * Do not use anywhere else!
 *
 * @return readers which are enqueued available for transferring data
 */
@VisibleForTesting
ArrayDeque<SequenceNumberingViewReader> getAvailableReaders() {
    return availableReaders;
}
```




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320520#comment-16320520
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160714424
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -250,10 +304,12 @@ private void handleException(Channel channel, 
Throwable cause) throws IOExceptio
 
private void releaseAllResources() throws IOException {
SequenceNumberingViewReader reader;
-   while ((reader = nonEmptyReader.poll()) != null) {
+   while ((reader = availableReaders.poll()) != null) {
--- End diff --

Previously, we did not have access to all views but only those with data, 
but shouldn't we release all views instead? Especially now, since there are 
views with data but without enough credit.
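
A small sketch of the difference, assuming the queue keeps a separate collection of every registered reader. The names `RequestQueueSketch`, `ReaderSketch`, `register`, and the `allReaders` list are hypothetical here and may not match what the PR ends up with:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reader that only records whether it was released. */
final class ReaderSketch {
    boolean released;

    void releaseAllResources() {
        released = true;
    }
}

/** Hypothetical queue tracking all readers as well as the currently available ones. */
final class RequestQueueSketch {
    private final List<ReaderSketch> allReaders = new ArrayList<>();
    private final ArrayDeque<ReaderSketch> availableReaders = new ArrayDeque<>();

    void register(ReaderSketch reader) {
        allReaders.add(reader);
    }

    void enqueueAvailable(ReaderSketch reader) {
        availableReaders.add(reader);
    }

    void releaseAllResources() {
        // Iterate over every registered reader, not just the enqueued ones, so that
        // readers holding data but lacking credit are released as well.
        for (ReaderSketch reader : allReaders) {
            reader.releaseAllResources();
        }
        availableReaders.clear();
        allReaders.clear();
    }
}
```

Polling only `availableReaders` would leave a reader that has data but no credit unreleased, which is the leak this comment is worried about.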




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318232#comment-16318232
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160371761
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
--- End diff --

Thanks for the clarification. I misunderstood this tag from the comments; I 
will modify it together with the other review changes.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318166#comment-16318166
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160361957
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
--- End diff --

I actually interpret this tag as applying not only to methods that are used 
*only* by tests, but also to those whose visibility we had to increase, e.g. 
from `private` to package-private as here, so that tests can use them.
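
A small sketch of that reading of the tag, under the assumption of a locally defined marker annotation. `VisibleForTestingSketch` and `QueueVisibilitySketch` are hypothetical names used only for illustration; the project has its own annotation.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/** Local stand-in marker annotation (hypothetical). */
@Documented
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.TYPE, ElementType.CONSTRUCTOR})
@interface VisibleForTestingSketch {}

/** Hypothetical class whose methods were widened from private to package-private. */
class QueueVisibilitySketch {
    private int enqueued;

    // Used by production code in the same class, but package-private
    // (instead of private) so that a test in the same package can call it.
    @VisibleForTestingSketch
    void enqueue() {
        enqueued++;
    }

    // Exposed purely so that tests can observe internal state.
    @VisibleForTestingSketch
    int getEnqueuedCount() {
        return enqueued;
    }
}
```

The annotation has no runtime effect here; it only documents why the visibility is wider than the production code alone would need.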




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317972#comment-16317972
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160333139
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

The same as with `triggerEnqueueAvailableReader`.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317971#comment-16317971
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160333068
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
--- End diff --

This method is not used only for testing; it may also be called via the 
`addCredit` method.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315833#comment-16315833
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r159400371
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -77,6 +83,37 @@ void requestSubpartitionView(
}
}
 
+   public void addCredit(int credit) {
+   numCreditsAvailable += credit;
+   }
+
+   public void notifyAvailableChanged(boolean update) {
--- End diff --

`notifyAvailabilityChanged`, or rather `setAvailability`?
Also, please add a comment on what this is about and what "availability" 
means here.
  




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315838#comment-16315838
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r159399376
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
--- End diff --

`from`




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315831#comment-16315831
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r157760366
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -41,30 +42,33 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.ArrayDeque;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read the messages of buffer response or error 
response from the
+ * producer, to write and flush the unannounced credits for the producer.
+ */
 class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 
private static final Logger LOG = 
LoggerFactory.getLogger(PartitionRequestClientHandler.class);
 
-   private final ConcurrentMap 
inputChannels = new ConcurrentHashMap();
-
-   private final AtomicReference channelError = new 
AtomicReference();
+   /** Channels, which already requested partitions from the producers. */
+   private final ConcurrentMap 
inputChannels = new ConcurrentHashMap<>();
 
-   private final BufferListenerTask bufferListener = new 
BufferListenerTask();
+   /** Channels, which will notify the producers about unannounced credit. 
*/
+   private final ArrayDeque inputChannelsWithCredit = 
new ArrayDeque<>();
 
-   private final Queue stagedMessages = new ArrayDeque();
+   private final AtomicReference channelError = new 
AtomicReference<>();
 
-   private final StagedMessagesHandlerTask stagedMessagesHandler = new 
StagedMessagesHandlerTask();
+   private final ChannelFutureListener writeListener = new 
WriteAndFlushNextMessageIfPossibleListener();
 
/**
 * Set of cancelled partition requests. A request is cancelled iff an 
input channel is cleared
 * while data is still coming in for this channel.
 */
-   private final ConcurrentMap cancelled = 
Maps.newConcurrentMap();
+   private final ConcurrentMap cancelled = 
new ConcurrentHashMap<>();
 
private volatile ChannelHandlerContext ctx;
--- End diff --

It looks like you missed migrating some of the comments that were present 
in `CreditBasedClientHandler` but are not present here, e.g. for `ctx`.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315836#comment-16315836
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r159401061
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
--- End diff --

`@VisibleForTesting`?




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315834#comment-16315834
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r157760560
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -158,32 +164,44 @@ public void exceptionCaught(ChannelHandlerContext 
ctx, Throwable cause) throws E
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
try {
-   if (!bufferListener.hasStagedBufferOrEvent() && 
stagedMessages.isEmpty()) {
-   decodeMsg(msg, false);
-   }
-   else {
-   stagedMessages.add(msg);
-   }
-   }
-   catch (Throwable t) {
+   decodeMsg(msg);
+   } catch (Throwable t) {
notifyAllChannelsOfErrorAndClose(t);
}
}
 
+   @Override
+   public void userEventTriggered(ChannelHandlerContext ctx, Object msg) 
throws Exception {
+   if (msg instanceof RemoteInputChannel) {
+   boolean triggerWrite = 
inputChannelsWithCredit.isEmpty();
--- End diff --

please re-add the comment here, too




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315837#comment-16315837
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r159400956
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

`@VisibleForTesting`?




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315832#comment-16315832
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r157760482
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -94,6 +98,15 @@ void cancelRequestFor(InputChannelID inputChannelId) {
}
}
 
+   void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
--- End diff --

please re-add the comment here, too




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16315835#comment-16315835
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r157760652
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -318,192 +307,56 @@ else if (bufferProvider.isDestroyed()) {
MemorySegment memSeg = 
MemorySegmentFactory.wrap(byteArray);
Buffer buffer = new Buffer(memSeg, 
FreeingBufferRecycler.INSTANCE, false);
 
-   inputChannel.onBuffer(buffer, 
bufferOrEvent.sequenceNumber, -1);
-
-   return true;
-   }
-   }
-   finally {
-   if (releaseNettyBuffer) {
-   bufferOrEvent.releaseBuffer();
+   inputChannel.onBuffer(buffer, 
bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
}
+   } finally {
+   bufferOrEvent.releaseBuffer();
}
}
 
-   /**
-* This class would be replaced by CreditBasedClientHandler in the 
final,
-* so we only implement this method in CreditBasedClientHandler.
-*/
-   void notifyCreditAvailable(RemoteInputChannel inputChannel) {
-   }
-
-   private class AsyncErrorNotificationTask implements Runnable {
-
-   private final Throwable error;
-
-   public AsyncErrorNotificationTask(Throwable error) {
-   this.error = error;
-   }
-
-   @Override
-   public void run() {
-   notifyAllChannelsOfErrorAndClose(error);
-   }
-   }
-
-   /**
-* A buffer availability listener, which subscribes/unsubscribes the NIO
-* read event.
-*
-* If no buffer is available, the channel read event will be 
unsubscribed
-* until one becomes available again.
-*
-* After a buffer becomes available again, the buffer is handed over 
by
-* the thread calling {@link #notifyBufferAvailable(Buffer)} to the 
network I/O
-* thread, which then continues the processing of the staged buffer.
-*/
-   private class BufferListenerTask implements BufferListener, Runnable {
-
-   private final AtomicReference availableBuffer = new 
AtomicReference();
-
-   private NettyMessage.BufferResponse stagedBufferResponse;
-
-   private boolean waitForBuffer(BufferProvider bufferProvider, 
NettyMessage.BufferResponse bufferResponse) {
-
-   stagedBufferResponse = bufferResponse;
-
-   if (bufferProvider.addBufferListener(this)) {
-   if (ctx.channel().config().isAutoRead()) {
-   
ctx.channel().config().setAutoRead(false);
-   }
-
-   return true;
-   }
-   else {
-   stagedBufferResponse = null;
-
-   return false;
-   }
-   }
-
-   private boolean hasStagedBufferOrEvent() {
-   return stagedBufferResponse != null;
-   }
-
-   public void notifyBufferDestroyed() {
-   // The buffer pool has been destroyed
-   stagedBufferResponse = null;
-
-   if (stagedMessages.isEmpty()) {
-   ctx.channel().config().setAutoRead(true);
-   ctx.channel().read();
-   }
-   else {
-   
ctx.channel().eventLoop().execute(stagedMessagesHandler);
-   }
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
--- End diff --

please re-add the comment here, too



[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2017-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16274235#comment-16274235
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK, I have rebased onto the latest code. Looking forward to your review!




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2017-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270509#comment-16270509
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
Yes, I am planning to rebase this onto the latest changes from the previous 
commits.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2017-11-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16270469#comment-16270469
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4552
  
OK, I think I'll manage the review without the split.

Since this is the last of the credit-based PRs though, can you rebase on 
top of the latest changes (preferably after addressing the comments in the 
other PRs)? This way, I'll have the full picture of this crucial change.




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268222#comment-16268222
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r153405989
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException 
{
BufferPool bufferPool = null;
 
try {
-   if 
(gate.getConsumedPartitionType().isCreditBased()) {
-   // Create a fixed-size buffer 
pool for floating buffers and assign exclusive buffers to input channels 
directly
-   bufferPool = 
networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, 
extraNetworkBuffersPerGate);
-   
gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
-   } else {
-   int maxNumberOfMemorySegments = 
gate.getConsumedPartitionType().isBounded() ?
-   
gate.getNumberOfInputChannels() * networkBuffersPerChannel +
-   
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
-   bufferPool = 
networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
-   
maxNumberOfMemorySegments);
-   }
+   // Create a fixed-size buffer pool for 
floating buffers and assign exclusive buffers to input channels directly
+   bufferPool = 
networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, 
extraNetworkBuffersPerGate);
+   
gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
--- End diff --

That makes sense, I will keep the previous behaviour for batch processing.
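
For reference, the two sizing rules visible in the diff above can be written out as plain arithmetic. This is a sketch only: `BufferPoolSizingSketch` and its method names are hypothetical, and the parameters simply mirror the values that appear in the diff (`networkBuffersPerChannel`, `extraNetworkBuffersPerGate`, the number of input channels).

```java
/** Hypothetical helper that restates the pool bounds from the diff; not Flink code. */
class BufferPoolSizingSketch {

    /**
     * Previous behaviour: the pool needs at least one segment per channel and is
     * capped only for bounded partition types.
     * Returns {min, max}.
     */
    static int[] previousPoolBounds(
            boolean bounded, int numChannels, int buffersPerChannel, int extraBuffersPerGate) {
        int max = bounded
                ? numChannels * buffersPerChannel + extraBuffersPerGate
                : Integer.MAX_VALUE;
        return new int[] {numChannels, max};
    }

    /**
     * Credit-based behaviour: a fixed-size pool of floating buffers; the exclusive
     * buffers are assigned to the input channels separately.
     * Returns {min, max}.
     */
    static int[] creditBasedPoolBounds(int extraBuffersPerGate) {
        return new int[] {extraBuffersPerGate, extraBuffersPerGate};
    }

    /** Exclusive segments handed directly to the channels in the credit-based case. */
    static int exclusiveSegments(int numChannels, int buffersPerChannel) {
        return numChannels * buffersPerChannel;
    }
}
```

With, say, 4 channels, 2 buffers per channel and 8 extra buffers per gate, the previous bounded pool ranges from 4 to 16 segments, while the credit-based variant uses a fixed pool of 8 floating buffers plus 8 exclusive segments.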




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268209#comment-16268209
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK, thanks for your attention to the last PR.

I was supposed to split this into two separate PRs, as you suggested. However, 
I am afraid some of the current tests would fail if I only modified and enabled 
the credit-based process on the sender side; the alternative would be to create 
a temporary handler in parallel with `PartitionRequestQueue`.

As you said, the second part mainly replaces the `PartitionRequestClientHandler` 
with the contents of `CreditBasedClientHandler`. It also replaces the 
`CreditBasedClientHandler` class name in the previously added tests and removes 
the earlier temporary code in `ResultPartitionType` so that the fixed buffer 
pool works.

If this still makes your review difficult, I will try to split it. : )




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266984#comment-16266984
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r152985049
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
@@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException {
                 BufferPool bufferPool = null;
 
                 try {
-                    if (gate.getConsumedPartitionType().isCreditBased()) {
-                        // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
-                        bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
-                        gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
-                    } else {
-                        int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
-                            gate.getNumberOfInputChannels() * networkBuffersPerChannel +
-                                extraNetworkBuffersPerGate : Integer.MAX_VALUE;
-                        bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
-                            maxNumberOfMemorySegments);
-                    }
+                    // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
+                    bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
+                    gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
--- End diff --

What about the non-bounded partition type that we use for batch processing? 
Shouldn't we use an unbounded number of floating buffers there, as previously?
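
One way to read this suggestion, as a minimal sketch rather than the actual 
Flink code: keep the exclusive buffers per channel as they are, but cap the 
floating pool only for bounded partition types. The class and method names 
below are hypothetical, and the numbers in `main` are illustrative only.

```java
/** Hypothetical helper for illustration; not part of NetworkEnvironment. */
final class FloatingPoolSizing {

    /** Upper limit of the floating buffer pool for one input gate. */
    static int maxFloatingBuffers(boolean bounded, int extraNetworkBuffersPerGate) {
        // Bounded partition types keep the fixed-size pool; non-bounded ones
        // (used for batch processing) stay uncapped, as in the previous code.
        return bounded ? extraNetworkBuffersPerGate : Integer.MAX_VALUE;
    }

    /** Exclusive buffers assigned directly to the channels of one gate. */
    static int exclusiveBuffers(int numberOfInputChannels, int networkBuffersPerChannel) {
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(numberOfInputChannels, networkBuffersPerChannel);
    }

    public static void main(String[] args) {
        System.out.println(maxFloatingBuffers(true, 8));
        System.out.println(maxFloatingBuffers(false, 8));
        System.out.println(exclusiveBuffers(4, 2));
    }
}
```

With this split, the pool limit passed to `createBufferPool` would depend on 
`isBounded()` while `assignExclusiveSegments` stays unconditional.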




[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2017-10-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203810#comment-16203810
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@pnowojski, this PR is ready for review.

It covers almost all of the credit-based logic on the sender side. 
In addition, I replaced the current `PartitionRequestClientHandler` with 
`CreditBasedClientHandler` and removed the earlier temporary code so that this 
feature works on both sides.

One small piece of work remains in this PR, related to 
`SpilledSubpartitionView#nextBufferIsEvent`: the existing process in the 
spilled subpartition cannot get the next buffer directly. The current default 
value for `nextBufferIsEvent` does not affect the core process, though; it only 
wastes one credit unnecessarily. I will try to solve this in a lightweight way 
later.


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.





[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2017-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16129035#comment-16129035
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/4552

[FLINK-7456][network]Implement Netty sender incoming pipeline for 
credit-based

## What is the purpose of the change

This PR is based on #4533, whose commits are also included so that the Travis 
build passes. Only the last commit contains the change for this PR.

On the sender side, the credit announced by the receiver's `PartitionRequest` 
and `AddCredit` messages is tracked, and buffers are then sent based on the 
available credit and network capacity. This PR mainly covers the incoming 
pipeline logic for the credit-based mode.

## Brief change log

  - *Each subpartition view maintains its current credit and a boolean field 
that marks whether it is already registered as available for transfer*
  - *Update the current credit when processing `PartitionRequest` and 
`AddCredit` messages*
  - *Add the mechanism for enqueuing the subpartition view and updating the 
registered-status field*
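
The bookkeeping listed above can be sketched roughly as follows. This is a 
simplified illustration, not the code in this PR: `CreditView`, 
`AvailableViews`, and their methods are hypothetical names, the buffer count is 
a fixed number instead of a real subpartition, and the queue is assumed to be 
touched by a single thread only.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a sender-side subpartition view. */
final class CreditView {
    private final AtomicInteger currentCredit = new AtomicInteger();
    private final AtomicBoolean registeredAvailable = new AtomicBoolean();
    private final int buffersInSubpartition;

    CreditView(int buffersInSubpartition) {
        this.buffersInSubpartition = buffersInSubpartition;
    }

    /** Adds the announced delta and reports whether credit rose from zero. */
    boolean addCredit(int delta) {
        if (delta <= 0) {
            throw new IllegalArgumentException("credit delta must be positive");
        }
        return currentCredit.getAndAdd(delta) == 0;
    }

    boolean hasBuffers() {
        return buffersInSubpartition > 0;
    }

    /** Flips the flag at most once, so the view is enqueued only once. */
    boolean tryRegister() {
        return registeredAvailable.compareAndSet(false, true);
    }

    int getCurrentCredit() {
        return currentCredit.get();
    }
}

/** Hypothetical stand-in for the handler's queue of views ready to send. */
final class AvailableViews {
    // Assumption: accessed from one thread only, so a plain ArrayDeque suffices.
    private final Queue<CreditView> queue = new ArrayDeque<>();

    /** Handles a message that carries a credit delta for the given view. */
    void onCredit(CreditView view, int delta) {
        boolean roseFromZero = view.addCredit(delta);
        if (roseFromZero && view.hasBuffers() && view.tryRegister()) {
            queue.add(view);
        }
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        AvailableViews views = new AvailableViews();
        CreditView view = new CreditView(2);
        views.onCredit(view, 1); // credit rises from zero: view is enqueued
        views.onCredit(view, 2); // credit already positive: no second enqueue
        System.out.println(views.size() + " view(s), credit " + view.getCurrentCredit());
    }
}
```

The compare-and-set on the registered flag is what keeps a view from being 
enqueued twice, even if several credit messages arrive before it is polled.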

## Verifying this change

This change added tests and can be verified as follows:

  - *Added a test to verify that the current credit is updated correctly and 
the subpartition view is enqueued when an `AddCredit` message is received*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-7456

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4552


commit e35c1ff8066bf44344495d132a1092b9db3ef182
Author: Zhijiang 
Date:   2017-08-07T09:31:17Z

[FLINK-7378][core]Create a fix size (non rebalancing) buffer pool type for 
the floating buffers

commit 969c24d3bf80c1ff89ada11e81b9bf4fea14066f
Author: Zhijiang 
Date:   2017-08-14T06:30:47Z

[FLINK-7394][core]Implement basic InputChannel with free buffers,credit and 
backlog

commit 15fa828449d73f53042c57e9c5494d75ddee575f
Author: Zhijiang 
Date:   2017-08-10T05:29:13Z

[FLINK-7406][network]Implement Netty receiver incoming pipeline for 
credit-based

commit d0674244f15701863a5dd3f68b7274b3bd49c64d
Author: Zhijiang 
Date:   2017-08-12T14:13:25Z

[FLINK-7416][network] Implement Netty receiver outgoing pipeline for 
credit-based

commit 6eaff7877ad43eab674e184153365b50ec8e1559
Author: Zhijiang 
Date:   2017-08-16T13:24:53Z

[FLINK-7456][network]Implement Netty sender incoming pipeline for 
credit-based




> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.


