[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2018-01-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4559


---


2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157709904
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

Yes, I totally agree with your point about the current state of the spillable/spilled subpartitions and subpartition views.

I also think that `PipelinedSubpartition` is the most important path and that `SpillableSubpartition` is not very performance-sensitive. I think we have already reached a consensus on how to handle `SpillableSubpartition`, and I will do that later. :)


---


2017-12-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157707628
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

Yes, that would be nice.


---


2017-12-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157706995
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

You're absolutely right about not counting events. Therefore, we cannot use the queue's size as I suggested.

Yes, `BufferAndAvailability` would need to be extended as well.

This integration/split of the spillable/spilled subpartitions and subpartition views, with both of them working on the same structures and requiring the same synchronisation pattern, is imho really not nice and highly fragile. @pnowojski and I are currently redesigning the synchronisation in these parts of the code and are a bit sensitive to it right now, so let's drag him into this discussion as well: I would consider `PipelinedSubpartition` the hot path where we need to optimise most; spillable subpartitions are used in batch mode and have higher tolerances, especially when spilling to disk. However, if you returned the new backlog counter from `SpillableSubpartition#decreaseBuffersInBacklog()` (retrieved inside the `synchronized (buffers)` section), then you would not need the `volatile` either, since you are already under the lock.
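A minimal sketch of that suggestion, assuming a simplified stand-in for Flink's `Buffer`; the `Buf` and `SpillableBacklog` classes below are hypothetical and not Flink's API. The counter is only touched inside `synchronized (buffers)`, and `decreaseBuffersInBacklog` returns the new value read under that same lock, so nobody reads the field without holding the lock and it does not need to be `volatile`.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for Flink's Buffer: only the data/event distinction matters here.
final class Buf {
    private final boolean isBuffer;

    Buf(boolean isBuffer) {
        this.isBuffer = isBuffer;
    }

    boolean isBuffer() {
        return isBuffer;
    }
}

// Hypothetical, heavily simplified spillable subpartition that only tracks the backlog.
class SpillableBacklog {
    // In-memory buffers; this deque also serves as the lock object.
    private final ArrayDeque<Buf> buffers = new ArrayDeque<>();

    // Guarded by "buffers": every read and write holds that lock, so no volatile is needed.
    private int buffersInBacklog;

    void add(Buf buf) {
        synchronized (buffers) {
            buffers.add(buf);
            if (buf.isBuffer()) {
                buffersInBacklog++;
            }
        }
    }

    // Called by a view after it handed out a buffer (e.g. one read back from disk).
    // Returns the new backlog, read under the same lock as the decrement.
    int decreaseBuffersInBacklog(Buf buf) {
        synchronized (buffers) {
            if (buf != null && buf.isBuffer()) {
                buffersInBacklog--;
            }
            return buffersInBacklog;
        }
    }
}
```

The view could then pass the returned value on to the consumer instead of calling a separate getter outside the lock.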


---


2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157706951
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

Sorry, I phrased that incorrectly above. I meant that we do not need the `decreaseBuffersInBacklog` method in `ResultSubpartition` once the type of `parent` in `SpilledSubpartitionView` is changed to `SpillableSubpartition`.


---


2017-12-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157703075
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

`package-private`, e.g. `abstract void increaseBuffersInBacklog(Buffer buffer);`, already works without changing anything, since `SpilledSubpartitionView` is in the same package.
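To illustrate with hypothetical classes (not Flink's): an abstract method declared without an access modifier is package-private, so any class in the same package can call it through a reference typed as the abstract parent, while the method stays out of the public API.

```java
// All classes are assumed to live in the same package (here: the default package).

abstract class Subpartition {
    // No modifier: package-private. Callable from this package only.
    abstract void increaseBuffersInBacklog(boolean isBuffer);

    abstract int backlog();
}

class PipelinedLikeSubpartition extends Subpartition {
    private int buffersInBacklog;

    @Override
    void increaseBuffersInBacklog(boolean isBuffer) {
        if (isBuffer) {
            buffersInBacklog++;
        }
    }

    @Override
    int backlog() {
        return buffersInBacklog;
    }
}

// A view in the same package calls the package-private method through
// a reference typed as the abstract parent; no public modifier is required.
class SpilledLikeView {
    private final Subpartition parent;

    SpilledLikeView(Subpartition parent) {
        this.parent = parent;
    }

    void onBufferAdded(boolean isBuffer) {
        parent.increaseBuffersInBacklog(isBuffer);
    }
}
```

Code outside the package would get a compile error when calling `increaseBuffersInBacklog`, which is the point of keeping it package-private.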


---


2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157694294
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception {
try {
subpartition.finish();
 
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(4, subpartition.getTotalNumberOfBytes());
+
assertFalse(subpartition.add(mock(Buffer.class)));
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
--- End diff --

sure


---


2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157693477
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws Exception {
partition.add(buffer);
partition.add(buffer);
 
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
+   assertFalse(buffer.isRecycled());
assertEquals(3, partition.releaseMemory());
 
+   // now the buffer may be freed, depending on the timing of the write operation
+   // -> let's do this check at the end of the test (to save some time)
+   // still same statistics
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
partition.finish();
 
+   // + one EndOfPartitionEvent
+   assertEquals(4, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
--- End diff --

sure


---


2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157691096
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

Using `ArrayDeque#size()` for `getBuffersInBacklog()` may not be feasible, because we do not know how many events are in the `ArrayDeque`, and events should not count towards the backlog length.

For the new API, we may need to modify `ResultSubpartitionView#getNextBuffer` to return a `BufferAndBacklog` wrapper instead of a `Buffer`. Do we also need to extend `BufferAndAvailability` to carry the backlog? This way, `PipelinedSubpartition` would benefit by dropping the `volatile`, but might `SpillableSubpartition` still need the `volatile`, since `getNextBuffer` and `decreaseBacklog` happen in different places in `SpillableSubpartitionView`/`SpilledSubpartitionView`?
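A rough sketch of the `BufferAndBacklog` idea, assuming simplified stand-ins; `Buf`, `BufferAndBacklog` and `PipelinedQueue` below are hypothetical and not necessarily the classes that ended up in Flink. The buffer is polled and the backlog is read inside the same `synchronized (buffers)` block and both are returned together, so the counter is never read outside the lock and needs no `volatile`. Events remain in the queue but are not counted, which is why `ArrayDeque#size()` alone would not give the backlog.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for Flink's Buffer.
final class Buf {
    private final boolean isBuffer;

    Buf(boolean isBuffer) {
        this.isBuffer = isBuffer;
    }

    boolean isBuffer() {
        return isBuffer;
    }
}

// Hypothetical wrapper returned instead of a bare buffer.
final class BufferAndBacklog {
    final Buf buffer;
    final int buffersInBacklog;

    BufferAndBacklog(Buf buffer, int buffersInBacklog) {
        this.buffer = buffer;
        this.buffersInBacklog = buffersInBacklog;
    }
}

class PipelinedQueue {
    private final ArrayDeque<Buf> buffers = new ArrayDeque<>();

    // Only read and written while holding the lock on "buffers": no volatile.
    private int buffersInBacklog;

    void add(Buf buf) {
        synchronized (buffers) {
            buffers.add(buf);
            if (buf.isBuffer()) {
                buffersInBacklog++;
            }
        }
    }

    // Returns null if nothing is queued, like getNextBuffer().
    BufferAndBacklog pollNext() {
        synchronized (buffers) {
            Buf buf = buffers.poll();
            if (buf == null) {
                return null;
            }
            if (buf.isBuffer()) {
                buffersInBacklog--;
            }
            // Backlog snapshot taken under the same lock as the poll.
            return new BufferAndBacklog(buf, buffersInBacklog);
        }
    }
}
```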



---


2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157686388
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

The current `parent` in `SpilledSubpartitionView` is a `ResultSubpartition`, not a `SpillableSubpartition`. After replacing `ResultSubpartition` with `SpillableSubpartition` there, we can make these methods package-private as you suggest. I will do that.


---


2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157544965
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -161,6 +172,29 @@ public boolean isReleased() {
return isReleased;
}
 
+   @Override
+   public int getBuffersInBacklog() {
+   return buffersInBacklog;
+   }
+
+   @Override
+   public void decreaseBuffersInBacklog(Buffer buffer) {
+   assert Thread.holdsLock(buffers);
+
+   if (buffer != null && buffer.isBuffer()) {
+   buffersInBacklog--;
+   }
+   }
+
+   @Override
+   public void increaseBuffersInBacklog(Buffer buffer) {
+   assert Thread.holdsLock(buffers);
+
+   if (buffer != null && buffer.isBuffer()) {
+   buffersInBacklog++;
+   }
+   }
--- End diff --

Please check the access level (the latter two could be private).


---


2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157548033
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

I briefly thought about relying on `buffers.size()` here to reduce complexity and code, but without synchronisation `ArrayDeque#size()` (for `getBuffersInBacklog()`) may then be subject to race conditions. However, if we picked up again the idea of returning the backlog size together with the buffer itself (which is retrieved under the lock), i.e. similar to the `BufferAndAvailability` returned by the `SequenceNumberingViewReader`, this would work and we would not need the `volatile` here. Since you split the implementations into `PipelinedSubpartition` and `SpillableSubpartition` anyway, this would be a viable approach again.
What do you think? What would you prefer?


---


2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157540910
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception {
try {
subpartition.finish();
 
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(4, subpartition.getTotalNumberOfBytes());
+
assertFalse(subpartition.add(mock(Buffer.class)));
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
--- End diff --

Actually, this never increases the backlog, even if the subpartition is not 
finished, since `buffer.isBuffer()` for a `mock(Buffer.class)` returns `false`. 
Can you test with a real `Buffer` instead?


---


2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157539147
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -239,6 +261,10 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 
// Spill now
assertEquals(2, partition.releaseMemory());
+   // still same statistics:
+   assertEquals(4, partition.getTotalNumberOfBuffers());
+   assertEquals(2, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
--- End diff --

same here - please add the checks to the `reader.getNextBuffer()` lines 
below


---


2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157538061
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ---
@@ -103,16 +104,35 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
// Add data to the queue...
subpartition.add(createBuffer());
 
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(1, subpartition.getBuffersInBacklog());
+   assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+
// ...should have resulted in a notification
verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
// ...and one available result
assertNotNull(view.getNextBuffer());
assertNull(view.getNextBuffer());
+   assertEquals(0, subpartition.getBuffersInBacklog());
 
// Add data to the queue...
subpartition.add(createBuffer());
+
+   assertEquals(2, subpartition.getTotalNumberOfBuffers());
+   assertEquals(1, subpartition.getBuffersInBacklog());
+   assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
+
+   // Add event to the queue...
+   Buffer event = createBuffer();
+   event.tagAsEvent();
+   subpartition.add(event);
+
+   assertEquals(3, subpartition.getTotalNumberOfBuffers());
+   assertEquals(1, subpartition.getBuffersInBacklog());
--- End diff --

good catch - the event-adding path was not tested yet
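The expectations in this test can be reproduced with a tiny stand-alone model. This is only a sketch: the `Stats` class is hypothetical, and `BUFFER_SIZE = 4096` is an assumed value. Every added buffer or event counts towards the (cumulative) total number of buffers and bytes, but only non-event buffers count towards the backlog, and consuming a data buffer decreases only the backlog.

```java
import java.util.ArrayDeque;

// Hypothetical model of the subpartition statistics checked in the test above.
class Stats {
    static final int BUFFER_SIZE = 4096; // assumed size, for illustration only

    // true = data buffer, false = event
    private final ArrayDeque<Boolean> queue = new ArrayDeque<>();

    int totalNumberOfBuffers;   // cumulative, never decreases
    long totalNumberOfBytes;    // cumulative, never decreases
    int buffersInBacklog;       // only data buffers not yet consumed

    void add(boolean isBuffer, int size) {
        queue.add(isBuffer);
        totalNumberOfBuffers++;      // events are counted here...
        totalNumberOfBytes += size;
        if (isBuffer) {
            buffersInBacklog++;      // ...but not in the backlog
        }
    }

    // Takes the next entry; returns null if the queue is empty.
    Boolean poll() {
        Boolean isBuffer = queue.poll();
        if (isBuffer != null && isBuffer) {
            buffersInBacklog--;
        }
        return isBuffer;
    }
}
```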


---


2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157548895
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 ---
@@ -77,6 +78,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

If the interface of `getNextBuffer()` was changed as suggested above, we 
could remove the `volatile` here as well.


---


2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157541024
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -62,7 +70,14 @@ public void testAddAfterRelease() throws Exception {
try {
subpartition.release();
 
+   assertEquals(0, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(0, subpartition.getTotalNumberOfBytes());
+
assertFalse(subpartition.add(mock(Buffer.class)));
+   assertEquals(0, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
--- End diff --

same here - please test with a real `Buffer` instance


---


2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157545208
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 ---
@@ -237,6 +243,29 @@ public boolean isReleased() {
return isReleased;
}
 
+   @Override
+   public int getBuffersInBacklog() {
+   return buffersInBacklog;
+   }
+
+   @Override
+   public void decreaseBuffersInBacklog(Buffer buffer) {
+   if (buffer != null && buffer.isBuffer()) {
+   synchronized (buffers) {
+   buffersInBacklog--;
+   }
+   }
+   }
+
+   @Override
+   public void increaseBuffersInBacklog(Buffer buffer) {
+   assert Thread.holdsLock(buffers);
+
+   if (buffer != null && buffer.isBuffer()) {
+   buffersInBacklog++;
+   }
+   }
--- End diff --

Please check the access level (the latter two could be private).


---


2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157538818
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws Exception {
partition.add(buffer);
partition.add(buffer);
 
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
+   assertFalse(buffer.isRecycled());
assertEquals(3, partition.releaseMemory());
 
+   // now the buffer may be freed, depending on the timing of the write operation
+   // -> let's do this check at the end of the test (to save some time)
+   // still same statistics
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
partition.finish();
 
+   // + one EndOfPartitionEvent
+   assertEquals(4, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
--- End diff --

Good. Can you also add the backlog correctness checks to the `reader.getNextBuffer()` lines below, to ensure the values are correct after taking buffers out?


---


2017-12-18 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157544794
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

I'm not quite sure the latter two methods should be in `ResultSubpartition` now, since they are quite internal. `increaseBuffersInBacklog()` is only called by `PipelinedSubpartition` and `SpillableSubpartition`. `decreaseBuffersInBacklog()` is (additionally) only called by the spilled/spillable subpartition views and could therefore be package-private in `SpillableSubpartition` only.


---


2017-12-07 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r155458048
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -145,6 +145,10 @@ public Buffer getNextBuffer() throws IOException, InterruptedException {
listener.notifyBuffersAvailable(1);
}
 
+   if (current.isBuffer()) {
--- End diff --

I think the `decreaseStatistics` call should be inside `getNextBufferInternal`; otherwise, the backlog value is not thread-safe. The previous implementation could keep `decreaseStatistics` inside the synchronized section.


---


2017-12-07 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r155454935
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -22,32 +22,57 @@
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A view to consume a {@link ResultSubpartition} instance.
  */
-public interface ResultSubpartitionView {
+public abstract class ResultSubpartitionView {
+
+   /** The parent subpartition this view belongs to. */
+   private final ResultSubpartition parent;
+
+   public ResultSubpartitionView(ResultSubpartition parent) {
+   this.parent = checkNotNull(parent);
+   }
+
+   /**
+* Returns the next {@link Buffer} instance of this queue iterator and also
+* decreases the related statistics.
+*/
+   public Buffer getNextBuffer() throws IOException, InterruptedException {
--- End diff --

I think it makes sense.


---


2017-12-07 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r155454886
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 ---
@@ -39,13 +39,15 @@
private final AtomicBoolean isReleased;
 
PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) {
+   super(parent);
+
this.parent = checkNotNull(parent);
this.availabilityListener = checkNotNull(listener);
this.isReleased = new AtomicBoolean();
}
 
@Override
-   public Buffer getNextBuffer() {
+   protected Buffer getNextBufferInternal() {
--- End diff --

I will add the hotfix commit for it.


---


2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154343510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 ---
@@ -39,13 +39,15 @@
private final AtomicBoolean isReleased;
 
PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) {
+   super(parent);
+
this.parent = checkNotNull(parent);
this.availabilityListener = checkNotNull(listener);
this.isReleased = new AtomicBoolean();
}
 
@Override
-   public Buffer getNextBuffer() {
+   protected Buffer getNextBufferInternal() {
--- End diff --

Actually, a lot of the methods along these calls should probably be marked `@Nullable`. Since you touched `getNextBufferInternal()`, can you at least mark this one (and maybe some methods along the call stack, if I say pretty please?). You can do so in a separate `[hotfix]` commit to keep this change separate.


---


2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154344889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -133,7 +135,7 @@ int releaseMemory() throws IOException {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

`@Nullable`


---


2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154344924
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -114,7 +116,7 @@ public void onNotification() {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

`@Nullable`


---


2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154344942
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 ---
@@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T
 
// 
-
 
-   static class InfiniteSubpartitionView implements ResultSubpartitionView {
+   static class InfiniteSubpartitionView extends ResultSubpartitionView {
 
private final BufferProvider bufferProvider;
 
private final CountDownLatch sync;
 
public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) {
+   super(mock(ResultSubpartition.class));
+
this.bufferProvider = checkNotNull(bufferProvider);
this.sync = checkNotNull(sync);
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

`@Nullable`


---


2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154343587
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -22,32 +22,57 @@
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A view to consume a {@link ResultSubpartition} instance.
  */
-public interface ResultSubpartitionView {
+public abstract class ResultSubpartitionView {
+
+   /** The parent subpartition this view belongs to. */
+   private final ResultSubpartition parent;
+
+   public ResultSubpartitionView(ResultSubpartition parent) {
+   this.parent = checkNotNull(parent);
+   }
+
+   /**
+* Returns the next {@link Buffer} instance of this queue iterator and also
+* decreases the related statistics.
+*/
+   public Buffer getNextBuffer() throws IOException, InterruptedException {
--- End diff --

`@Nullable`, and (I'm not sure whether @pnowojski agrees) maybe make this method `final` (any subclass should only override `getNextBufferInternal`)?
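A minimal sketch of the suggested shape, with hypothetical simplified types (`Buf`, `Parent`, `View`, `QueueView`) and a locally declared `@Nullable` stand-in so it compiles without extra dependencies. `getNextBuffer()` is `final`, so subclasses can only plug in `getNextBufferInternal()` and cannot bypass the statistics update. The sketch is single-threaded; the locking discussed elsewhere in this thread is deliberately left out.

```java
import java.io.IOException;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayDeque;

// Local stand-in for a @Nullable annotation, so this compiles on its own.
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.METHOD)
@interface Nullable {}

final class Buf {
    private final boolean isBuffer;

    Buf(boolean isBuffer) {
        this.isBuffer = isBuffer;
    }

    boolean isBuffer() {
        return isBuffer;
    }
}

// Hypothetical parent that keeps the statistics (no locking in this sketch).
class Parent {
    int buffersInBacklog;

    void decreaseStatistics(Buf buf) {
        if (buf.isBuffer()) {
            buffersInBacklog--;
        }
    }
}

abstract class View {
    private final Parent parent;

    View(Parent parent) {
        this.parent = parent;
    }

    // final: subclasses cannot override this and skip the statistics update.
    @Nullable
    public final Buf getNextBuffer() throws IOException, InterruptedException {
        Buf buf = getNextBufferInternal();
        if (buf != null) {
            parent.decreaseStatistics(buf);
        }
        return buf;
    }

    // The only hook subclasses implement; returns null if nothing is available yet.
    @Nullable
    protected abstract Buf getNextBufferInternal() throws IOException, InterruptedException;
}

// Hypothetical in-memory view used for illustration.
class QueueView extends View {
    private final ArrayDeque<Buf> queue;

    QueueView(Parent parent, ArrayDeque<Buf> queue) {
        super(parent);
        this.queue = queue;
    }

    @Nullable
    @Override
    protected Buf getNextBufferInternal() {
        return queue.poll();
    }
}
```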


---


2017-12-01 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r154343656
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -22,32 +22,57 @@
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A view to consume a {@link ResultSubpartition} instance.
  */
-public interface ResultSubpartitionView {
+public abstract class ResultSubpartitionView {
+
+   /** The parent subpartition this view belongs to. */
+   private final ResultSubpartition parent;
+
+   public ResultSubpartitionView(ResultSubpartition parent) {
+   this.parent = checkNotNull(parent);
+   }
+
+   /**
+* Returns the next {@link Buffer} instance of this queue iterator and also
+* decreases the related statistics.
+*/
+   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   Buffer buffer = getNextBufferInternal();
+   if (buffer != null) {
+   parent.decreaseStatistics(buffer);
+   }
+   return buffer;
+   }
+
+   public int getBuffersInBacklog() {
+   return parent.getBuffersInBacklog();
+   }
 
/**
-* Returns the next {@link Buffer} instance of this queue iterator.
-* 
-* If there is currently no instance available, it will return null.
+* The internal method used by {@link ResultSubpartitionView#getNextBuffer()}
+* to return the next {@link Buffer} instance of this queue iterator.
+*
+* If there is currently no instance available, it will return null.
 * This might happen for example when a pipelined queue producer is slower
 * than the consumer or a spilled queue needs to read in more data.
-* 
-* Important: The consumer has to make sure that each
+*
+* Important: The consumer has to make sure that each
 * buffer instance will eventually be recycled with {@link Buffer#recycle()}
 * after it has been consumed.
 */
-   Buffer getNextBuffer() throws IOException, InterruptedException;
-
-   void notifyBuffersAvailable(long buffers) throws IOException;
+   protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException;
--- End diff --

`@Nullable`
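Because `getNextBufferInternal()` (and therefore `getNextBuffer()`) returns null whenever no buffer is currently available, the annotation tells every caller to null-check the result; the javadoc above additionally requires each returned buffer to be recycled. The consumer-side sketch below illustrates both obligations. `RecyclableBuffer`, `BufferSource` and `BufferDrainer` are hypothetical stand-ins, not Flink classes.

```java
// Hypothetical stand-in for a recyclable network buffer (not Flink's Buffer).
final class RecyclableBuffer {
    private boolean recycled;

    void recycle() { recycled = true; }

    boolean isRecycled() { return recycled; }
}

// Hypothetical source: getNextBuffer() may return null (what @Nullable would
// document) when no buffer is available at the moment.
@FunctionalInterface
interface BufferSource {
    RecyclableBuffer getNextBuffer();
}

final class BufferDrainer {
    private BufferDrainer() {}

    // Consumes every currently available buffer, recycling each one after use,
    // and returns how many were consumed. A null result ends the loop.
    static int drainAvailable(BufferSource source) {
        int consumed = 0;
        RecyclableBuffer buffer;
        while ((buffer = source.getNextBuffer()) != null) {
            // ... hand the buffer's contents to the consumer here ...
            buffer.recycle();
            consumed++;
        }
        return consumed;
    }
}
```

A null result only means that nothing is available yet (for example, a slow pipelined producer), so a real consumer would presumably wait for the next availability notification, such as one delivered through `BufferAvailabilityListener`, instead of treating null as the end of the data.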


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564080
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 ---
@@ -114,7 +116,7 @@ public void onNotification() {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   public Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

make this `protected`
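This matters because Java lets an overriding method widen access (from `protected` to `public`) but not narrow it, so a `public getNextBufferInternal()` in a subclass compiles without complaint. Only the declared modifier stops outside callers from bypassing `getNextBuffer()` and its statistics update. The toy sketch below mirrors the split; `ViewBase` and `FixedView` are hypothetical names, and marking the public entry point `final` is an extra assumption that the diff does not make.

```java
// Hypothetical base class mirroring the getNextBuffer()/getNextBufferInternal() split.
abstract class ViewBase {
    private int delivered;  // stands in for the parent's statistics

    // Public entry point; declaring it final stops subclasses from
    // overriding it and skipping the bookkeeping.
    public final String getNext() {
        String next = getNextInternal();
        if (next != null) {
            delivered++;
        }
        return next;
    }

    public int getDelivered() { return delivered; }

    // Subclass hook; may return null when nothing is available.
    protected abstract String getNextInternal();
}

final class FixedView extends ViewBase {
    private int remaining = 2;

    // Kept protected: widening it to public would still compile, but would
    // let callers skip getNext() and its bookkeeping.
    @Override
    protected String getNextInternal() {
        return remaining-- > 0 ? "buffer" : null;
    }
}
```

Note that `protected` still permits calls from the same package, so it restricts only callers in other packages and non-subclasses.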


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564859
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 ---
@@ -22,32 +22,52 @@
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A view to consume a {@link ResultSubpartition} instance.
  */
-public interface ResultSubpartitionView {
+public abstract class ResultSubpartitionView {
+
+   /** The parent subpartition this view belongs to. */
+   private final ResultSubpartition parent;
+
+   public ResultSubpartitionView(ResultSubpartition parent) {
+   this.parent = checkNotNull(parent);
+   }
 
/**
 * Returns the next {@link Buffer} instance of this queue iterator.
-* 
-* If there is currently no instance available, it will return null.
+*
+* If there is currently no instance available, it will return null.
 * This might happen for example when a pipelined queue producer is slower
 * than the consumer or a spilled queue needs to read in more data.
-* 
-* Important: The consumer has to make sure that each
+*
+* Important: The consumer has to make sure that each
 * buffer instance will eventually be recycled with {@link Buffer#recycle()}
 * after it has been consumed.
 */
-   Buffer getNextBuffer() throws IOException, InterruptedException;
+   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   Buffer buffer = getNextBufferInternal();
+   if (buffer != null) {
+   parent.decreaseStatistics(buffer);
+   }
+   return buffer;
+   }
+
+   public int getBuffersInBacklog() {
+   return parent.getBuffersInBacklog();
+   }
 
-   void notifyBuffersAvailable(long buffers) throws IOException;
+   protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException;
--- End diff --

please add a javadoc describing its intended relation to `getNextBuffer`


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153563915
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 ---
@@ -39,13 +39,15 @@
private final AtomicBoolean isReleased;
 
PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) {
+   super(parent);
+
this.parent = checkNotNull(parent);
this.availabilityListener = checkNotNull(listener);
this.isReleased = new AtomicBoolean();
}
 
@Override
-   public Buffer getNextBuffer() {
+   public Buffer getNextBufferInternal() {
--- End diff --

make this `protected`


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564111
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 ---
@@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T
 
// 
-
 
-   static class InfiniteSubpartitionView implements ResultSubpartitionView {
+   static class InfiniteSubpartitionView extends ResultSubpartitionView {
 
private final BufferProvider bufferProvider;
 
private final CountDownLatch sync;
 
public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) {
+   super(mock(ResultSubpartition.class));
+
this.bufferProvider = checkNotNull(bufferProvider);
this.sync = checkNotNull(sync);
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   public Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

make this `protected`


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-28 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r153564062
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -133,7 +135,7 @@ int releaseMemory() throws IOException {
}
 
@Override
-   public Buffer getNextBuffer() throws IOException, InterruptedException {
+   public Buffer getNextBufferInternal() throws IOException, InterruptedException {
--- End diff --

make this `protected`


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-20 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r152193809
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -145,6 +145,10 @@ public Buffer getNextBuffer() throws IOException, InterruptedException {
listener.notifyBuffersAvailable(1);
}
 
+   if (current.isBuffer()) {
--- End diff --

That is a good idea; I have already addressed this issue as you suggested.


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-20 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r152008483
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 ---
@@ -246,9 +246,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
@Override
public String toString() {
return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," +
-   "finished? %s, read view? %s, spilled? %s]",
-   getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
-   spillWriter != null);
+   "%d backlog, finished? %s, read view? %s, spilled? %s]",
--- End diff --

`"%d buffers in backlog, finished (...)"` ? 
`"backlog = %d, finished (...)"` ? 
`"%d in backlog, finished (...)"` ? 

`%d backlog` is a little bit cryptic.
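For illustration, the first suggested wording (`"%d buffers in backlog"`) would render as shown by the hypothetical helper below, which takes its values as parameters instead of reading `SpillableSubpartition`'s fields. It also inserts a space after `(%d bytes),`, which the concatenated literals in the diff appear to lack (they would produce `bytes),finished?`).

```java
// Hypothetical helper: a trimmed-down SpillableSubpartition#toString using the
// "%d buffers in backlog" wording; parameter names are illustrative only.
final class SubpartitionDescription {
    private SubpartitionDescription() {}

    static String describe(int numberOfBuffers, long numberOfBytes, int buffersInBacklog,
                           boolean isFinished, boolean hasReadView, boolean isSpilled) {
        return String.format(
                "SpillableSubpartition [%d number of buffers (%d bytes), " +
                "%d buffers in backlog, finished? %s, read view? %s, spilled? %s]",
                numberOfBuffers, numberOfBytes, buffersInBacklog,
                isFinished, hasReadView, isSpilled);
    }
}
```

For example, `describe(3, 96L, 2, false, true, false)` yields `SpillableSubpartition [3 number of buffers (96 bytes), 2 buffers in backlog, finished? false, read view? true, spilled? false]`.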


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-20 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r152008221
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 ---
@@ -145,6 +145,10 @@ public Buffer getNextBuffer() throws IOException, InterruptedException {
listener.notifyBuffersAvailable(1);
}
 
+   if (current.isBuffer()) {
--- End diff --

This logic duplicates the one in `SpilledSubpartitionView` and `PipelinedSubpartition`, and it gets even more complicated in the next PR.

How about changing `ResultSubpartitionView` to an abstract class with a `ResultSubpartition parent` field and the following methods:
```
Buffer getNextBuffer() throws IOException, InterruptedException {
    Buffer next = getNextBufferInternal();
    if (next != null) {
        parent.decreaseStatistics(next);
    }
    return next;
}

// may return null if no buffer is currently available
protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException;
``` 
And rename all current implementations of `getNextBuffer` to `getNextBufferInternal`.

Thus:
1. You wouldn't have to reimplement and handle the decrementing in many places, but only in one.
2. The `protected int backlog;` field in `ResultSubpartition` could be made private.
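Put together, the proposal could look roughly like the sketch below: the backlog counter stays private in the parent, only data buffers (not events) are counted, and the decrement happens in exactly one place. All class names are hypothetical stand-ins for `Buffer`, `ResultSubpartition`, `ResultSubpartitionView` and a pipelined view; the checked `IOException`/`InterruptedException` declarations of the real methods are omitted to keep it short.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

// Hypothetical stand-in for Buffer: either a data buffer or an event.
final class SketchBuffer {
    private final boolean isEvent;

    SketchBuffer(boolean isEvent) { this.isEvent = isEvent; }

    boolean isBuffer() { return !isEvent; }
}

// Hypothetical stand-in for ResultSubpartition; the backlog counter is private.
class SketchSubpartition {
    private final Queue<SketchBuffer> buffers = new ArrayDeque<>();
    private int buffersInBacklog;

    void add(SketchBuffer buffer) {
        buffers.add(buffer);
        if (buffer.isBuffer()) {  // events do not count towards the backlog
            buffersInBacklog++;
        }
    }

    SketchBuffer poll() { return buffers.poll(); }  // null when empty

    void decreaseStatistics(SketchBuffer buffer) {
        if (buffer.isBuffer()) {
            buffersInBacklog--;
        }
    }

    int getBuffersInBacklog() { return buffersInBacklog; }
}

// Hypothetical stand-in for the abstract ResultSubpartitionView.
abstract class SketchSubpartitionView {
    private final SketchSubpartition parent;

    SketchSubpartitionView(SketchSubpartition parent) {
        this.parent = Objects.requireNonNull(parent);
    }

    // The single place where statistics are decreased.
    SketchBuffer getNextBuffer() {
        SketchBuffer next = getNextBufferInternal();
        if (next != null) {
            parent.decreaseStatistics(next);
        }
        return next;
    }

    int getBuffersInBacklog() { return parent.getBuffersInBacklog(); }

    // May return null if no buffer is currently available.
    protected abstract SketchBuffer getNextBufferInternal();
}

// Queue-backed view that keeps its own typed parent reference.
final class QueueSubpartitionView extends SketchSubpartitionView {
    private final SketchSubpartition parent;

    QueueSubpartitionView(SketchSubpartition parent) {
        super(parent);
        this.parent = parent;
    }

    @Override
    protected SketchBuffer getNextBufferInternal() { return parent.poll(); }
}
```

Concrete views only decide *which* buffer comes next; none of them can forget the bookkeeping, which is the deduplication argued for in point 1.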


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-20 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r152008524
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 ---
@@ -246,9 +246,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
@Override
public String toString() {
return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," +
-   "finished? %s, read view? %s, spilled? %s]",
-   getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
-   spillWriter != null);
+   "%d backlog, finished? %s, read view? %s, spilled? %s]",
+   getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
+   backlog, isFinished, readView != null, spillWriter != null);
--- End diff --

`getBacklog`?


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-11-20 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r152010406
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -41,12 +41,19 @@
/** The total number of bytes (both data and event buffers) */
private long totalNumberOfBytes;
 
+   /** The number of non-event buffers currently in this subpartition */
+   protected int backlog;
--- End diff --

rename `backlog` to `buffersInBacklog`/`accumulatedBuffers`/`backloggedBuffers`?


---