[hotfix] only update buffer statistics in SpillableSubpartition#add() if 
successful


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79bcdffc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79bcdffc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79bcdffc

Branch: refs/heads/master
Commit: 79bcdffc057d366f31860d7690abac2819d84bd1
Parents: 7fa3b55
Author: Nico Kruber <n...@data-artisans.com>
Authored: Thu Nov 23 13:09:37 2017 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Jan 5 14:59:52 2018 +0100

----------------------------------------------------------------------
 .../io/network/partition/SpillableSubpartition.java    | 13 ++++++++-----
 .../network/partition/PipelinedSubpartitionTest.java   |  5 +++++
 .../network/partition/SpillableSubpartitionTest.java   | 13 +++++++++++++
 .../io/network/partition/SubpartitionTestBase.java     | 10 ++++++++++
 4 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 654d528..065de8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -95,13 +95,12 @@ class SpillableSubpartition extends ResultSubpartition {
                                return false;
                        }
 
-                       // The number of buffers are needed later when creating
-                       // the read views. If you ever remove this line here,
-                       // make sure to still count the number of buffers.
-                       updateStatistics(buffer);
-
                        if (spillWriter == null) {
                                buffers.add(buffer);
+                               // The number of buffers are needed later when 
creating
+                               // the read views. If you ever remove this line 
here,
+                               // make sure to still count the number of 
buffers.
+                               updateStatistics(buffer);
 
                                return true;
                        }
@@ -109,6 +108,10 @@ class SpillableSubpartition extends ResultSubpartition {
 
                // Didn't return early => go to disk
                spillWriter.writeBlock(buffer);
+               synchronized (buffers) {
+                       // See the note above, but only do this if the buffer 
was correctly added!
+                       updateStatistics(buffer);
+               }
 
                return true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index de1e8a0..6d36aa6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -102,6 +103,8 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
 
                // Add data to the queue...
                subpartition.add(createBuffer());
+               assertEquals(1, subpartition.getTotalNumberOfBuffers());
+               assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
 
                // ...should have resulted in a notification
                verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
@@ -112,6 +115,8 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
 
                // Add data to the queue...
                subpartition.add(createBuffer());
+               assertEquals(2, subpartition.getTotalNumberOfBuffers());
+               assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
                verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 3b5c49c..05a364d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -184,13 +184,21 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                partition.add(buffer);
                partition.add(buffer);
                partition.add(buffer);
+               assertEquals(3, partition.getTotalNumberOfBuffers());
+               assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
 
                assertFalse(buffer.isRecycled());
                assertEquals(3, partition.releaseMemory());
                // now the buffer may be freed, depending on the timing of the 
write operation
                // -> let's do this check at the end of the test (to save some 
time)
+               // stil same statistics
+               assertEquals(3, partition.getTotalNumberOfBuffers());
+               assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
 
                partition.finish();
+               // + one EndOfPartitionEvent
+               assertEquals(4, partition.getTotalNumberOfBuffers());
+               assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
 
                BufferAvailabilityListener listener = spy(new 
AwaitableBufferAvailablityListener());
                SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(listener);
@@ -250,6 +258,8 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                partition.add(buffer);
                partition.add(buffer);
                partition.finish();
+               assertEquals(4, partition.getTotalNumberOfBuffers());
+               assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
 
                AwaitableBufferAvailablityListener listener = new 
AwaitableBufferAvailablityListener();
                SpillableSubpartitionView reader = (SpillableSubpartitionView) 
partition.createReadView(listener);
@@ -267,6 +277,9 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                // Spill now
                assertEquals(2, partition.releaseMemory());
                assertFalse(buffer.isRecycled()); // still one in the reader!
+               // still same statistics:
+               assertEquals(4, partition.getTotalNumberOfBuffers());
+               assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
 
                listener.awaitNotifications(4, 30_000);
                assertEquals(4, listener.getNumNotifiedBuffers());

http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 800542e..d084f62 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -21,8 +21,10 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -46,8 +48,12 @@ public abstract class SubpartitionTestBase extends 
TestLogger {
 
                try {
                        subpartition.finish();
+                       assertEquals(1, subpartition.getTotalNumberOfBuffers());
+                       assertEquals(4, subpartition.getTotalNumberOfBytes());
 
                        assertFalse(subpartition.add(mock(Buffer.class)));
+                       assertEquals(1, subpartition.getTotalNumberOfBuffers());
+                       assertEquals(4, subpartition.getTotalNumberOfBytes());
                } finally {
                        if (subpartition != null) {
                                subpartition.release();
@@ -61,8 +67,12 @@ public abstract class SubpartitionTestBase extends 
TestLogger {
 
                try {
                        subpartition.release();
+                       assertEquals(0, subpartition.getTotalNumberOfBuffers());
+                       assertEquals(0, subpartition.getTotalNumberOfBytes());
 
                        assertFalse(subpartition.add(mock(Buffer.class)));
+                       assertEquals(0, subpartition.getTotalNumberOfBuffers());
+                       assertEquals(0, subpartition.getTotalNumberOfBytes());
                } finally {
                        if (subpartition != null) {
                                subpartition.release();

Reply via email to