[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16620434#comment-16620434
 ] 

ASF GitHub Bot commented on FLINK-10331:


NicoK closed pull request #6692: [FLINK-10331][network] reduce unnecesary 
flushing
URL: https://github.com/apache/flink/pull/6692
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 41ee03db259..8630acee9a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -485,7 +485,7 @@ private void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRec
             }
             else {
                 // collect in memory
-                ensureBufferCapacity(numBytesChunk);
+                ensureBufferCapacity(nextRecordLength);
                 partial.segment.get(partial.position, buffer, 0, numBytesChunk);
             }
 
@@ -502,8 +502,8 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
             int segmentRemaining = numBytes;
             // check where to go. if we have a partial length, we need to complete it first
             if (this.lengthBuffer.position() > 0) {
-                int toPut = Math.min(this.lengthBuffer.remaining(), numBytes);
-                segment.get(offset, this.lengthBuffer, toPut);
+                int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining);
+                segment.get(segmentPosition, this.lengthBuffer, toPut);
                 // did we complete the length?
                 if (this.lengthBuffer.hasRemaining()) {
                     return;
@@ -515,6 +515,8 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
                     segmentRemaining -= toPut;
                     if (this.recordLength > THRESHOLD_FOR_SPILLING) {
                         this.spillingChannel = createSpillingChannel();
+                    } else {
+                        ensureBufferCapacity(this.recordLength);
                     }
                 }
             }
@@ -527,9 +529,7 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
                 // spill to file
                 ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
                 this.spillingChannel.write(toWrite);
-            }
-            else {
-                ensureBufferCapacity(accumulatedRecordBytes + toCopy);
+            } else {
                 segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
             }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 305f1842911..6fb067ef8c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -104,9 +104,9 @@ public void commit() {
      * @return number of written bytes.
      */
     public int finish() {
-        positionMarker.markFinished();
+        int writtenBytes = positionMarker.markFinished();
         commit();
-        return getWrittenBytes();
+        return writtenBytes;
     }
 
     public boolean isFinished() {
@@ -118,18 +118,10 @@ public boolean isFull() {
         return positionMarker.getCached() == getMaxCapacity();
     }
 
-    public boolean isEmpty() {
-        return positionMarker.getCached() == 0;
-    }
-
     public int getMaxCapacity() {
         return memorySegment.size();
     }
 
-   private 

[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618854#comment-16618854
 ] 

ASF GitHub Bot commented on FLINK-10331:


NicoK commented on issue #6692: [FLINK-10331][network] reduce unnecesary 
flushing
URL: https://github.com/apache/flink/pull/6692#issuecomment-422337715
 
 
   thanks for the review - I integrated the changes
   (FYI: I already merged the additional comment to `PipelinedSubpartition` 
into the respective commit because that change is so small and no further 
squashing is now required)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix unnecessary flush requests to the network stack
> ---
>
> Key: FLINK-10331
> URL: https://issues.apache.org/jira/browse/FLINK-10331
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> With the re-design of the record writer interaction with the 
> result(sub)partitions, flush requests can currently pile up in these 
> scenarios:
> - a previous flush request has not been completely handled yet and/or is 
> still enqueued or
> - the network stack is still polling from this subpartition and doesn't need 
> a new notification
> These lead to increased notifications in low latency settings (low output 
> flusher intervals) which can be avoided.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618851#comment-16618851
 ] 

ASF GitHub Bot commented on FLINK-10331:


NicoK commented on a change in pull request #6692: [FLINK-10331][network] 
reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218375527
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ##
 @@ -179,10 +185,49 @@ public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
         try {
             subpartition.add(createFilledBufferConsumer(1025)); // finished
             subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+            long oldNumNotifications = availablityListener.getNumNotifications();
             subpartition.flush();
+            // buffer queue is > 1, should already be notified, no further notification necessary
+            assertThat(oldNumNotifications, greaterThan(0L));
+            assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
 
             assertNextBuffer(readView, 1025, true, 1, false, true);
             assertNextBuffer(readView, 1024, false, 1, false, false);
+            assertNoNextBuffer(readView);
+        } finally {
+            subpartition.release();
+        }
+    }
+
+    /**
+     * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+     */
+    @Test
+    public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+        final ResultSubpartition subpartition = createSubpartition();
 
 Review comment:
   ok, I forked off the methods with read view and availability listener to 
`PipelinedSubpartitionWithReadViewTest`




[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618775#comment-16618775
 ] 

ASF GitHub Bot commented on FLINK-10331:


NicoK commented on a change in pull request #6692: [FLINK-10331][network] 
reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218363450
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ##
 @@ -179,10 +185,49 @@ public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
         try {
             subpartition.add(createFilledBufferConsumer(1025)); // finished
             subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+            long oldNumNotifications = availablityListener.getNumNotifications();
             subpartition.flush();
+            // buffer queue is > 1, should already be notified, no further notification necessary
+            assertThat(oldNumNotifications, greaterThan(0L));
+            assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
 
             assertNextBuffer(readView, 1025, true, 1, false, true);
             assertNextBuffer(readView, 1024, false, 1, false, false);
+            assertNoNextBuffer(readView);
+        } finally {
+            subpartition.release();
+        }
+    }
+
+    /**
+     * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+     */
+    @Test
+    public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+        final ResultSubpartition subpartition = createSubpartition();
 
 Review comment:
   unfortunately, not every unit test works on the same setup - are you 
proposing to
   - instantiate these nonetheless and let those be unused in some tests, or
   - split the unit test into one with and one without this initialization?
   Or maybe I'm not aware of some trick that solves this...




[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618757#comment-16618757
 ] 

ASF GitHub Bot commented on FLINK-10331:


NicoK commented on a change in pull request #6692: [FLINK-10331][network] 
reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218357947
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ##
 @@ -279,6 +281,21 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }
 
+    @Override
+    public void flush() {
+        synchronized (buffers) {
+            if (buffers.isEmpty()) {
+                return;
+            }
+            if (!flushRequested) {
+                flushRequested = true; // set this before the notification!
+                if (buffers.size() == 1) {
 
 Review comment:
   sure - I think, I explained the flushing behaviour in the class' JavaDoc
   > Whenever {@link #add(BufferConsumer)} adds a finished {@link 
BufferConsumer} or a second {@link BufferConsumer} (in which case we will 
assume the first one finished), we will {@link 
PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via 
{@link #createReadView(BufferAvailabilityListener)} of new data availability. 
Except by calling {@link #flush()} explicitly, we always only notify when the 
first finished buffer turns up and then, the reader has to drain the buffers 
via {@link #pollBuffer()} until its return value shows no more buffers being 
available.
   
   But it doesn't hurt to have something small / more explicit here as well
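
The reader side of the contract quoted above can be sketched roughly as follows. This is an illustrative model, not Flink's implementation: `DrainSketch`, `Polled`, `addFinished`, and the `moreAvailable` flag are hypothetical names chosen for the example, and only finished buffers are modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: one notification per "first finished buffer", then the reader drains. */
final class DrainSketch {
    /** Result of one poll: the payload plus whether the reader should poll again. */
    static final class Polled {
        final String data;
        final boolean moreAvailable;

        Polled(String data, boolean moreAvailable) {
            this.data = data;
            this.moreAvailable = moreAvailable;
        }
    }

    private final ArrayDeque<String> finished = new ArrayDeque<>();
    private int notifications;

    /** Adds a finished buffer; only the transition from empty to non-empty notifies. */
    void addFinished(String data) {
        boolean wasEmpty = finished.isEmpty();
        finished.add(data);
        if (wasEmpty) {
            notifications++;
        }
    }

    /** Returns null when nothing is queued. */
    Polled pollBuffer() {
        String data = finished.poll();
        return data == null ? null : new Polled(data, !finished.isEmpty());
    }

    /** What a reader does after a single notification: poll until told to stop. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        Polled polled;
        do {
            polled = pollBuffer();
            if (polled != null) {
                out.add(polled.data);
            }
        } while (polled != null && polled.moreAvailable);
        return out;
    }

    int notifications() {
        return notifications;
    }
}
```

In this model, adding three finished buffers produces a single notification, and one `drain()` call retrieves all three; a reader that polled only once per notification would leave data behind.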
   




[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618755#comment-16618755
 ] 

ASF GitHub Bot commented on FLINK-10331:


NicoK commented on a change in pull request #6692: [FLINK-10331][network] 
reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218357947
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ##
 @@ -279,6 +281,21 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }
 
+    @Override
+    public void flush() {
+        synchronized (buffers) {
+            if (buffers.isEmpty()) {
+                return;
+            }
+            if (!flushRequested) {
+                flushRequested = true; // set this before the notification!
+                if (buffers.size() == 1) {
 
 Review comment:
   sure - I think, I explained the flushing behaviour in the class' JavaDoc but 
it doesn't hurt to have something small here as well




[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618756#comment-16618756
 ] 

ASF GitHub Bot commented on FLINK-10331:


NicoK commented on a change in pull request #6692: [FLINK-10331][network] 
reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218357947
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ##
 @@ -279,6 +281,21 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }
 
+    @Override
+    public void flush() {
+        synchronized (buffers) {
+            if (buffers.isEmpty()) {
+                return;
+            }
+            if (!flushRequested) {
+                flushRequested = true; // set this before the notification!
+                if (buffers.size() == 1) {
 
 Review comment:
   sure - I think, I explained the flushing behaviour in the class' JavaDoc
   > Except by calling {@link #flush()} explicitly, we always only notify when 
the first finished buffer turns up and then, the reader has to drain the 
buffers via {@link #pollBuffer()} until its return value shows no more buffers 
being available.
   
   But it doesn't hurt to have something small here as well
   




[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618754#comment-16618754
 ] 

ASF GitHub Bot commented on FLINK-10331:


NicoK commented on a change in pull request #6692: [FLINK-10331][network] 
reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218357947
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ##
 @@ -279,6 +281,21 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }
 
+    @Override
+    public void flush() {
+        synchronized (buffers) {
+            if (buffers.isEmpty()) {
+                return;
+            }
+            if (!flushRequested) {
+                flushRequested = true; // set this before the notification!
+                if (buffers.size() == 1) {
 
 Review comment:
   sure - I think, I explained the flushing behaviour in #6693 in the class' 
comment but it doesn't hurt to have something small here as well




[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618751#comment-16618751
 ] 

ASF GitHub Bot commented on FLINK-10331:


NicoK commented on a change in pull request #6692: [FLINK-10331][network] 
reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218357314
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
 ##
 @@ -198,6 +199,8 @@ public void testConsumptionWithMixedChannels() throws Exception {
     private abstract static class Source {
 
         abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
+
+        abstract void flush();
 
 Review comment:
   depending on the implementation in `PipelinedSubpartition`, i.e. `if 
(buffers.size() == 1 && buffers.peekLast().isFinished())` or whatever we change 
it to (we don't make guarantees here!), the producer thread may not have 
flushed its last record after finishing and the source would wait forever (no 
output flusher in that test)
   -> we need to flush all channels before leaving the producer
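
A minimal sketch of why the final flush matters when no output flusher is running. Everything here is hypothetical and heavily simplified (`FinalFlushSketch`, `Channel`, `awaitData`, and the semaphore-based notification are assumptions for illustration, not the test's actual classes): data written to a channel becomes visible to a waiting consumer only once `flush()` is called.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a producer that must flush every channel before it exits. */
final class FinalFlushSketch {
    /** Channel whose last, unfinished buffer becomes visible only through flush(). */
    static final class Channel {
        private final Semaphore dataAvailable = new Semaphore(0);
        private String lastRecord;
        private boolean hasUnflushedData;

        synchronized void write(String record) {
            lastRecord = record;
            hasUnflushedData = true; // no notification: the buffer is not finished yet
        }

        synchronized void flush() {
            if (hasUnflushedData) {
                hasUnflushedData = false;
                dataAvailable.release(); // notify the waiting consumer
            }
        }

        /** Consumer side: waits for a notification, giving up after the timeout. */
        boolean awaitData(long timeoutMillis) {
            try {
                return dataAvailable.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Runs a producer thread over all channels; optionally flushes each one before leaving. */
    static void produce(Channel[] channels, boolean flushBeforeLeaving) {
        Thread producer = new Thread(() -> {
            for (Channel channel : channels) {
                channel.write("last record");
            }
            if (flushBeforeLeaving) {
                for (Channel channel : channels) {
                    channel.flush();
                }
            }
        });
        producer.start();
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Without the timeout in `awaitData`, a consumer of a channel that was never flushed would block indefinitely, which corresponds to the "wait forever" case described above.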




[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618605#comment-16618605
 ] 

ASF GitHub Bot commented on FLINK-10331:


pnowojski commented on a change in pull request #6692: [FLINK-10331][network] 
reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218324609
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ##
 @@ -179,10 +185,49 @@ public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
         try {
             subpartition.add(createFilledBufferConsumer(1025)); // finished
             subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+            long oldNumNotifications = availablityListener.getNumNotifications();
             subpartition.flush();
+            // buffer queue is > 1, should already be notified, no further notification necessary
+            assertThat(oldNumNotifications, greaterThan(0L));
+            assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
 
             assertNextBuffer(readView, 1025, true, 1, false, true);
             assertNextBuffer(readView, 1024, false, 1, false, false);
+            assertNoNextBuffer(readView);
+        } finally {
+            subpartition.release();
+        }
+    }
+
+    /**
+     * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+     */
+    @Test
+    public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+        final ResultSubpartition subpartition = createSubpartition();
 
 Review comment:
   Extract those initialisations to setup/teardown/`@Rule`. This code block is 
duplicated a couple of times:
   
   ```
   final ResultSubpartition subpartition = createSubpartition();
   AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
   ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
   availablityListener.resetNotificationCounters();
   
   (...)
   } finally {
       subpartition.release();
   }
   ```
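
One possible shape for such an extraction, sketched under assumptions. In a JUnit 4 test the direct route would be `@Before`/`@After` methods or an `ExternalResource` rule; the example below swaps in plain Java try-with-resources so that it runs without a test framework. `FixtureSketch`, `FakeSubpartition`, and `SubpartitionFixture` are hypothetical stand-ins, not classes from the pull request.

```java
/** Hypothetical sketch: bundling repeated test setup and teardown in one fixture. */
final class FixtureSketch {
    /** Stand-in for the subpartition under test. */
    static final class FakeSubpartition {
        boolean released;

        void release() {
            released = true;
        }
    }

    /** Creates the shared objects once and guarantees release() through close(). */
    static final class SubpartitionFixture implements AutoCloseable {
        final FakeSubpartition subpartition = new FakeSubpartition();
        int notifications; // stand-in for the listener's counter; starts at zero

        @Override
        public void close() {
            subpartition.release();
        }
    }

    /** A test body reduced to the part that differs between tests. */
    static FakeSubpartition exampleTest() {
        try (SubpartitionFixture fixture = new SubpartitionFixture()) {
            fixture.notifications++; // test-specific logic would go here
            return fixture.subpartition;
        } // close() runs here, even if the body throws
    }
}
```

The trade-off is the same as with a rule: tests that do not need the read view or listener would still pay for creating them unless they are split into a separate test class.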




[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618607#comment-16618607
 ] 

ASF GitHub Bot commented on FLINK-10331:


pnowojski commented on a change in pull request #6692: [FLINK-10331][network] 
reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218325893
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
 ##
 @@ -198,6 +199,8 @@ public void testConsumptionWithMixedChannels() throws Exception {
     private abstract static class Source {
 
         abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
+
+        abstract void flush();
 
 Review comment:
   Why did you need to add this `flush`? What was wrong?




[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618606#comment-16618606
 ] 

ASF GitHub Bot commented on FLINK-10331:


pnowojski commented on a change in pull request #6692: [FLINK-10331][network] 
reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218323627
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ##
 @@ -279,6 +281,21 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }
 
+    @Override
+    public void flush() {
+        synchronized (buffers) {
+            if (buffers.isEmpty()) {
+                return;
+            }
+            if (!flushRequested) {
+                flushRequested = true; // set this before the notification!
+                if (buffers.size() == 1) {
 
 Review comment:
   This `if` check deserves an explanation, either here or in the JavaDoc above.
   
   > If there is more than 1 buffer, we have already notified the reader when 
adding the second one.




[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614128#comment-16614128
 ] 

ASF GitHub Bot commented on FLINK-10331:


NicoK opened a new pull request #6692: [FLINK-10331][network] reduce unnecesary 
flushing
URL: https://github.com/apache/flink/pull/6692
 
 
   ## What is the purpose of the change
   
   With the re-design of the record writer interaction with the 
result(sub)partitions, flush requests can currently pile up in these scenarios:
   - a previous flush request has not been completely handled yet and/or is 
still enqueued or
   - the network stack is still polling from this subpartition and doesn't need 
a new notification
   
   These lead to increased notifications in low latency settings (low output 
flusher intervals) which can be avoided.
   
   ## Brief change log
   
   - do not flush (again) in the scenarios mentioned above, relying on 
`flushRequested` and the `buffer` queue size
   - add intensive sanity checks to `SpillingAdaptiveSpanningRecordDeserializer`
   - several smaller improvement hotfixes (please see the individual commits)
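
The first change-log item can be illustrated with a minimal, self-contained sketch. It is not the code from this pull request: `FlushGuardSketch`, its integer stand-ins for buffers, the notification counter, and the simplified `poll()` are hypothetical and exist only for illustration. The guard assumes a `flushRequested` flag combined with a check on the queue size, as named in the change log.

```java
import java.util.ArrayDeque;

/** Hypothetical model of a subpartition that suppresses redundant flush notifications. */
final class FlushGuardSketch {
    private final ArrayDeque<Integer> buffers = new ArrayDeque<>(); // stand-in for queued buffers
    private boolean flushRequested;
    private int notifications; // how often the reader was notified

    /** Adds a buffer; a second buffer implies the first is finished, so the reader is notified. */
    void add(int bufferSize) {
        synchronized (buffers) {
            buffers.add(bufferSize);
            if (buffers.size() == 2 && !flushRequested) {
                notifications++;
            }
        }
    }

    /** Requests a flush, notifying only if the reader cannot already know about the data. */
    void flush() {
        synchronized (buffers) {
            if (buffers.isEmpty()) {
                return; // nothing to flush
            }
            if (!flushRequested) {
                flushRequested = true; // set before notifying
                if (buffers.size() == 1) {
                    notifications++; // with more buffers, add() has already notified
                }
            }
        }
    }

    /** Reader side (simplified): takes one buffer and clears the pending flush once drained. */
    Integer poll() {
        synchronized (buffers) {
            Integer next = buffers.poll();
            if (buffers.isEmpty()) {
                flushRequested = false;
            }
            return next;
        }
    }

    int notifications() {
        synchronized (buffers) {
            return notifications;
        }
    }
}
```

With this guard, calling `flush()` several times in a row on a single queued buffer yields one notification instead of one per call, which is the piling-up effect the description refers to for low output flusher intervals.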
   
   ## Verifying this change
   
   This change is already covered by existing tests plus a few new tests in 
`PipelinedSubpartitionTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **yes** 
(depending on output flusher interval, rather per buffer)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **JavaDocs**
   

