[hotfix] Only update buffer statistics in SpillableSubpartition#add() if the buffer was added successfully
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79bcdffc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79bcdffc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79bcdffc Branch: refs/heads/master Commit: 79bcdffc057d366f31860d7690abac2819d84bd1 Parents: 7fa3b55 Author: Nico Kruber <n...@data-artisans.com> Authored: Thu Nov 23 13:09:37 2017 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Fri Jan 5 14:59:52 2018 +0100 ---------------------------------------------------------------------- .../io/network/partition/SpillableSubpartition.java | 13 ++++++++----- .../network/partition/PipelinedSubpartitionTest.java | 5 +++++ .../network/partition/SpillableSubpartitionTest.java | 13 +++++++++++++ .../io/network/partition/SubpartitionTestBase.java | 10 ++++++++++ 4 files changed, 36 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 654d528..065de8e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -95,13 +95,12 @@ class SpillableSubpartition extends ResultSubpartition { return false; } - // The number of buffers are needed later when creating - // the read views. If you ever remove this line here, - // make sure to still count the number of buffers. 
- updateStatistics(buffer); - if (spillWriter == null) { buffers.add(buffer); + // The number of buffers are needed later when creating + // the read views. If you ever remove this line here, + // make sure to still count the number of buffers. + updateStatistics(buffer); return true; } @@ -109,6 +108,10 @@ class SpillableSubpartition extends ResultSubpartition { // Didn't return early => go to disk spillWriter.writeBlock(buffer); + synchronized (buffers) { + // See the note above, but only do this if the buffer was correctly added! + updateStatistics(buffer); + } return true; } http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index de1e8a0..6d36aa6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE; import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -102,6 +103,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { // Add data to the queue... 
subpartition.add(createBuffer()); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // ...should have resulted in a notification verify(listener, times(1)).notifyBuffersAvailable(eq(1L)); @@ -112,6 +115,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { // Add data to the queue... subpartition.add(createBuffer()); + assertEquals(2, subpartition.getTotalNumberOfBuffers()); + assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); verify(listener, times(2)).notifyBuffersAvailable(eq(1L)); } http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index 3b5c49c..05a364d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -184,13 +184,21 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { partition.add(buffer); partition.add(buffer); partition.add(buffer); + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); assertFalse(buffer.isRecycled()); assertEquals(3, partition.releaseMemory()); // now the buffer may be freed, depending on the timing of the write operation // -> let's do this check at the end of the test (to save some time) + // stil same statistics + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); partition.finish(); + // + one 
EndOfPartitionEvent + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); BufferAvailabilityListener listener = spy(new AwaitableBufferAvailablityListener()); SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener); @@ -250,6 +258,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { partition.add(buffer); partition.add(buffer); partition.finish(); + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener(); SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(listener); @@ -267,6 +277,9 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { // Spill now assertEquals(2, partition.releaseMemory()); assertFalse(buffer.isRecycled()); // still one in the reader! + // still same statistics: + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); listener.awaitNotifications(4, 30_000); assertEquals(4, listener.getNumNotifiedBuffers()); http://git-wip-us.apache.org/repos/asf/flink/blob/79bcdffc/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index 800542e..d084f62 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -21,8 +21,10 @@ package org.apache.flink.runtime.io.network.partition; 
import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.util.TestLogger; + import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -46,8 +48,12 @@ public abstract class SubpartitionTestBase extends TestLogger { try { subpartition.finish(); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(4, subpartition.getTotalNumberOfBytes()); assertFalse(subpartition.add(mock(Buffer.class))); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(4, subpartition.getTotalNumberOfBytes()); } finally { if (subpartition != null) { subpartition.release(); @@ -61,8 +67,12 @@ public abstract class SubpartitionTestBase extends TestLogger { try { subpartition.release(); + assertEquals(0, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getTotalNumberOfBytes()); assertFalse(subpartition.add(mock(Buffer.class))); + assertEquals(0, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getTotalNumberOfBytes()); } finally { if (subpartition != null) { subpartition.release();