[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-02-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4552


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162323707
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -31,7 +31,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
--- End diff --

no, you don't need to - I can create a separate PR for FLINK-8225


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162266243
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -31,7 +31,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
--- End diff --

Should I include FLINK-8225 in this PR?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162259781
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ---
@@ -164,11 +165,13 @@ private boolean dispose() {
         private void handInChannel(Channel channel) {
             synchronized (connectLock) {
                 try {
-                    PartitionRequestClientHandler requestHandler = channel.pipeline()
-                        .get(PartitionRequestClientHandler.class);
+                    NetworkClientHandler clientHandler = channel.pipeline().get(PartitionRequestClientHandler.class);
+                    if (clientHandler == null) {
+                        clientHandler = channel.pipeline().get(CreditBasedPartitionRequestClientHandler.class);
+                    }
--- End diff --

good idea!


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162259728
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
 ---
@@ -82,10 +83,17 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws
                 LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);
 
                 try {
-                    SequenceNumberingViewReader reader = new SequenceNumberingViewReader(
-                        request.receiverId,
-                        request.credit,
-                        outboundQueue);
+                    NetworkSequenceViewReader reader;
+                    if (request.credit > 0) {
+                        reader = new CreditBasedSequenceNumberingViewReader(
+                            request.receiverId,
+                            request.credit,
+                            outboundQueue);
+                    } else {
+                        reader = new SequenceNumberingViewReader(
+                            request.receiverId,
+                            outboundQueue);
+                    }
--- End diff --

Yes, I took a hacky shortcut here to keep things simple. :)
I will check whether it is feasible to get the config here. If so, I will 
rely on the config; otherwise I will add a comment clarifying this.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162259766
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -125,11 +126,11 @@ private void enqueueAvailableReader(final SequenceNumberingViewReader reader) th
      * @return readers which are enqueued available for transferring data
      */
     @VisibleForTesting
-    ArrayDeque<SequenceNumberingViewReader> getAvailableReaders() {
+    ArrayDeque<NetworkSequenceViewReader> getAvailableReaders() {
         return availableReaders;
     }
 
-    void notifyReaderCreated(final SequenceNumberingViewReader reader) {
+    public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
--- End diff --

Yes, that was careless of me.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162107348
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -31,7 +31,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
--- End diff --

also found that relic of FLINK-8225 (removed in my patch for the previous 
commit, and it should be inside FLINK-7456, not the hotfix which we will revert 
later)


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162103769
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ---
@@ -164,11 +165,13 @@ private boolean dispose() {
         private void handInChannel(Channel channel) {
             synchronized (connectLock) {
                 try {
-                    PartitionRequestClientHandler requestHandler = channel.pipeline()
-                        .get(PartitionRequestClientHandler.class);
+                    NetworkClientHandler clientHandler = channel.pipeline().get(PartitionRequestClientHandler.class);
+                    if (clientHandler == null) {
+                        clientHandler = channel.pipeline().get(CreditBasedPartitionRequestClientHandler.class);
+                    }
--- End diff --

if you let `NetworkClientHandler` extend from `ChannelHandler`, then this 
can be simplified to
```
NetworkClientHandler clientHandler = 
channel.pipeline().get(NetworkClientHandler.class);
```
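
A minimal, self-contained sketch of why this works, using toy stand-ins rather than Netty itself. The assumption is that a pipeline lookup by type is bounded by the handler type and matches by assignability, so a lookup through the shared interface only compiles when that interface extends the handler type. `ToyHandler`, `ToyPipeline`, `ToyNetworkClientHandler` and the two implementations are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Netty's ChannelHandler marker type.
interface ToyHandler {}

// Hypothetical stand-in for NetworkClientHandler; extending ToyHandler satisfies the bound on get().
interface ToyNetworkClientHandler extends ToyHandler {
    String name();
}

class ToyLegacyClientHandler implements ToyNetworkClientHandler {
    public String name() { return "legacy"; }
}

class ToyCreditBasedClientHandler implements ToyNetworkClientHandler {
    public String name() { return "credit-based"; }
}

// Hypothetical stand-in for a pipeline that looks handlers up by type.
class ToyPipeline {
    private final List<ToyHandler> handlers = new ArrayList<>();

    ToyPipeline addLast(ToyHandler handler) {
        handlers.add(handler);
        return this;
    }

    // Returns the first handler assignable to the requested type, or null if none is registered.
    <T extends ToyHandler> T get(Class<T> handlerType) {
        for (ToyHandler handler : handlers) {
            if (handlerType.isInstance(handler)) {
                return handlerType.cast(handler);
            }
        }
        return null;
    }
}

class ToyPipelineDemo {
    public static void main(String[] args) {
        ToyPipeline pipeline = new ToyPipeline().addLast(new ToyCreditBasedClientHandler());
        // One lookup by interface replaces the null-check fallback chain.
        ToyNetworkClientHandler handler = pipeline.get(ToyNetworkClientHandler.class);
        System.out.println(handler.name());
    }
}
```

With this shape the caller no longer needs to know which concrete handler was installed in the pipeline.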


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162104968
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -125,11 +126,11 @@ private void enqueueAvailableReader(final SequenceNumberingViewReader reader) th
      * @return readers which are enqueued available for transferring data
      */
     @VisibleForTesting
-    ArrayDeque<SequenceNumberingViewReader> getAvailableReaders() {
+    ArrayDeque<NetworkSequenceViewReader> getAvailableReaders() {
         return availableReaders;
     }
 
-    void notifyReaderCreated(final SequenceNumberingViewReader reader) {
+    public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
--- End diff --

this could stay `package-private`


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162106371
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
 ---
@@ -82,10 +83,17 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws
                 LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);
 
                 try {
-                    SequenceNumberingViewReader reader = new SequenceNumberingViewReader(
-                        request.receiverId,
-                        request.credit,
-                        outboundQueue);
+                    NetworkSequenceViewReader reader;
+                    if (request.credit > 0) {
+                        reader = new CreditBasedSequenceNumberingViewReader(
+                            request.receiverId,
+                            request.credit,
+                            outboundQueue);
+                    } else {
+                        reader = new SequenceNumberingViewReader(
+                            request.receiverId,
+                            outboundQueue);
+                    }
--- End diff --

This seems a bit hacky since it does not rely on the configuration 
parameter directly but rather on the effect it has, i.e. that a 
`PartitionRequest` in credit-based flow control always has a non-zero credit 
(due to `NetworkBufferPool#requestMemorySegments()`).
Relying on the configuration parameter would be nicer; alternatively, 
you could add a comment clarifying this.
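
A rough sketch of the configuration-driven variant, assuming the flag can be made available where the reader is created. All names here (`ToyViewReader`, `ToyReaderFactory`, `creditBasedEnabled`) are hypothetical; the thread does not say which option or accessor would be used.

```java
// Hypothetical stand-ins for the two reader types in the diff above.
interface ToyViewReader {
    boolean isCreditBased();
}

class ToyPlainReader implements ToyViewReader {
    public boolean isCreditBased() { return false; }
}

class ToyCreditReader implements ToyViewReader {
    final int initialCredit;

    ToyCreditReader(int initialCredit) {
        this.initialCredit = initialCredit;
    }

    public boolean isCreditBased() { return true; }
}

class ToyReaderFactory {
    // Assumed to be read once from the configuration; the option name is not given in the thread.
    private final boolean creditBasedEnabled;

    ToyReaderFactory(boolean creditBasedEnabled) {
        this.creditBasedEnabled = creditBasedEnabled;
    }

    // Decides by the flag itself, not by a side effect such as "credit > 0".
    ToyViewReader create(int requestCredit) {
        return creditBasedEnabled ? new ToyCreditReader(requestCredit) : new ToyPlainReader();
    }
}

class ToyReaderFactoryDemo {
    public static void main(String[] args) {
        // Even a request with zero credit gets a credit-based reader when the flag is on.
        System.out.println(new ToyReaderFactory(true).create(0).isCreditBased());
        System.out.println(new ToyReaderFactory(false).create(2).isCreditBased());
    }
}
```

The point of the design is that the choice stays correct even if a credit-based request ever arrives with zero initial credit.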


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r162047076
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 ---
@@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {
             config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
             config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
             config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
--- End diff --

Do you, by any chance, know why 800 used to work but is no longer enough here? 
I mean, why/how does this test need 800 buffers?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161667346
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 ---
@@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {
             config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
             config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
             config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
--- End diff --

yes, the same reason as above


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161667234
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 ---
@@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception {
             config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
             config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
             config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
-            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
+            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
--- End diff --

Yes, I will set it to 9.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161568012
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ---
@@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() 
throws Exception {
verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
// ...and one available result
+   assertFalse(view.nextBufferIsEvent());
--- End diff --

we should test this everywhere we access `getNextBuffer()` or add buffers 
via `add()` - also if `getNextBuffer()` is `null` or before even requesting 
anything
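
A toy sketch of the states worth covering, written against a hypothetical in-memory queue rather than the real subpartition classes. `ToyBuffer` and `ToySubpartition` are illustrative names, and returning `false` on an empty queue is an assumption about the intended behaviour, not something the diff confirms.

```java
import java.util.ArrayDeque;

// Hypothetical buffer that only records whether it carries an event.
class ToyBuffer {
    final boolean isEvent;

    ToyBuffer(boolean isEvent) {
        this.isEvent = isEvent;
    }
}

// Hypothetical stand-in for a subpartition together with its view.
class ToySubpartition {
    private final ArrayDeque<ToyBuffer> queue = new ArrayDeque<>();

    void add(ToyBuffer buffer) {
        queue.add(buffer);
    }

    // Peeks at the head without consuming it; false when nothing is queued (assumed).
    boolean nextBufferIsEvent() {
        ToyBuffer head = queue.peek();
        return head != null && head.isEvent;
    }

    // Returns and removes the head, or null when the queue is empty.
    ToyBuffer getNextBuffer() {
        return queue.poll();
    }
}

class ToySubpartitionDemo {
    public static void main(String[] args) {
        ToySubpartition sub = new ToySubpartition();
        System.out.println("before any add: " + sub.nextBufferIsEvent());

        sub.add(new ToyBuffer(false)); // data buffer
        sub.add(new ToyBuffer(true));  // event queued behind it
        System.out.println("data buffer is next: " + sub.nextBufferIsEvent());

        sub.getNextBuffer();
        System.out.println("event is next: " + sub.nextBufferIsEvent());

        sub.getNextBuffer();
        System.out.println("drained, next is null: " + (sub.getNextBuffer() == null));
        System.out.println("after drain: " + sub.nextBufferIsEvent());
    }
}
```

Each print corresponds to one of the points mentioned above: before requesting anything, after `add()`, around `getNextBuffer()`, and once `getNextBuffer()` returns `null`.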


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161576253
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 ---
@@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception {
             config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
             config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
             config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
-            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
+            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
--- End diff --

just a note for the curious: this test can cope with a higher number of 
network buffers and waits for all of them to be blocked. Increasing this 
to `9` would have been enough here, though (we now require 2 exclusive buffers 
per incoming channel by default, while 1 was the minimum per incoming channel before)
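
The arithmetic behind `9` can be sketched as follows, assuming the test has a single incoming channel and that nothing else in its buffer budget changed. Both are inferences from the note above, not facts stated in the thread, and `ToyBufferBudget` is a hypothetical helper.

```java
// Hypothetical helper: adjusts a total buffer budget when the per-channel requirement changes.
class ToyBufferBudget {
    static int adjustedTotal(int oldTotal, int incomingChannels, int oldPerChannel, int newPerChannel) {
        // Every incoming channel needs (newPerChannel - oldPerChannel) additional buffers.
        return oldTotal + incomingChannels * (newPerChannel - oldPerChannel);
    }
}

class ToyBufferBudgetDemo {
    public static void main(String[] args) {
        // 8 buffers before, one incoming channel (assumed), requirement raised from 1 to 2.
        System.out.println(ToyBufferBudget.adjustedTotal(8, 1, 1, 2)); // 8 + 1 * (2 - 1) = 9
    }
}
```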


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161565565
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next 
buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
--- End diff --

actually, let's use `assertThat(queue.getAvailableReaders(), 
contains(reader));` here which gives much nicer output in case something is 
wrong
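
To illustrate the difference without pulling in Hamcrest, here is a plain-Java sketch. `ToyAssertions.assertContainsExactly` is a hypothetical stand-in for `assertThat(..., contains(...))`, not the matcher itself, and the reader names are placeholders.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

class ToyAssertions {
    // Hypothetical stand-in for a content matcher: checks order and elements,
    // and reports both sides when they differ.
    @SafeVarargs
    static <T> void assertContainsExactly(Collection<T> actual, T... expected) {
        List<T> actualList = new ArrayList<>(actual);
        List<T> expectedList = Arrays.asList(expected);
        if (!actualList.equals(expectedList)) {
            throw new AssertionError("Expected: " + expectedList + " but was: " + actualList);
        }
    }
}

class ToyAssertionsDemo {
    public static void main(String[] args) {
        ArrayDeque<String> available = new ArrayDeque<>();
        available.add("reader-B");

        // A size-only check passes even though the wrong reader was enqueued.
        System.out.println("size check passes: " + (available.size() == 1));

        try {
            ToyAssertions.assertContainsExactly(available, "reader-A");
        } catch (AssertionError e) {
            // The content check fails and names the offending element.
            System.out.println(e.getMessage());
        }
    }
}
```

A size comparison can only report two numbers on failure, whereas a content comparison shows which reader actually ended up in the queue.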


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161570121
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ---
@@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() 
throws Exception {
verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
// ...and one available result
+   assertFalse(view.nextBufferIsEvent());
--- End diff --

also test `read.buffer().isBuffer()` then?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161567305
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next 
buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+
+   // Flush the buffer to make the channel writable again and see 
the final results
+   channel.flush();
+   assertSame(channelBlockingBuffer, channel.readOutbound());
+
+   assertEquals(0, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
--- End diff --

let's end with `assertNull(channel.readOutbound());`


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161546145
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() 
throws Exception {
 
assertEquals(2, inputChannel.getUnannouncedCredit());
 
-   // The PartitionRequestClient is tied to 
PartitionRequestClientHandler currently, so we
-   // have to notify credit available in 
CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel);
-
// Release the input channel
inputGate.releaseAllResources();
 
channel.runPendingTasks();
 
-   // It will not notify credits for released input channel
+   // It should send partition request first, and send 
close request after releasing input channel,
+   // but will not notify credits for released input 
channel.
+   Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
--- End diff --

similar here: verify `PartitionRequest` after 
`inputChannel.requestSubpartition`


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559926
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = spy(new 
PartitionRequestQueue());
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   verify(queue, times(1)).triggerEnqueueAvailableReader(reader);
+   // The reader is enqueued in the pipeline because the next 
buffer is event, even though no available credits
+   verify(queue, times(1)).enqueueAvailableReader(reader);
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline iff it has 
both available credits and buffers.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws 
Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new 
BufferAndBacklog(TestBufferFactory.createBuffer(), 2));
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
--- End diff --

let's remove that mock ...
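
For reference, the enqueue rule that the two Javadoc comments in the diff above describe can be sketched as a single predicate. This is a paraphrase under assumptions, not the actual logic of `enqueueAvailableReader`; `ToyReaderState` and its fields are hypothetical.

```java
// Hypothetical snapshot of a reader's state at the moment the queue considers enqueueing it.
class ToyReaderState {
    private final int buffersAvailable;
    private final int creditsAvailable;
    private final boolean nextBufferIsEvent;

    ToyReaderState(int buffersAvailable, int creditsAvailable, boolean nextBufferIsEvent) {
        this.buffersAvailable = buffersAvailable;
        this.creditsAvailable = creditsAvailable;
        this.nextBufferIsEvent = nextBufferIsEvent;
    }

    // Data buffers need both a buffer and a credit; an event may go out without any credit.
    boolean isAvailable() {
        return buffersAvailable > 0 && (creditsAvailable > 0 || nextBufferIsEvent);
    }
}

class ToyReaderStateDemo {
    public static void main(String[] args) {
        System.out.println("event, no credit: " + new ToyReaderState(1, 0, true).isAvailable());
        System.out.println("data, no credit: " + new ToyReaderState(1, 0, false).isAvailable());
        System.out.println("data, with credit: " + new ToyReaderState(1, 1, false).isAvailable());
        System.out.println("credit, no buffer: " + new ToyReaderState(0, 5, false).isAvailable());
    }
}
```

The four cases map onto the two tests: the first is `testEnqueueReaderByNotifyingEventBuffer`, the remaining three are the combinations exercised by `testEnqueueReaderByNotifyingBufferAndCredit`.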


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161578518
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 ---
@@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {
             config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
             config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
             config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+            config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
--- End diff --

Is that also the reason here? I see that otherwise we get into 
`Insufficient number of network buffers` but it does not look as if it was 
configured as tightly...
(just want to rule out some memory leak with the new code)


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161573976
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -238,6 +238,7 @@ public void testConsumeSpilledPartition() throws 
Exception {
 
verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
 
+   assertFalse(reader.nextBufferIsEvent());
--- End diff --

also test `read.buffer().isBuffer()`?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161569135
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ---
@@ -134,13 +135,22 @@ public void testBasicPipelinedProduceConsumeLogic() 
throws Exception {
assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
 
+   assertFalse(view.nextBufferIsEvent());
+   read = view.getNextBuffer();
+   assertNotNull(read);
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(subpartition.getBuffersInBacklog(), 
read.buffersInBacklog());
+   assertNull(view.getNextBuffer());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+
// Add event to the queue...
Buffer event = createBuffer();
event.tagAsEvent();
subpartition.add(event);
 
+   assertTrue(view.nextBufferIsEvent());
assertEquals(3, subpartition.getTotalNumberOfBuffers());
-   assertEquals(1, subpartition.getBuffersInBacklog());
+   assertEquals(0, subpartition.getBuffersInBacklog());
assertEquals(3 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
verify(listener, times(3)).notifyBuffersAvailable(eq(1L));
}
--- End diff --

maybe verify that `nextBufferIsEvent()` returns the right thing after 
adding a real buffer now (with the event being next)


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161567331
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next 
buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+
+   // Flush the buffer to make the channel writable again and see 
the final results
+   channel.flush();
+   assertSame(channelBlockingBuffer, channel.readOutbound());
+
+   assertEquals(0, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline iff it has 
both available credits and buffers.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws 
Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new 
BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+   queue.notifyReaderCreated(reader);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify available buffers to trigger enqueue the reader
+   final int notifyNumBuffers = 5;
+   for (int i = 0; i < notifyNumBuffers; i++) {
+   reader.notifyBuffersAvailable(1);
+   }
+
+   

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559642
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
--- End diff --

let's remove that mock ...


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161547041
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
--- End diff --

nit: `is an event`


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559690
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
--- End diff --

let's remove that mock ...


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559913
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = spy(new 
PartitionRequestQueue());
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   verify(queue, times(1)).triggerEnqueueAvailableReader(reader);
+   // The reader is enqueued in the pipeline because the next 
buffer is event, even though no available credits
+   verify(queue, times(1)).enqueueAvailableReader(reader);
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline iff it has 
both available credits and buffers.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws 
Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new 
BufferAndBacklog(TestBufferFactory.createBuffer(), 2));
--- End diff --

let's remove that mock ...


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161565896
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next 
buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+
+   // Flush the buffer to make the channel writable again and see 
the final results
+   channel.flush();
+   assertSame(channelBlockingBuffer, channel.readOutbound());
+
+   assertEquals(0, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline iff it has 
both available credits and buffers.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws 
Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new 
BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+   queue.notifyReaderCreated(reader);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify available buffers to trigger enqueue the reader
+   final int notifyNumBuffers = 5;
+   for (int i = 0; i < notifyNumBuffers; i++) {
+   reader.notifyBuffersAvailable(1);
+   }
+
+   

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161546199
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() 
throws Exception {
 
assertEquals(2, inputChannel.getUnannouncedCredit());
 
-   // The PartitionRequestClient is tied to 
PartitionRequestClientHandler currently, so we
-   // have to notify credit available in 
CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel);
-
// Release the input channel
inputGate.releaseAllResources();
 
channel.runPendingTasks();
 
-   // It will not notify credits for released input channel
+   // It should send partition request first, and send 
close request after releasing input channel,
+   // but will not notify credits for released input 
channel.
+   Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
+   readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(CloseRequest.class));
--- End diff --

put these after `inputGate.releaseAllResources()`?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161485403
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -434,6 +443,29 @@ private RemoteInputChannel 
createRemoteInputChannel(SingleInputGate inputGate) t

UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
}
 
+   /**
+* Creates and returns a remote input channel for the specific input 
gate with specific partition request client.
+*
+* @param inputGate The input gate owns the created input channel.
+* @param client The client is used to send partition request.
+* @return The new created remote input channel.
+*/
+   private RemoteInputChannel createRemoteInputChannel(SingleInputGate 
inputGate, PartitionRequestClient client) throws Exception {
--- End diff --

could you modify 
`PartitionRequestClientHandlerTest#createRemoteInputChannel(SingleInputGate)` 
to rely on this method, i.e. `return createRemoteInputChannel(inputGate, 
mock(PartitionRequestClient.class));`?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161545040
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -277,23 +277,31 @@ public void testNotifyCreditAvailable() throws 
Exception {
handler.channelRead(mock(ChannelHandlerContext.class), 
bufferResponse1);
handler.channelRead(mock(ChannelHandlerContext.class), 
bufferResponse2);
 
-   // The PartitionRequestClient is tied to 
PartitionRequestClientHandler currently, so we
-   // have to notify credit available in 
CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel1);
-   handler.notifyCreditAvailable(inputChannel2);
-
assertEquals(2, inputChannel1.getUnannouncedCredit());
assertEquals(2, inputChannel2.getUnannouncedCredit());
 
channel.runPendingTasks();
 
-   // The two input channels should notify credits via 
writable channel
+   // The two input channels should send partition 
requests and then notify credits via writable channel
assertTrue(channel.isWritable());
Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(inputChannel1.getInputChannelId(), 
((PartitionRequest) readFromOutbound).receiverId);
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
+
+   readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(inputChannel2.getInputChannelId(), 
((PartitionRequest) readFromOutbound).receiverId);
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
--- End diff --

Let's verify those two `PartitionRequest` messages above, since 
`inputChannel1.getUnannouncedCredit()` kind of relies on those being sent (if 
we change the `initialCredit` to be included in the `unannouncedCredit`).


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161450876
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -199,6 +199,19 @@ public boolean isReleased() {
}
}
 
+   @Override
+   public boolean nextBufferIsEvent() {
+   if (nextBuffer != null) {
+   return !nextBuffer.isBuffer();
+   }
--- End diff --

agree with integration


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-12 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161248402
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -52,4 +52,9 @@
boolean isReleased();
 
Throwable getFailureCause();
+
+   /**
+* Returns whether the next buffer is event or not.
--- End diff --

`is an event`


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-12 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161247589
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -77,6 +92,46 @@ void requestSubpartitionView(
}
}
 
+   /**
+* The credits from consumer are added in incremental way.
+*
+* @param creditDeltas The credit deltas
+*/
+   public void addCredit(int creditDeltas) {
+   numCreditsAvailable += creditDeltas;
+   }
+
+   /**
+* Updates the value to indicate whether the reader is enqueued in the 
pipeline or not.
+*
+* @param isRegisteredAvailable True if this reader is already enqueued 
in the pipeline.
+*/
+   public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
+   this.isRegisteredAvailable = isRegisteredAvailable;
+   }
+
+   public boolean isRegisteredAvailable() {
+   return isRegisteredAvailable;
+   }
+
+   /**
+* Check whether this reader is available or not.
+*
+* Return true only if the next buffer is event or the reader has 
both available
+* credits and buffers.
+*/
+   public boolean isAvailable() {
+   if (numBuffersAvailable.get() <= 0) {
+   return false;
+   }
+
+   if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable 
> 0) {
--- End diff --

how about checking `numCreditsAvailable` first since that's the cheaper 
check?
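
To illustrate the ordering: Java's `||` evaluates its right operand only when the left one is false, so putting the plain field comparison first skips the view lookup whenever credits are available. A minimal sketch, assuming hypothetical stand-in classes (`CountingView` and `AvailabilitySketch` are not Flink classes):

```java
/** Hypothetical stand-in for a subpartition view that counts how often it is queried. */
class CountingView {
    int lookups = 0;
    private final boolean nextIsEvent;

    CountingView(boolean nextIsEvent) {
        this.nextIsEvent = nextIsEvent;
    }

    boolean nextBufferIsEvent() {
        lookups++; // stands in for the more expensive check
        return nextIsEvent;
    }
}

class AvailabilitySketch {
    /** Cheap integer comparison first; the view is only asked when there are no credits. */
    static boolean creditOrEvent(int numCreditsAvailable, CountingView view) {
        return numCreditsAvailable > 0 || view.nextBufferIsEvent();
    }
}
```

With credits available, `view.lookups` stays at 0; only the zero-credit case reaches the view.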


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-12 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161241148
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -49,10 +50,24 @@
 
private volatile ResultSubpartitionView subpartitionView;
 
+   /**
+* The status indicating whether this reader is already enqueued in the 
pipeline for transferring
+* data or not. It is mainly used for avoid registering this reader to 
the pipeline repeatedly.
+*/
+   private boolean isRegisteredAvailable;
+
+   /** The number of available buffers for holding data on the consumer 
side. */
+   private int numCreditsAvailable;
--- End diff --

Just a note since I was wondering whether we need synchronization here (not 
needed after verifying the things below):

1) `numCreditsAvailable` is increased via 
`PartitionRequestServerHandler#channelRead0`, which is a different channel 
handler from `PartitionRequestQueue` (see 
`NettyProtocol#getServerChannelHandlers`). According to [Netty's thread 
model](https://netty.io/wiki/new-and-noteworthy-in-4.0.html#wiki-h2-34), we 
should be safe though:

> A user can specify an EventExecutor when he or she adds a handler to a 
ChannelPipeline.
> - If specified, the handler methods of the ChannelHandler are always 
invoked by the specified EventExecutor.
> - If unspecified, the handler methods are always invoked by the EventLoop 
that its associated Channel is registered to.

2) `numCreditsAvailable` is read from 
`PartitionRequestQueue#enqueueAvailableReader()` and 
`SequenceNumberingViewReader#getNextBuffer()` which are both accessed by the 
channel's IO thread only.
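
The reasoning above amounts to thread confinement: if every read and write of a plain field happens on the same single thread, neither a lock nor `volatile` is needed. A minimal JDK-only sketch, assuming a single-threaded `ExecutorService` as a stand-in for the channel's event loop (the class `ConfinedCredits` is hypothetical and Netty itself is not used):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ConfinedCredits {
    // Plain field: only ever touched from tasks running on the single "event loop" thread.
    private int numCreditsAvailable;

    // Daemon thread so that a forgotten shutdown() does not keep the JVM alive.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "event-loop-sketch");
        t.setDaemon(true);
        return t;
    });

    /** May be called from any thread; the mutation itself runs on the event loop. */
    void addCredit(int delta) {
        eventLoop.execute(() -> numCreditsAvailable += delta);
    }

    /** Reads the value on the event loop thread too, after all previously submitted tasks. */
    int readCredits() {
        try {
            return eventLoop.submit(() -> numCreditsAvailable).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("event loop task failed", e);
        }
    }

    void shutdown() {
        eventLoop.shutdown();
    }
}
```

The guarantee only holds as long as nobody touches the field from outside the event loop, which is what points 1) and 2) check for the real handlers.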


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-12 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161296711
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -73,6 +73,9 @@
/** Flag indicating whether all resources have been released. */
private AtomicBoolean isReleased = new AtomicBoolean();
 
+   /** The next buffer to hand out. */
+   private Buffer nextBuffer;
--- End diff --

We need to protect this against race conditions with respect to 
`releaseAllResources()` as well.

Actually, I'm surprised that nothing in this class is protected against 
concurrently releasing it. Although I have created a separate issue for this 
([FLINK-8425](https://issues.apache.org/jira/browse/FLINK-8425)), I think we 
may need to solve this here for the new `nextBuffer` anyway.
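
One possible shape of such protection, as a rough sketch only: guard both the prefetched buffer and the released flag with the same lock, so that a concurrent release can never interleave with handing out `nextBuffer`. The class below is hypothetical, uses a `String` in place of a `Buffer`, and does not mirror the actual `SpilledSubpartitionView` fields:

```java
class GuardedViewSketch {
    private final Object lock = new Object();
    private String nextBuffer; // stand-in for a Buffer; guarded by lock
    private boolean released;  // guarded by lock

    GuardedViewSketch(String first) {
        this.nextBuffer = first;
    }

    /** Hands out the prefetched buffer, or null if none is left or the view was released. */
    String getNextBuffer() {
        synchronized (lock) {
            if (released) {
                return null;
            }
            String current = nextBuffer;
            nextBuffer = null;
            return current;
        }
    }

    /** Releases once; later calls are no-ops. Returns whether this call did the release. */
    boolean releaseAllResources() {
        synchronized (lock) {
            if (released) {
                return false;
            }
            released = true;
            nextBuffer = null; // a real view would recycle the buffer here
            return true;
        }
    }
}
```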


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-12 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161282684
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -77,6 +92,46 @@ void requestSubpartitionView(
}
}
 
+   /**
+* The credits from consumer are added in incremental way.
+*
+* @param creditDeltas The credit deltas
+*/
+   public void addCredit(int creditDeltas) {
+   numCreditsAvailable += creditDeltas;
+   }
+
+   /**
+* Updates the value to indicate whether the reader is enqueued in the 
pipeline or not.
+*
+* @param isRegisteredAvailable True if this reader is already enqueued 
in the pipeline.
+*/
+   public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
+   this.isRegisteredAvailable = isRegisteredAvailable;
+   }
+
+   public boolean isRegisteredAvailable() {
+   return isRegisteredAvailable;
+   }
+
+   /**
+* Check whether this reader is available or not.
+*
+* Return true only if the next buffer is event or the reader has 
both available
+* credits and buffers.
+*/
+   public boolean isAvailable() {
+   if (numBuffersAvailable.get() <= 0) {
+   return false;
+   }
+
+   if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable 
> 0) {
--- End diff --

also, why not simply do the following for this whole method?
```
return numBuffersAvailable.get() > 0 &&
    (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent());
```


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-12 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161249082
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -199,6 +199,19 @@ public boolean isReleased() {
}
}
 
+   @Override
+   public boolean nextBufferIsEvent() {
+   if (nextBuffer != null) {
+   return !nextBuffer.isBuffer();
+   }
--- End diff --

We probably need synchronization here to access `nextBuffer` and also check 
for `isReleased()` similar to `getNextBuffer`, since we are basically doing the 
same but without taking the buffer.

Why not integrate this into `getNextBuffer` and the `BufferAndBacklog` 
returned there? Inside that method, gathering this additional info is basically 
for free and we may thus speed up some code paths.
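
A rough sketch of that integration, assuming a simplified in-memory view with `String` entries instead of buffers (all names below are hypothetical, and the backlog here simply counts the remaining queued entries): the flag is computed while the lock is already held for taking the buffer, so no second synchronized call is needed.

```java
import java.util.ArrayDeque;

/** Hypothetical result type: the buffer taken, the remaining backlog, and a peek at what follows. */
class BufferAndBacklogSketch {
    final String buffer;
    final int backlog;
    final boolean nextBufferIsEvent;

    BufferAndBacklogSketch(String buffer, int backlog, boolean nextBufferIsEvent) {
        this.buffer = buffer;
        this.backlog = backlog;
        this.nextBufferIsEvent = nextBufferIsEvent;
    }
}

class ViewSketch {
    private final ArrayDeque<String> buffers = new ArrayDeque<>(); // guarded by itself

    void add(String entry) {
        synchronized (buffers) {
            buffers.add(entry);
        }
    }

    /** Takes the head entry and, under the same lock, records whether the next one is an event. */
    BufferAndBacklogSketch getNextBuffer() {
        synchronized (buffers) {
            String current = buffers.poll();
            if (current == null) {
                return null;
            }
            String next = buffers.peek();
            // Entries starting with "event" play the role of event buffers in this sketch.
            boolean nextIsEvent = next != null && next.startsWith("event");
            return new BufferAndBacklogSketch(current, buffers.size(), nextIsEvent);
        }
    }
}
```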


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-12 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161295428
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -163,6 +190,15 @@ public boolean isReleased() {
return parent.isReleased() || isReleased.get();
}
 
+   @Override
+   public boolean nextBufferIsEvent() {
+   if (nextBuffer != null) {
+   return !nextBuffer.isBuffer();
+   }
+
+   return false;
--- End diff --

I'm afraid relying on `nextBuffer` won't be enough because it might not 
have been there during `getNextBuffer()` but may be there now.

Please add a test for this case.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-12 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161242036
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -49,10 +50,24 @@
 
private volatile ResultSubpartitionView subpartitionView;
 
+   /**
+* The status indicating whether this reader is already enqueued in the 
pipeline for transferring
+* data or not. It is mainly used for avoid registering this reader to 
the pipeline repeatedly.
+*/
+   private boolean isRegisteredAvailable;
--- End diff --



I know it's default-initialized to false but let's make this explicit



---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-12 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161295951
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -199,6 +199,19 @@ public boolean isReleased() {
}
}
 
+   @Override
+   public boolean nextBufferIsEvent() {
+   if (nextBuffer != null) {
+   return !nextBuffer.isBuffer();
+   }
+
+   if (spilledView != null) {
--- End diff --

`checkState(spilledView != null, "No in-memory buffers available, but also 
nothing spilled.");` just like in `getNextBuffer()`?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-12 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161287607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -77,6 +92,46 @@ void requestSubpartitionView(
}
}
 
+   /**
+* The credits from consumer are added in incremental way.
+*
+* @param creditDeltas The credit deltas
+*/
+   public void addCredit(int creditDeltas) {
+   numCreditsAvailable += creditDeltas;
+   }
+
+   /**
+* Updates the value to indicate whether the reader is enqueued in the 
pipeline or not.
+*
+* @param isRegisteredAvailable True if this reader is already enqueued 
in the pipeline.
+*/
+   public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
+   this.isRegisteredAvailable = isRegisteredAvailable;
+   }
+
+   public boolean isRegisteredAvailable() {
+   return isRegisteredAvailable;
+   }
+
+   /**
+* Check whether this reader is available or not.
+*
+* Return true only if the next buffer is event or the reader has 
both available
+* credits and buffers.
+*/
+   public boolean isAvailable() {
+   if (numBuffersAvailable.get() <= 0) {
+   return false;
+   }
+
+   if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable 
> 0) {
--- End diff --

one more thing: in the special case of using it inside `getNextBuffer()`, 
we already retrieved the `remaining` number of buffers so we can spare one 
lookup into an atomic integer here by passing this one in
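
A minimal sketch of what passing the count in could look like, assuming a simplified reader without the event check (`ReaderSketch` and `consumeOne()` are hypothetical names, not the actual `SequenceNumberingViewReader` API):

```java
import java.util.concurrent.atomic.AtomicInteger;

class ReaderSketch {
    private final AtomicInteger numBuffersAvailable = new AtomicInteger();
    private int numCreditsAvailable;

    void notifyBuffersAvailable(int numBuffers) {
        numBuffersAvailable.addAndGet(numBuffers);
    }

    void addCredit(int creditDeltas) {
        numCreditsAvailable += creditDeltas;
    }

    /** General case: has to read the atomic counter itself. */
    boolean isAvailable() {
        return isAvailable(numBuffersAvailable.get());
    }

    /** Variant for callers that already know the remaining number of buffers. */
    private boolean isAvailable(int remaining) {
        return remaining > 0 && numCreditsAvailable > 0;
    }

    /** Consumes one buffer and one credit, then reuses the value returned by the decrement. */
    boolean consumeOne() {
        int remaining = numBuffersAvailable.decrementAndGet();
        numCreditsAvailable--;
        return isAvailable(remaining); // no second read of the atomic integer
    }
}
```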


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-10 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160849525
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -250,10 +304,12 @@ private void handleException(Channel channel, 
Throwable cause) throws IOExceptio
 
private void releaseAllResources() throws IOException {
SequenceNumberingViewReader reader;
-   while ((reader = nonEmptyReader.poll()) != null) {
+   while ((reader = availableReaders.poll()) != null) {
--- End diff --

yes, it should release all readers here.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-10 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160849478
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +94,37 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification from 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   @VisibleForTesting
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   @VisibleForTesting
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

agree


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-10 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160695373
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -98,20 +135,35 @@ public void close() {
}
}
 
+   /**
+* Adds to maintain the unannounced credits from the consumer and it 
may trigger
+* enqueue the corresponding reader for this consumer transferring data.
+*
+* @param receiverId The input channel id to identify consumer.
+* @param credit The unannounced credits of the consumer.
+*/
+   public void addCredit(InputChannelID receiverId, int credit) throws 
Exception {
--- End diff --

could be `package-private`


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-10 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160694722
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +94,37 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification from 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   @VisibleForTesting
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   @VisibleForTesting
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

actually, we can inline this into the first one, make this `private`, and 
reduce `Mockito` use in the tests if we just made the `availableReaders` field 
accessible to the tests, e.g.
```
/**
 * Accesses internal state to verify reader registration in the unit tests.
 *
 * Do not use anywhere else!
 *
 * @return readers which are enqueued available for transferring data
 */
@VisibleForTesting
ArrayDeque<SequenceNumberingViewReader> getAvailableReaders() {
    return availableReaders;
}
```
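
To show how such an accessor replaces `verify(...)` calls on a spy, here is a self-contained sketch with hypothetical stand-ins (`ReaderStub` and `QueueSketch` are not the Flink classes): the availability check and the enqueue live in one method, and a test simply inspects the queue's contents.

```java
import java.util.ArrayDeque;

/** Hypothetical reader with just the two flags the enqueue decision depends on. */
class ReaderStub {
    boolean registeredAvailable = false;
    final boolean available;

    ReaderStub(boolean available) {
        this.available = available;
    }
}

class QueueSketch {
    private final ArrayDeque<ReaderStub> availableReaders = new ArrayDeque<>();

    /** Check and enqueue in one place; readers already enqueued are not added again. */
    void enqueueAvailableReader(ReaderStub reader) {
        if (reader.registeredAvailable || !reader.available) {
            return;
        }
        availableReaders.add(reader);
        reader.registeredAvailable = true;
    }

    /** Test-only view on the internal state, in the spirit of the accessor above. */
    ArrayDeque<ReaderStub> getAvailableReaders() {
        return availableReaders;
    }
}
```

A test can then assert on `getAvailableReaders().size()` directly instead of counting method invocations.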


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-10 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160714424
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -250,10 +304,12 @@ private void handleException(Channel channel, 
Throwable cause) throws IOExceptio
 
private void releaseAllResources() throws IOException {
SequenceNumberingViewReader reader;
-   while ((reader = nonEmptyReader.poll()) != null) {
+   while ((reader = availableReaders.poll()) != null) {
--- End diff --

Previously, we only had access to the views with data, not to all of them, 
but shouldn't we release all views instead? That matters even more now that 
there can be views with data but without enough credit.
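
A sketch of the difference, assuming the queue keeps a registry of every created reader next to the queue of available ones (the class and field names below are hypothetical): releasing by iterating over the registry also covers readers that hold data but were never enqueued for lack of credit.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical reader that only remembers whether it was released. */
class ReleasableReader {
    boolean released = false;

    void releaseAllResources() {
        released = true;
    }
}

class ReleaseSketch {
    // Every reader ever created, keyed by a receiver id (a String in this sketch).
    private final Map<String, ReleasableReader> allReaders = new LinkedHashMap<>();
    // Only the readers currently enqueued for sending.
    private final ArrayDeque<ReleasableReader> availableReaders = new ArrayDeque<>();

    void notifyReaderCreated(String receiverId, ReleasableReader reader) {
        allReaders.put(receiverId, reader);
    }

    void enqueue(ReleasableReader reader) {
        availableReaders.add(reader);
    }

    /** Releases every known reader, not just the ones that happen to be enqueued. */
    void releaseAllResources() {
        for (ReleasableReader reader : allReaders.values()) {
            reader.releaseAllResources();
        }
        availableReaders.clear();
        allReaders.clear();
    }
}
```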


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-09 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160371761
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
--- End diff --

Thanks for the clarification. I misunderstood this tag from the comments; I 
will fix it together with the other review comments.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-09 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160361957
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
--- End diff --

I actually interpret this tag as applying not only to methods that are *only* 
used in tests, but also to those whose visibility we had to increase, e.g. from 
`private` to `package-private` as here, to make them usable in tests.
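
To illustrate that reading of the tag, here is a small sketch. The annotation is declared locally only so that the snippet compiles on its own, and `EnqueueSketch` with its methods is a hypothetical example, not code from this PR:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

// Local stand-in annotation (an assumption for this sketch) so the example is self-contained.
@Documented
@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.FIELD, ElementType.TYPE})
@interface VisibleForTesting {}

// Hypothetical class; the names are illustrative only.
class EnqueueSketch {
    private int enqueued;

    // Production code in this class calls the method as well. It is package-private
    // instead of private only so that tests in the same package can call it directly.
    @VisibleForTesting
    void enqueueIfAvailable(boolean registeredAvailable, boolean available) {
        if (!registeredAvailable && available) {
            enqueued++;
        }
    }

    // Accessor that exists purely for assertions in tests.
    @VisibleForTesting
    int getEnqueuedCount() {
        return enqueued;
    }
}
```

Under this interpretation the tag documents why the visibility is wider than the production code needs, rather than claiming that only tests call the method.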


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-09 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160333139
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

The same as for `triggerEnqueueAvailableReader`.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-09 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r160333068
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
--- End diff --

This method is not used only for testing; it may also be called via the 
`addCredit` method.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-08 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r159399376
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
--- End diff --

`from`


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-08 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r159400956
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
+   if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
+   enqueueAvailableReader(reader);
+   }
+   }
+
+   void enqueueAvailableReader(final SequenceNumberingViewReader reader) 
throws Exception {
--- End diff --

`@VisibleForTesting`?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-08 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r159400371
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 ---
@@ -77,6 +83,37 @@ void requestSubpartitionView(
}
}
 
+   public void addCredit(int credit) {
+   numCreditsAvailable += credit;
+   }
+
+   public void notifyAvailableChanged(boolean update) {
--- End diff --

`notifyAvailabilityChanged` or actually rather `setAvailability`?
Also, please add a comment on what this is about / what "availability" 
means here.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-08 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r159401061
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
@@ -88,6 +93,35 @@ public void run() {
});
}
 
+   /**
+* Try to enqueue the reader once receiving credit notification form 
the consumer or receiving
+* non-empty reader notification from the producer. Only one thread 
would trigger the actual
+* enqueue after checking the reader's availability, so there is no 
race condition here.
+*/
+   void triggerEnqueueAvailableReader(final SequenceNumberingViewReader 
reader) throws Exception {
--- End diff --

`@VisibleForTesting`?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-08 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r157760560
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -158,32 +164,44 @@ public void exceptionCaught(ChannelHandlerContext 
ctx, Throwable cause) throws E
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
try {
-   if (!bufferListener.hasStagedBufferOrEvent() && 
stagedMessages.isEmpty()) {
-   decodeMsg(msg, false);
-   }
-   else {
-   stagedMessages.add(msg);
-   }
-   }
-   catch (Throwable t) {
+   decodeMsg(msg);
+   } catch (Throwable t) {
notifyAllChannelsOfErrorAndClose(t);
}
}
 
+   @Override
+   public void userEventTriggered(ChannelHandlerContext ctx, Object msg) 
throws Exception {
+   if (msg instanceof RemoteInputChannel) {
+   boolean triggerWrite = 
inputChannelsWithCredit.isEmpty();
--- End diff --

please re-add the comment here, too


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-08 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r157760652
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -318,192 +307,56 @@ else if (bufferProvider.isDestroyed()) {
MemorySegment memSeg = 
MemorySegmentFactory.wrap(byteArray);
Buffer buffer = new Buffer(memSeg, 
FreeingBufferRecycler.INSTANCE, false);
 
-   inputChannel.onBuffer(buffer, 
bufferOrEvent.sequenceNumber, -1);
-
-   return true;
-   }
-   }
-   finally {
-   if (releaseNettyBuffer) {
-   bufferOrEvent.releaseBuffer();
+   inputChannel.onBuffer(buffer, 
bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
}
+   } finally {
+   bufferOrEvent.releaseBuffer();
}
}
 
-   /**
-* This class would be replaced by CreditBasedClientHandler in the 
final,
-* so we only implement this method in CreditBasedClientHandler.
-*/
-   void notifyCreditAvailable(RemoteInputChannel inputChannel) {
-   }
-
-   private class AsyncErrorNotificationTask implements Runnable {
-
-   private final Throwable error;
-
-   public AsyncErrorNotificationTask(Throwable error) {
-   this.error = error;
-   }
-
-   @Override
-   public void run() {
-   notifyAllChannelsOfErrorAndClose(error);
-   }
-   }
-
-   /**
-* A buffer availability listener, which subscribes/unsubscribes the NIO
-* read event.
-*
-* If no buffer is available, the channel read event will be 
unsubscribed
-* until one becomes available again.
-*
-* After a buffer becomes available again, the buffer is handed over 
by
-* the thread calling {@link #notifyBufferAvailable(Buffer)} to the 
network I/O
-* thread, which then continues the processing of the staged buffer.
-*/
-   private class BufferListenerTask implements BufferListener, Runnable {
-
-   private final AtomicReference availableBuffer = new 
AtomicReference();
-
-   private NettyMessage.BufferResponse stagedBufferResponse;
-
-   private boolean waitForBuffer(BufferProvider bufferProvider, 
NettyMessage.BufferResponse bufferResponse) {
-
-   stagedBufferResponse = bufferResponse;
-
-   if (bufferProvider.addBufferListener(this)) {
-   if (ctx.channel().config().isAutoRead()) {
-   
ctx.channel().config().setAutoRead(false);
-   }
-
-   return true;
-   }
-   else {
-   stagedBufferResponse = null;
-
-   return false;
-   }
-   }
-
-   private boolean hasStagedBufferOrEvent() {
-   return stagedBufferResponse != null;
-   }
-
-   public void notifyBufferDestroyed() {
-   // The buffer pool has been destroyed
-   stagedBufferResponse = null;
-
-   if (stagedMessages.isEmpty()) {
-   ctx.channel().config().setAutoRead(true);
-   ctx.channel().read();
-   }
-   else {
-   
ctx.channel().eventLoop().execute(stagedMessagesHandler);
-   }
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
--- End diff --

please re-add the comment here, too


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-08 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r157760482
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -94,6 +98,15 @@ void cancelRequestFor(InputChannelID inputChannelId) {
}
}
 
+   void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
--- End diff --

please re-add the comment here, too


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-08 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r157760366
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -41,30 +42,33 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.ArrayDeque;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read the messages of buffer response or error 
response from the
+ * producer, to write and flush the unannounced credits for the producer.
+ */
 class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 
private static final Logger LOG = 
LoggerFactory.getLogger(PartitionRequestClientHandler.class);
 
-   private final ConcurrentMap 
inputChannels = new ConcurrentHashMap();
-
-   private final AtomicReference channelError = new 
AtomicReference();
+   /** Channels, which already requested partitions from the producers. */
+   private final ConcurrentMap 
inputChannels = new ConcurrentHashMap<>();
 
-   private final BufferListenerTask bufferListener = new 
BufferListenerTask();
+   /** Channels, which will notify the producers about unannounced credit. 
*/
+   private final ArrayDeque inputChannelsWithCredit = 
new ArrayDeque<>();
 
-   private final Queue stagedMessages = new ArrayDeque();
+   private final AtomicReference channelError = new 
AtomicReference<>();
 
-   private final StagedMessagesHandlerTask stagedMessagesHandler = new 
StagedMessagesHandlerTask();
+   private final ChannelFutureListener writeListener = new 
WriteAndFlushNextMessageIfPossibleListener();
 
/**
 * Set of cancelled partition requests. A request is cancelled iff an 
input channel is cleared
 * while data is still coming in for this channel.
 */
-   private final ConcurrentMap cancelled = 
Maps.newConcurrentMap();
+   private final ConcurrentMap cancelled = 
new ConcurrentHashMap<>();
 
private volatile ChannelHandlerContext ctx;
--- End diff --

It looks like you missed migrating some of the comments that were present 
in `CreditBasedClientHandler` but are not present here, e.g. for `ctx`.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2017-11-27 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r153405989
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException 
{
BufferPool bufferPool = null;
 
try {
-   if 
(gate.getConsumedPartitionType().isCreditBased()) {
-   // Create a fixed-size buffer 
pool for floating buffers and assign exclusive buffers to input channels 
directly
-   bufferPool = 
networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, 
extraNetworkBuffersPerGate);
-   
gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
-   } else {
-   int maxNumberOfMemorySegments = 
gate.getConsumedPartitionType().isBounded() ?
-   
gate.getNumberOfInputChannels() * networkBuffersPerChannel +
-   
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
-   bufferPool = 
networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
-   
maxNumberOfMemorySegments);
-   }
+   // Create a fixed-size buffer pool for 
floating buffers and assign exclusive buffers to input channels directly
+   bufferPool = 
networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, 
extraNetworkBuffersPerGate);
+   
gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
--- End diff --

That makes sense, I will keep the previous behaviour for batch processing.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2017-11-27 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r152985049
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException 
{
BufferPool bufferPool = null;
 
try {
-   if 
(gate.getConsumedPartitionType().isCreditBased()) {
-   // Create a fixed-size buffer 
pool for floating buffers and assign exclusive buffers to input channels 
directly
-   bufferPool = 
networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, 
extraNetworkBuffersPerGate);
-   
gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
-   } else {
-   int maxNumberOfMemorySegments = 
gate.getConsumedPartitionType().isBounded() ?
-   
gate.getNumberOfInputChannels() * networkBuffersPerChannel +
-   
extraNetworkBuffersPerGate : Integer.MAX_VALUE;
-   bufferPool = 
networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
-   
maxNumberOfMemorySegments);
-   }
+   // Create a fixed-size buffer pool for 
floating buffers and assign exclusive buffers to input channels directly
+   bufferPool = 
networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, 
extraNetworkBuffersPerGate);
+   
gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
--- End diff --

What about the non-bounded partition type that we use for batch processing? 
Shouldn't we use an unbounded number of floating buffers there, as previously?
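
A rough sketch of what I mean, assuming the upper limit of the floating buffer pool is chosen per partition type. `FloatingBufferSizing` and `maxFloatingBuffers` are hypothetical names for illustration; the `Integer.MAX_VALUE` limit mirrors the removed `else` branch in the diff above:

```java
// Hypothetical helper, not part of the PR: picks the maximum size of the floating buffer pool.
final class FloatingBufferSizing {

    private FloatingBufferSizing() {
    }

    // Bounded partition types keep the fixed-size pool of floating buffers;
    // non-bounded (batch) partition types get no upper limit, as before the change.
    static int maxFloatingBuffers(boolean bounded, int extraNetworkBuffersPerGate) {
        return bounded ? extraNetworkBuffersPerGate : Integer.MAX_VALUE;
    }
}
```

The returned value would then be passed as the second argument of `createBufferPool`, with `extraNetworkBuffersPerGate` staying as the first.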


---