[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620434#comment-16620434 ]

ASF GitHub Bot commented on FLINK-10331:

NicoK closed pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 41ee03db259..8630acee9a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -485,7 +485,7 @@ private void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRec
             }
             else {
                 // collect in memory
-                ensureBufferCapacity(numBytesChunk);
+                ensureBufferCapacity(nextRecordLength);
                 partial.segment.get(partial.position, buffer, 0, numBytesChunk);
             }

@@ -502,8 +502,8 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
             int segmentRemaining = numBytes;
             // check where to go. if we have a partial length, we need to complete it first
             if (this.lengthBuffer.position() > 0) {
-                int toPut = Math.min(this.lengthBuffer.remaining(), numBytes);
-                segment.get(offset, this.lengthBuffer, toPut);
+                int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining);
+                segment.get(segmentPosition, this.lengthBuffer, toPut);
                 // did we complete the length?
                 if (this.lengthBuffer.hasRemaining()) {
                     return;
@@ -515,6 +515,8 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
                     segmentRemaining -= toPut;
                     if (this.recordLength > THRESHOLD_FOR_SPILLING) {
                         this.spillingChannel = createSpillingChannel();
+                    } else {
+                        ensureBufferCapacity(this.recordLength);
                     }
                 }
             }
@@ -527,9 +529,7 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
                 // spill to file
                 ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
                 this.spillingChannel.write(toWrite);
-            }
-            else {
-                ensureBufferCapacity(accumulatedRecordBytes + toCopy);
+            } else {
                 segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
             }

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 305f1842911..6fb067ef8c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -104,9 +104,9 @@ public void commit() {
      * @return number of written bytes.
      */
     public int finish() {
-        positionMarker.markFinished();
+        int writtenBytes = positionMarker.markFinished();
         commit();
-        return getWrittenBytes();
+        return writtenBytes;
     }

     public boolean isFinished() {
@@ -118,18 +118,10 @@ public boolean isFull() {
         return positionMarker.getCached() == getMaxCapacity();
     }

-    public boolean isEmpty() {
-        return positionMarker.getCached() == 0;
-    }
-
     public int getMaxCapacity() {
         return memorySegment.size();
     }

-
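The deserializer hunks above replace a per-chunk `ensureBufferCapacity(...)` call with a single call made as soon as the full record length is known. A minimal sketch of that idea follows. It is not Flink's code: `RecordBufferSketch`, `startRecord`, `addChunk`, and the resize counter are hypothetical names introduced only for illustration, and the growth policy is an assumption.

```java
import java.util.Arrays;

// Hypothetical, simplified accumulator; not Flink's SpillingAdaptiveSpanningRecordDeserializer.
class RecordBufferSketch {
    private byte[] buffer = new byte[0];
    private int accumulatedRecordBytes;
    private int resizes; // counts reallocations, for illustration only

    // Called once the record length has been read: size the buffer up front.
    void startRecord(int recordLength) {
        accumulatedRecordBytes = 0;
        ensureBufferCapacity(recordLength);
    }

    // Copies one chunk; no capacity check is needed here because startRecord
    // already reserved room for the whole record.
    void addChunk(byte[] chunk) {
        System.arraycopy(chunk, 0, buffer, accumulatedRecordBytes, chunk.length);
        accumulatedRecordBytes += chunk.length;
    }

    // Assumed growth policy: at least minLength, otherwise double the old size.
    private void ensureBufferCapacity(int minLength) {
        if (buffer.length < minLength) {
            buffer = Arrays.copyOf(buffer, Math.max(minLength, buffer.length * 2));
            resizes++;
        }
    }

    int resizes() {
        return resizes;
    }

    byte[] record() {
        return Arrays.copyOf(buffer, accumulatedRecordBytes);
    }

    public static void main(String[] args) {
        RecordBufferSketch sketch = new RecordBufferSketch();
        sketch.startRecord(6);
        sketch.addChunk(new byte[] {1, 2, 3});
        sketch.addChunk(new byte[] {4, 5, 6});
        System.out.println(sketch.resizes() + " resize(s), " + sketch.record().length + " bytes");
    }
}
```

Sizing once per record keeps the capacity check out of the per-chunk path, which is presumably why the diff drops the call from the copy branch.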
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618854#comment-16618854 ]

ASF GitHub Bot commented on FLINK-10331:

NicoK commented on issue #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#issuecomment-422337715

thanks for the review - I integrated the changes
(FYI: I already merged the additional comment to `PipelinedSubpartition` into the respective commit because that change is so small and no further squashing is now required)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Fix unnecessary flush requests to the network stack
> ---
>
> Key: FLINK-10331
> URL: https://issues.apache.org/jira/browse/FLINK-10331
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Labels: pull-request-available
>
> With the re-design of the record writer interaction with the
> result(sub)partitions, flush requests can currently pile up in these
> scenarios:
> - a previous flush request has not been completely handled yet and/or is
> still enqueued or
> - the network stack is still polling from this subpartition and doesn't need
> a new notification
> These lead to increased notifications in low latency settings (low output
> flusher intervals) which can be avoided.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618851#comment-16618851 ]

ASF GitHub Bot commented on FLINK-10331:

NicoK commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218375527

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ##
@@ -179,10 +185,49 @@ public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
         try {
             subpartition.add(createFilledBufferConsumer(1025)); // finished
             subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+            long oldNumNotifications = availablityListener.getNumNotifications();
             subpartition.flush();
+            // buffer queue is > 1, should already be notified, no further notification necessary
+            assertThat(oldNumNotifications, greaterThan(0L));
+            assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
             assertNextBuffer(readView, 1025, true, 1, false, true);
             assertNextBuffer(readView, 1024, false, 1, false, false);
+            assertNoNextBuffer(readView);
+        } finally {
+            subpartition.release();
+        }
+    }
+
+    /**
+     * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+     */
+    @Test
+    public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+        final ResultSubpartition subpartition = createSubpartition();

Review comment:
ok, I forked off the methods with read view and availability listener to `PipelinedSubpartitionWithReadViewTest`
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618775#comment-16618775 ]

ASF GitHub Bot commented on FLINK-10331:

NicoK commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218363450

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ##
@@ -179,10 +185,49 @@ public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
         try {
             subpartition.add(createFilledBufferConsumer(1025)); // finished
             subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+            long oldNumNotifications = availablityListener.getNumNotifications();
             subpartition.flush();
+            // buffer queue is > 1, should already be notified, no further notification necessary
+            assertThat(oldNumNotifications, greaterThan(0L));
+            assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
             assertNextBuffer(readView, 1025, true, 1, false, true);
             assertNextBuffer(readView, 1024, false, 1, false, false);
+            assertNoNextBuffer(readView);
+        } finally {
+            subpartition.release();
+        }
+    }
+
+    /**
+     * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+     */
+    @Test
+    public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+        final ResultSubpartition subpartition = createSubpartition();

Review comment:
unfortunately, not every unit test works on the same setup - are you proposing to
- instantiate these nonetheless and let those be unused in some tests, or
- split the unit test into one with and one without this initialization?

Or maybe I'm not aware of some trick that solves this...
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618757#comment-16618757 ]

ASF GitHub Bot commented on FLINK-10331:

NicoK commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218357947

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ##
@@ -279,6 +281,21 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }

+    @Override
+    public void flush() {
+        synchronized (buffers) {
+            if (buffers.isEmpty()) {
+                return;
+            }
+            if (!flushRequested) {
+                flushRequested = true; // set this before the notification!
+                if (buffers.size() == 1) {

Review comment:
sure - I think, I explained the flushing behaviour in the class' JavaDoc

> Whenever {@link #add(BufferConsumer)} adds a finished {@link BufferConsumer} or a second {@link BufferConsumer} (in which case we will assume the first one finished), we will {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via {@link #createReadView(BufferAvailabilityListener)} of new data availability. Except by calling {@link #flush()} explicitly, we always only notify when the first finished buffer turns up and then, the reader has to drain the buffers via {@link #pollBuffer()} until its return value shows no more buffers being available.

But it doesn't hurt to have something small / more explicit here as well
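The notification rules described in the review comment above can be illustrated with a small self-contained model. This is only a sketch under assumptions, not Flink's implementation: `SketchSubpartition`, its `Boolean` queue (where `true` stands for a finished buffer), the notification counter, and the `poll()` reset are hypothetical stand-ins. Only the `buffers.isEmpty()` check, the `flushRequested` flag, and the `buffers.size() == 1` check come from the quoted hunk, and because that hunk is cut off inside the size check, the sketch assumes that branch notifies the reader.

```java
import java.util.ArrayDeque;

// Demo driver; placed first so that `java FlushSketch.java` finds main.
class FlushSketch {
    public static void main(String[] args) {
        SketchSubpartition sub = new SketchSubpartition();
        sub.add(false); // a single unfinished buffer: reader not notified yet
        sub.flush();    // first flush with one queued buffer: notifies
        sub.flush();    // flush already requested: no further notification
        System.out.println(sub.notifications()); // prints 1
    }
}

// Hypothetical, simplified model of a pipelined subpartition.
class SketchSubpartition {
    private final ArrayDeque<Boolean> buffers = new ArrayDeque<>(); // true = finished
    private boolean flushRequested;
    private int notifications; // stands in for notifying a read view

    void add(boolean finished) {
        synchronized (buffers) {
            boolean hadFinished = finishedBuffers() > 0;
            buffers.add(finished);
            // Assumed rule: notify only when the first finished buffer turns up.
            if (!flushRequested && !hadFinished && finishedBuffers() > 0) {
                notifications++;
            }
        }
    }

    // A second queued buffer implies the first one is finished (assumption from the JavaDoc).
    private int finishedBuffers() {
        if (buffers.size() == 1 && buffers.peekLast()) {
            return 1;
        }
        return Math.max(0, buffers.size() - 1);
    }

    void flush() {
        synchronized (buffers) {
            if (buffers.isEmpty()) {
                return; // nothing to flush
            }
            if (!flushRequested) {
                flushRequested = true; // set this before the notification
                if (buffers.size() == 1) {
                    // With more than one buffer the reader was already notified in add().
                    notifications++;
                }
            }
        }
    }

    // Assumed reader side: once the queue is drained, a new flush may notify again.
    Boolean poll() {
        synchronized (buffers) {
            Boolean head = buffers.poll();
            if (buffers.isEmpty()) {
                flushRequested = false;
            }
            return head;
        }
    }

    int notifications() {
        synchronized (buffers) {
            return notifications;
        }
    }
}
```

In this model, repeated `flush()` calls between two reader polls produce at most one notification, which is the pile-up the issue description aims to avoid.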
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618755#comment-16618755 ]

ASF GitHub Bot commented on FLINK-10331:

NicoK commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218357947

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ##
@@ -279,6 +281,21 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }

+    @Override
+    public void flush() {
+        synchronized (buffers) {
+            if (buffers.isEmpty()) {
+                return;
+            }
+            if (!flushRequested) {
+                flushRequested = true; // set this before the notification!
+                if (buffers.size() == 1) {

Review comment:
sure - I think, I explained the flushing behaviour in the class' JavaDoc but it doesn't hurt to have something small here as well
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618756#comment-16618756 ]

ASF GitHub Bot commented on FLINK-10331:

NicoK commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218357947

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ##
@@ -279,6 +281,21 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }

+    @Override
+    public void flush() {
+        synchronized (buffers) {
+            if (buffers.isEmpty()) {
+                return;
+            }
+            if (!flushRequested) {
+                flushRequested = true; // set this before the notification!
+                if (buffers.size() == 1) {

Review comment:
sure - I think, I explained the flushing behaviour in the class' JavaDoc

> Except by calling {@link #flush()} explicitly, we always only notify when the first finished buffer turns up and then, the reader has to drain the buffers via {@link #pollBuffer()} until its return value shows no more buffers being available.

But it doesn't hurt to have something small here as well
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618754#comment-16618754 ]

ASF GitHub Bot commented on FLINK-10331:

NicoK commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218357947

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ##
@@ -279,6 +281,21 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }

+    @Override
+    public void flush() {
+        synchronized (buffers) {
+            if (buffers.isEmpty()) {
+                return;
+            }
+            if (!flushRequested) {
+                flushRequested = true; // set this before the notification!
+                if (buffers.size() == 1) {

Review comment:
sure - I think, I explained the flushing behaviour in #6693 in the class' comment but it doesn't hurt to have something small here as well
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618751#comment-16618751 ]

ASF GitHub Bot commented on FLINK-10331:

NicoK commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218357314

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java ##
@@ -198,6 +199,8 @@ public void testConsumptionWithMixedChannels() throws Exception {
     private abstract static class Source {
         abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
+
+        abstract void flush();

Review comment:
depending on the implementation in `PipelinedSubpartition`, i.e. `if (buffers.size() == 1 && buffers.peekLast().isFinished())` or whatever we change it to (we don't make guarantees here!), the producer thread may not have flushed its last record after finishing and the source would wait forever (no output flusher in that test)
-> we need to flush all channels before leaving the producer
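The hang described in the review comment above can be reproduced in miniature. The following is a hypothetical sketch, not the `InputGateConcurrentTest` code: `Channel`, `write`, `flush`, `awaitData`, and the 200 ms timeout are assumptions made for illustration. It models a channel whose last record only becomes visible to the consumer once someone flushes, with no periodic output flusher running.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class ProducerFlushSketch {

    /** Hypothetical channel: written data becomes visible to the consumer only on flush(). */
    static final class Channel {
        private final Semaphore dataAvailable = new Semaphore(0);
        private boolean pending;

        synchronized void write() {
            pending = true; // the last record sits in an unfinished buffer
        }

        synchronized void flush() {
            if (pending) {
                pending = false;
                dataAvailable.release(); // notify the consumer
            }
        }

        boolean awaitData(long timeoutMillis) throws InterruptedException {
            return dataAvailable.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Runs a producer thread to completion, then reports whether the consumer got notified. */
    static boolean consumerSeesData(boolean flushBeforeLeaving) {
        Channel channel = new Channel();
        Thread producer = new Thread(() -> {
            channel.write();
            if (flushBeforeLeaving) {
                channel.flush(); // flush all channels before leaving the producer
            }
        });
        producer.start();
        try {
            producer.join();
            // A real consumer would block indefinitely; the sketch uses a timeout instead.
            return channel.awaitData(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("with final flush:    " + consumerSeesData(true));
        System.out.println("without final flush: " + consumerSeesData(false));
    }
}
```

Without the final flush nobody ever releases the semaphore, so the consumer's wait can only end by timing out.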
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618605#comment-16618605 ]

ASF GitHub Bot commented on FLINK-10331:

pnowojski commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218324609

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ##
@@ -179,10 +185,49 @@ public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
         try {
             subpartition.add(createFilledBufferConsumer(1025)); // finished
             subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+            long oldNumNotifications = availablityListener.getNumNotifications();
             subpartition.flush();
+            // buffer queue is > 1, should already be notified, no further notification necessary
+            assertThat(oldNumNotifications, greaterThan(0L));
+            assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
             assertNextBuffer(readView, 1025, true, 1, false, true);
             assertNextBuffer(readView, 1024, false, 1, false, false);
+            assertNoNextBuffer(readView);
+        } finally {
+            subpartition.release();
+        }
+    }
+
+    /**
+     * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+     */
+    @Test
+    public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+        final ResultSubpartition subpartition = createSubpartition();

Review comment:
Extract those initialisations to setup/teardown/`@Rule`. This code block is duplicated a couple of times:
```
final ResultSubpartition subpartition = createSubpartition();
AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
availablityListener.resetNotificationCounters();
(...)
} finally {
    subpartition.release();
}
```
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618607#comment-16618607 ]

ASF GitHub Bot commented on FLINK-10331:

pnowojski commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218325893

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java ##
@@ -198,6 +199,8 @@ public void testConsumptionWithMixedChannels() throws Exception {
     private abstract static class Source {
         abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;
+
+        abstract void flush();

Review comment:
Why did you need to add this `flush`? What was wrong?
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618606#comment-16618606 ]

ASF GitHub Bot commented on FLINK-10331:

pnowojski commented on a change in pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692#discussion_r218323627

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ##
@@ -279,6 +281,21 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return Math.max(buffers.size(), 0);
     }

+    @Override
+    public void flush() {
+        synchronized (buffers) {
+            if (buffers.isEmpty()) {
+                return;
+            }
+            if (!flushRequested) {
+                flushRequested = true; // set this before the notification!
+                if (buffers.size() == 1) {

Review comment:
this if check deserves an explanation, either here or in the java doc above.

> If there is more than 1 buffer, we have already notified the reader when we were adding the second one
[jira] [Commented] (FLINK-10331) Fix unnecessary flush requests to the network stack
[ https://issues.apache.org/jira/browse/FLINK-10331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614128#comment-16614128 ]

ASF GitHub Bot commented on FLINK-10331:

NicoK opened a new pull request #6692: [FLINK-10331][network] reduce unnecesary flushing
URL: https://github.com/apache/flink/pull/6692

## What is the purpose of the change

With the re-design of the record writer interaction with the result(sub)partitions, flush requests can currently pile up in these scenarios:
- a previous flush request has not been completely handled yet and/or is still enqueued or
- the network stack is still polling from this subpartition and doesn't need a new notification

These lead to increased notifications in low latency settings (low output flusher intervals) which can be avoided.

## Brief change log

- do not flush (again) in the scenarios mentioned above, relying on `flushRequested` and the `buffer` queue size
- add intensive sanity checks to `SpillingAdaptiveSpanningRecordDeserializer`
- several smaller improvement hotfixes (please see the individual commits)

## Verifying this change

This change is already covered by existing tests plus a few new tests in `PipelinedSubpartitionTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **yes** (depending on output flusher interval, rather per buffer)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **JavaDocs**