[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4559 ---
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157709904 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -52,6 +54,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- Yes, I totally agree with your point about the current state of the spillable/spilled subpartitions and subpartition views. I also think that `PipelinedSubpartition` is the most important path and that `SpillableSubpartition` is less performance-sensitive. I think we have already reached a consensus on how to handle `SpillableSubpartition`, and I will make that change later. :) ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157707628 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- yes, that would be nice ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157706995 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -52,6 +54,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- You're absolutely right about not counting events. Therefore, we cannot use the queue's size as I suggested. Yes, `BufferAndAvailability` would need to be extended as well. This integration/split of the spillable/spilled subpartitions and subpartition views, with both of them working on the same structures and requiring the same synchronisation pattern, is imho really not nice and highly fragile. @pnowojski and I are currently re-designing the synchronisation in these parts of the code and are a bit sensitive to it now, so let's drag him into this discussion as well: I would consider `PipelinedSubpartition` the hot path where we need to optimise most; spillable subpartitions are used in batch mode and have higher tolerances, especially when spilling to disk. If you returned the new backlog counter in `SpillableSubpartition#decreaseBuffersInBacklog()`, however (retrieved under the `synchronized (buffers)` section), then you would not need the `volatile` either since you are already under the lock. ---
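The suggestion to return the new backlog counter from the decrease call could look roughly like the sketch below. It is a simplified illustration and not the Flink code: `SketchSpillableSubpartition` and `addDataBuffer()` are hypothetical, and the `boolean` parameter stands in for the `Buffer` argument of the real method. The point is only that a counter which is read and written exclusively inside `synchronized (buffers)` can be a plain `int`.

```java
import java.util.ArrayDeque;

/** Hypothetical, heavily simplified stand-in for a spillable subpartition. */
final class SketchSpillableSubpartition {
    private final ArrayDeque<Object> buffers = new ArrayDeque<>();

    // Guarded by "buffers"; a plain int suffices because every access holds the lock.
    private int buffersInBacklog;

    /** Adds a data (non-event) buffer and counts it towards the backlog. */
    void addDataBuffer(Object buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            buffersInBacklog++;
        }
    }

    /**
     * Decreases the backlog if the consumed element was a data buffer and
     * returns the new value, so callers never read the field outside the lock.
     */
    int decreaseBuffersInBacklog(boolean wasDataBuffer) {
        synchronized (buffers) {
            if (wasDataBuffer) {
                buffersInBacklog--;
            }
            return buffersInBacklog;
        }
    }
}
```

A view that has just consumed an element would use the returned value directly instead of calling a separate getter afterwards.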
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157706951 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- Sorry, my wording above was not correct. I meant that we do not need the `decreaseBuffersInBacklog` method in `ResultSubpartition` once the `parent` in `SpilledSubpartitionView` is changed to `SpillableSubpartition`. ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157703075 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- `package-private`, e.g. `abstract void increaseBuffersInBacklog(Buffer buffer);`, already works without changing anything since `SpilledSubpartitionView` is in the same package ---
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157694294 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java --- @@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception { try { subpartition.finish(); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(4, subpartition.getTotalNumberOfBytes()); + assertFalse(subpartition.add(mock(Buffer.class))); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); --- End diff -- sure ---
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157693477 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws Exception { partition.add(buffer); partition.add(buffer); + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + + assertFalse(buffer.isRecycled()); assertEquals(3, partition.releaseMemory()); + // now the buffer may be freed, depending on the timing of the write operation + // -> let's do this check at the end of the test (to save some time) + // still same statistics + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + partition.finish(); + // + one EndOfPartitionEvent + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); --- End diff -- sure ---
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157691096 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -52,6 +54,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- Using `ArrayDeque#size()` for `getBuffersInBacklog()` may not be feasible because we do not know how many events are in the `ArrayDeque`, and they should not be counted towards the backlog length. For the new API, we may need to modify `ResultSubpartitionView#getNextBuffer` to return a `BufferAndBacklog` wrapper structure instead of `Buffer`. Do we also need to extend `BufferAndAvailability` to add the backlog to it? This way, `PipelinedSubpartition` would benefit from dropping the `volatile`, but for `SpillableSubpartition` the `volatile` may still be needed, because `getNextBuffer` and `decreaseBacklog` are in different parts for `SpillableSubpartitionView`/`SpilledSubpartitionView`. ---
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157686388 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- The current `parent` in `SpilledSubpartitionView` is a `ResultSubpartition`, not a `SpillableSubpartition`. After replacing `ResultSubpartition` with `SpillableSubpartition`, we can make these methods package-private as you suggest. I will do that. ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157544965 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -161,6 +172,29 @@ public boolean isReleased() { return isReleased; } + @Override + public int getBuffersInBacklog() { + return buffersInBacklog; + } + + @Override + public void decreaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog--; + } + } + + @Override + public void increaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog++; + } + } --- End diff -- please check the access-level (the latter two could be private) ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157548033 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -52,6 +54,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- I briefly thought about relying on `buffers.size()` here to reduce complexity and code, but calling `ArrayDeque#size()` (for `getBuffersInBacklog()`) without synchronisation may then be subject to race conditions. However, if we picked up the idea again of returning the backlog size with the buffer itself (which is retrieved under the lock), i.e. similar to `BufferAndAvailability` being returned by the `SequenceNumberingViewReader`, this would work and we would not need the `volatile` here. Since you split the implementations into `PipelinedSubpartition` and `SpillableSubpartition` anyway, this would be a viable approach again. What do you think? What would you prefer? ---
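The idea of handing the backlog back together with the buffer could be sketched roughly as follows. This is a minimal, self-contained illustration rather than the Flink implementation: `SketchBuffer`, `SketchPipelinedSubpartition` and `pollNext()` are hypothetical stand-ins, and `BufferAndBacklog` merely borrows the name used elsewhere in this thread. Because the counter is only touched inside `synchronized (buffers)`, it does not need to be `volatile`, and events never count towards the backlog.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for Flink's Buffer; only the data/event distinction matters here. */
final class SketchBuffer {
    private final boolean isBuffer;

    SketchBuffer(boolean isBuffer) {
        this.isBuffer = isBuffer;
    }

    /** Returns true for data buffers and false for events. */
    boolean isBuffer() {
        return isBuffer;
    }
}

/** Pairs a polled buffer with the backlog observed under the same lock. */
final class BufferAndBacklog {
    final SketchBuffer buffer;
    final int buffersInBacklog;

    BufferAndBacklog(SketchBuffer buffer, int buffersInBacklog) {
        this.buffer = buffer;
        this.buffersInBacklog = buffersInBacklog;
    }
}

/** Simplified subpartition whose counter is guarded by the queue's monitor only. */
final class SketchPipelinedSubpartition {
    private final ArrayDeque<SketchBuffer> buffers = new ArrayDeque<>();

    // Guarded by "buffers"; deliberately not volatile.
    private int buffersInBacklog;

    void add(SketchBuffer buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            if (buffer.isBuffer()) {
                buffersInBacklog++; // events are not part of the backlog
            }
        }
    }

    /** Returns the next element plus the remaining backlog, or null if the queue is empty. */
    BufferAndBacklog pollNext() {
        synchronized (buffers) {
            SketchBuffer next = buffers.poll();
            if (next == null) {
                return null;
            }
            if (next.isBuffer()) {
                buffersInBacklog--;
            }
            return new BufferAndBacklog(next, buffersInBacklog);
        }
    }
}
```

The consumer then reads `buffersInBacklog` from the returned pair, so no unsynchronised getter is needed on the hot path.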
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157540910 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java --- @@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception { try { subpartition.finish(); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(4, subpartition.getTotalNumberOfBytes()); + assertFalse(subpartition.add(mock(Buffer.class))); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); --- End diff -- Actually, this never increases the backlog, even if the subpartition is not finished, since `buffer.isBuffer()` for a `mock(Buffer.class)` returns `false`. Can you test with a real `Buffer` instead? ---
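Why a mocked `Buffer` cannot exercise the backlog counter can be shown without Mockito. The sketch below is an assumption-laden stand-in: `BufferLike`, `MockDefaultsSketch`, `unstubbedMock()` and `backlogAfterAdd()` are hypothetical names, and a `java.lang.reflect.Proxy` imitates the behaviour of an unstubbed mock, whose boolean methods return `false` by default.

```java
import java.lang.reflect.Proxy;

/** Hypothetical one-method stand-in for the Buffer type used in the test. */
interface BufferLike {
    boolean isBuffer();
}

final class MockDefaultsSketch {

    /**
     * Builds a proxy that answers every call with false, imitating an unstubbed mock.
     * Only isBuffer() is meant to be called on it in this sketch.
     */
    static BufferLike unstubbedMock() {
        return (BufferLike) Proxy.newProxyInstance(
                BufferLike.class.getClassLoader(),
                new Class<?>[] {BufferLike.class},
                (proxy, method, args) -> Boolean.FALSE);
    }

    /** Mirrors the counting rule from the diff: only non-event buffers raise the backlog. */
    static int backlogAfterAdd(int backlog, BufferLike buffer) {
        return (buffer != null && buffer.isBuffer()) ? backlog + 1 : backlog;
    }
}
```

With the proxy, `backlogAfterAdd(0, unstubbedMock())` stays at zero whether or not the subpartition is finished, which is why the assertion only becomes meaningful with a real buffer whose `isBuffer()` returns `true`.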
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157539147 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -239,6 +261,10 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception // Spill now assertEquals(2, partition.releaseMemory()); + // still same statistics: + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(2, partition.getBuffersInBacklog()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); --- End diff -- same here - please add the checks to the `reader.getNextBuffer()` lines below ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157538061 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java --- @@ -103,16 +104,35 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception { // Add data to the queue... subpartition.add(createBuffer()); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(1, subpartition.getBuffersInBacklog()); + assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); + // ...should have resulted in a notification verify(listener, times(1)).notifyBuffersAvailable(eq(1L)); // ...and one available result assertNotNull(view.getNextBuffer()); assertNull(view.getNextBuffer()); + assertEquals(0, subpartition.getBuffersInBacklog()); // Add data to the queue... subpartition.add(createBuffer()); + + assertEquals(2, subpartition.getTotalNumberOfBuffers()); + assertEquals(1, subpartition.getBuffersInBacklog()); + assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); verify(listener, times(2)).notifyBuffersAvailable(eq(1L)); + + // Add event to the queue... + Buffer event = createBuffer(); + event.tagAsEvent(); + subpartition.add(event); + + assertEquals(3, subpartition.getTotalNumberOfBuffers()); + assertEquals(1, subpartition.getBuffersInBacklog()); --- End diff -- good catch - the event-adding path was not tested yet ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157548895 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java --- @@ -77,6 +78,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- If the interface of `getNextBuffer()` was changed as suggested above, we could remove the `volatile` here as well. ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157541024 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java --- @@ -62,7 +70,14 @@ public void testAddAfterRelease() throws Exception { try { subpartition.release(); + assertEquals(0, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(0, subpartition.getTotalNumberOfBytes()); + assertFalse(subpartition.add(mock(Buffer.class))); + assertEquals(0, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); --- End diff -- same here - please test with a real `Buffer` instance ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157545208 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java --- @@ -237,6 +243,29 @@ public boolean isReleased() { return isReleased; } + @Override + public int getBuffersInBacklog() { + return buffersInBacklog; + } + + @Override + public void decreaseBuffersInBacklog(Buffer buffer) { + if (buffer != null && buffer.isBuffer()) { + synchronized (buffers) { + buffersInBacklog--; + } + } + } + + @Override + public void increaseBuffersInBacklog(Buffer buffer) { + assert Thread.holdsLock(buffers); + + if (buffer != null && buffer.isBuffer()) { + buffersInBacklog++; + } + } --- End diff -- please check the access-level (the latter two could be private) ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157538818 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws Exception { partition.add(buffer); partition.add(buffer); + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + + assertFalse(buffer.isRecycled()); assertEquals(3, partition.releaseMemory()); + // now the buffer may be freed, depending on the timing of the write operation + // -> let's do this check at the end of the test (to save some time) + // still same statistics + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + partition.finish(); + // + one EndOfPartitionEvent + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); --- End diff -- good, can you also add the backlog correctness checks to the `reader.getNextBuffer()` lines below to ensure they are correct after taking buffers out? ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157544794 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- I'm not quite sure the latter two methods should be in `ResultSubpartition` now since they are quite internal. `increaseBuffersInBacklog()` is only called by `PipelinedSubpartition` and `SpillableSubpartition`. `decreaseBuffersInBacklog()` is (additionally) only called by the spilled/spillable subpartition views and therefore could be package-private in `SpillableSubpartition` only. ---
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r155458048 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -145,6 +145,10 @@ public Buffer getNextBuffer() throws IOException, InterruptedException { listener.notifyBuffersAvailable(1); } + if (current.isBuffer()) { --- End diff -- I think the `decreaseStatistics` call should be inside `getNextBufferInternal`; otherwise, the backlog value is not thread-safe. The previous implementation could keep `decreaseStatistics` inside the synchronized part. ---
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r155454935 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java --- @@ -22,32 +22,57 @@ import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A view to consume a {@link ResultSubpartition} instance. */ -public interface ResultSubpartitionView { +public abstract class ResultSubpartitionView { + + /** The parent subpartition this view belongs to. */ + private final ResultSubpartition parent; + + public ResultSubpartitionView(ResultSubpartition parent) { + this.parent = checkNotNull(parent); + } + + /** +* Returns the next {@link Buffer} instance of this queue iterator and also +* decreases the related statistics. +*/ + public Buffer getNextBuffer() throws IOException, InterruptedException { --- End diff -- I think it makes sense. ---
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r155454886 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java --- @@ -39,13 +39,15 @@ private final AtomicBoolean isReleased; PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) { + super(parent); + this.parent = checkNotNull(parent); this.availabilityListener = checkNotNull(listener); this.isReleased = new AtomicBoolean(); } @Override - public Buffer getNextBuffer() { + protected Buffer getNextBufferInternal() { --- End diff -- I will add the hotfix commit for it. ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154343510 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java --- @@ -39,13 +39,15 @@ private final AtomicBoolean isReleased; PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) { + super(parent); + this.parent = checkNotNull(parent); this.availabilityListener = checkNotNull(listener); this.isReleased = new AtomicBoolean(); } @Override - public Buffer getNextBuffer() { + protected Buffer getNextBufferInternal() { --- End diff -- Actually, a lot of the methods along these calls should probably be marked `@Nullable`. Since you touched the `getNextBufferInternal()`, can you at least mark this (and maybe some calls along the call stack if I say pretty please?). You can do so in a separate `[hotfix]` commit to keep this separate ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154344889 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -133,7 +135,7 @@ int releaseMemory() throws IOException { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + protected Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- `@Nullable` ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154344924 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java --- @@ -114,7 +116,7 @@ public void onNotification() { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + protected Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- `@Nullable` ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154344942 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java --- @@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T // - - static class InfiniteSubpartitionView implements ResultSubpartitionView { + static class InfiniteSubpartitionView extends ResultSubpartitionView { private final BufferProvider bufferProvider; private final CountDownLatch sync; public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) { + super(mock(ResultSubpartition.class)); + this.bufferProvider = checkNotNull(bufferProvider); this.sync = checkNotNull(sync); } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + protected Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- `@Nullable` ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154343587 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java --- @@ -22,32 +22,57 @@ import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A view to consume a {@link ResultSubpartition} instance. */ -public interface ResultSubpartitionView { +public abstract class ResultSubpartitionView { + + /** The parent subpartition this view belongs to. */ + private final ResultSubpartition parent; + + public ResultSubpartitionView(ResultSubpartition parent) { + this.parent = checkNotNull(parent); + } + + /** +* Returns the next {@link Buffer} instance of this queue iterator and also +* decreases the related statistics. +*/ + public Buffer getNextBuffer() throws IOException, InterruptedException { --- End diff -- `@Nullable` and (I'm not sure whether @pnowojski agrees) maybe make this method `final` (any subclass should only override `getNextBufferInternal`)? ---
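The relation between a `final` public entry point and a single overridable hook could be sketched as below. The names `SketchSubpartitionView`, `QueueBackedView` and `getConsumed()` are hypothetical, `String` stands in for `Buffer`, and the `consumed` counter replaces the `parent.decreaseStatistics(buffer)` call from the diff. A nullability annotation is only mentioned in comments because none ships with the JDK.

```java
import java.util.ArrayDeque;

/** Hypothetical base class: the public entry point is final, subclasses supply the fetch. */
abstract class SketchSubpartitionView {
    private int consumed; // stands in for the parent's statistics

    /**
     * Fetches the next element and updates the statistics.
     * May return null (would carry a nullability annotation in real code).
     */
    public final String getNextBuffer() {
        String buffer = getNextBufferInternal();
        if (buffer != null) {
            consumed++; // in the diff this is parent.decreaseStatistics(buffer)
        }
        return buffer;
    }

    public final int getConsumed() {
        return consumed;
    }

    /** The only extension point; returns null when nothing is currently available. */
    protected abstract String getNextBufferInternal();
}

/** Example subclass that only decides where elements come from. */
final class QueueBackedView extends SketchSubpartitionView {
    private final ArrayDeque<String> queue = new ArrayDeque<>();

    QueueBackedView(String... elements) {
        for (String element : elements) {
            queue.add(element);
        }
    }

    @Override
    protected String getNextBufferInternal() {
        return queue.poll();
    }
}
```

Marking `getNextBuffer()` as `final` means a subclass cannot accidentally skip the statistics update, which is the concern behind restricting overrides to `getNextBufferInternal()`.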
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r154343656 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java --- @@ -22,32 +22,57 @@ import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A view to consume a {@link ResultSubpartition} instance. */ -public interface ResultSubpartitionView { +public abstract class ResultSubpartitionView { + + /** The parent subpartition this view belongs to. */ + private final ResultSubpartition parent; + + public ResultSubpartitionView(ResultSubpartition parent) { + this.parent = checkNotNull(parent); + } + + /** +* Returns the next {@link Buffer} instance of this queue iterator and also +* decreases the related statistics. +*/ + public Buffer getNextBuffer() throws IOException, InterruptedException { + Buffer buffer = getNextBufferInternal(); + if (buffer != null) { + parent.decreaseStatistics(buffer); + } + return buffer; + } + + public int getBuffersInBacklog() { + return parent.getBuffersInBacklog(); + } /** -* Returns the next {@link Buffer} instance of this queue iterator. -* -* If there is currently no instance available, it will return null. +* The internal method used by {@link ResultSubpartitionView#getNextBuffer()} +* to return the next {@link Buffer} instance of this queue iterator. +* +* If there is currently no instance available, it will return null. * This might happen for example when a pipelined queue producer is slower * than the consumer or a spilled queue needs to read in more data. -* -* Important: The consumer has to make sure that each +* +* Important: The consumer has to make sure that each * buffer instance will eventually be recycled with {@link Buffer#recycle()} * after it has been consumed. 
*/ - Buffer getNextBuffer() throws IOException, InterruptedException; - - void notifyBuffersAvailable(long buffers) throws IOException; + protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException; --- End diff -- `@Nullable` ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r153564080 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java --- @@ -114,7 +116,7 @@ public void onNotification() { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- make this `protected` ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r153564859 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java --- @@ -22,32 +22,52 @@ import java.io.IOException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A view to consume a {@link ResultSubpartition} instance. */ -public interface ResultSubpartitionView { +public abstract class ResultSubpartitionView { + + /** The parent subpartition this view belongs to. */ + private final ResultSubpartition parent; + + public ResultSubpartitionView(ResultSubpartition parent) { + this.parent = checkNotNull(parent); + } /** * Returns the next {@link Buffer} instance of this queue iterator. -* -* If there is currently no instance available, it will return null. +* +* If there is currently no instance available, it will return null. * This might happen for example when a pipelined queue producer is slower * than the consumer or a spilled queue needs to read in more data. -* -* Important: The consumer has to make sure that each +* +* Important: The consumer has to make sure that each * buffer instance will eventually be recycled with {@link Buffer#recycle()} * after it has been consumed. */ - Buffer getNextBuffer() throws IOException, InterruptedException; + public Buffer getNextBuffer() throws IOException, InterruptedException { + Buffer buffer = getNextBufferInternal(); + if (buffer != null) { + parent.decreaseStatistics(buffer); + } + return buffer; + } + + public int getBuffersInBacklog() { + return parent.getBuffersInBacklog(); + } - void notifyBuffersAvailable(long buffers) throws IOException; + protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException; --- End diff -- please add a javadoc with the intended relation to `getNextBuffer` ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r153563915 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java --- @@ -39,13 +39,15 @@ private final AtomicBoolean isReleased; PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) { + super(parent); + this.parent = checkNotNull(parent); this.availabilityListener = checkNotNull(listener); this.isReleased = new AtomicBoolean(); } @Override - public Buffer getNextBuffer() { + public Buffer getNextBufferInternal() { --- End diff -- make this `protected` ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r153564111 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java --- @@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T // - - static class InfiniteSubpartitionView implements ResultSubpartitionView { + static class InfiniteSubpartitionView extends ResultSubpartitionView { private final BufferProvider bufferProvider; private final CountDownLatch sync; public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) { + super(mock(ResultSubpartition.class)); + this.bufferProvider = checkNotNull(bufferProvider); this.sync = checkNotNull(sync); } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- make this `protected` ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r153564062 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -133,7 +135,7 @@ int releaseMemory() throws IOException { } @Override - public Buffer getNextBuffer() throws IOException, InterruptedException { + public Buffer getNextBufferInternal() throws IOException, InterruptedException { --- End diff -- make this `protected` ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r152193809 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -145,6 +145,10 @@ public Buffer getNextBuffer() throws IOException, InterruptedException { listener.notifyBuffersAvailable(1); } + if (current.isBuffer()) { --- End diff -- That is a good idea, and I have already addressed this issue as you suggested. ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r152008483 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java --- @@ -246,9 +246,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() { @Override public String toString() { return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," + - "finished? %s, read view? %s, spilled? %s]", - getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null, - spillWriter != null); + "%d backlog, finished? %s, read view? %s, spilled? %s]", --- End diff -- `"%d buffers in backlog, finished (...)"` ? `"backlog = %d, finished (...)"` ? `"%d in backlog, finished (...)"` ? `%d backlog` is a little bit cryptic. ---
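To make the wording options above concrete, here is a minimal sketch of the format string with the first suggested label, `%d buffers in backlog`. The class `BacklogToStringSketch` and the method `describe` are hypothetical names for illustration only; they are not part of Flink, and the real `toString()` reads its values from fields of `SpillableSubpartition`. The sketch also adds a space after `(%d bytes),`, which the concatenated string in the diff appears to be missing.

```java
import java.util.Locale;

/** Hypothetical helper, not part of Flink; it only illustrates the label wording. */
final class BacklogToStringSketch {

    /** Formats the subpartition state with an explicit "buffers in backlog" label. */
    static String describe(
            int totalBuffers,
            long totalBytes,
            int buffersInBacklog,
            boolean finished,
            boolean hasReadView,
            boolean spilled) {
        // Locale.ROOT keeps the digits stable regardless of the JVM default locale.
        return String.format(
            Locale.ROOT,
            "SpillableSubpartition [%d number of buffers (%d bytes), "
                + "%d buffers in backlog, finished? %s, read view? %s, spilled? %s]",
            totalBuffers, totalBytes, buffersInBacklog, finished, hasReadView, spilled);
    }

    public static void main(String[] args) {
        System.out.println(describe(5, 160L, 3, false, true, false));
    }
}
```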
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r152008221 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java --- @@ -145,6 +145,10 @@ public Buffer getNextBuffer() throws IOException, InterruptedException { listener.notifyBuffersAvailable(1); } + if (current.isBuffer()) { --- End diff -- This logic is copy/pasted in `SpilledSubpartitionView` and `PipelinedSubpartition`, and it gets even more complicated in the next PR. How about changing `ResultSubpartitionView` to an abstract class with a `ResultSubpartition parent` field and the following methods:
```
Buffer getNextBuffer() throws IOException, InterruptedException {
	Buffer next = getNextBufferInternal();
	if (next != null) {
		parent.decreaseStatistics(next);
	}
	return next;
}

protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException;
```
And rename all current implementations of `getNextBuffer` to `getNextBufferInternal`. Thus: 1. You would handle the decrementing in one place only, instead of reimplementing it in many places. 2. The `protected int backlog;` field in `ResultSubpartition` could be made private. ---
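The refactoring proposed in the comment above is essentially a template method: the base view owns the bookkeeping and subclasses only supply the fetch. A minimal, self-contained sketch follows. All types in it (`Buf`, `Subpartition`, `View`, `PipelinedView`) are simplified, hypothetical stand-ins rather than the Flink classes, and the locking is reduced to plain `synchronized` methods, so it should be read as an illustration of the idea and not as the PR's implementation.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

/** Hypothetical, simplified stand-ins for the Flink classes discussed in this thread. */
final class BacklogViewSketch {

    /** Stand-in for Buffer: either a data buffer or an event. */
    static final class Buf {
        final boolean isBuffer;
        Buf(boolean isBuffer) { this.isBuffer = isBuffer; }
    }

    /** Stand-in for ResultSubpartition: owns the queue and a private backlog counter. */
    static final class Subpartition {
        private final Queue<Buf> buffers = new ArrayDeque<>();
        private int buffersInBacklog; // can stay private: views never touch it directly

        synchronized void add(Buf b) {
            buffers.add(b);
            if (b.isBuffer) { buffersInBacklog++; } // events do not count as backlog
        }
        synchronized Buf poll() { return buffers.poll(); }
        synchronized void decreaseStatistics(Buf b) {
            if (b.isBuffer) { buffersInBacklog--; }
        }
        synchronized int getBuffersInBacklog() { return buffersInBacklog; }
    }

    /** Stand-in for the abstract ResultSubpartitionView. */
    abstract static class View {
        private final Subpartition parent;
        View(Subpartition parent) { this.parent = Objects.requireNonNull(parent); }

        /** Template method: fetch via the subclass, then update statistics in one place. */
        final Buf getNextBuffer() {
            Buf next = getNextBufferInternal();
            if (next != null) { parent.decreaseStatistics(next); }
            return next;
        }
        int getBuffersInBacklog() { return parent.getBuffersInBacklog(); }

        /** Subclasses only fetch; they must not touch the backlog counter. */
        protected abstract Buf getNextBufferInternal();
    }

    /** One concrete view; spilled or spillable views would differ only in the fetch. */
    static final class PipelinedView extends View {
        private final Subpartition source;
        PipelinedView(Subpartition source) { super(source); this.source = source; }
        @Override protected Buf getNextBufferInternal() { return source.poll(); }
    }

    public static void main(String[] args) {
        Subpartition p = new Subpartition();
        p.add(new Buf(true));
        p.add(new Buf(false)); // an event
        p.add(new Buf(true));
        View view = new PipelinedView(p);
        System.out.println(view.getBuffersInBacklog());
        while (view.getNextBuffer() != null) { /* drain the queue */ }
        System.out.println(view.getBuffersInBacklog());
    }
}
```

Declaring `getNextBuffer` as `final` in the sketch is a design choice of this illustration: it prevents a subclass from bypassing the statistics update, which is the point of moving the decrement into the base class.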
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r152008524 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java --- @@ -246,9 +246,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() { @Override public String toString() { return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," + - "finished? %s, read view? %s, spilled? %s]", - getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null, - spillWriter != null); + "%d backlog, finished? %s, read view? %s, spilled? %s]", + getTotalNumberOfBuffers(), getTotalNumberOfBytes(), + backlog, isFinished, readView != null, spillWriter != null); --- End diff -- `getBacklog`? ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r152010406 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -41,12 +41,19 @@ /** The total number of bytes (both data and event buffers) */ private long totalNumberOfBytes; + /** The number of non-event buffers currently in this subpartition */ + protected int backlog; --- End diff -- rename `backlog` to `buffersInBacklog`/`accumulatedBuffers`/`backloggedBuffers`? ---