[FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle the buffer in case of failures

This fixes a double-recycle in SpillableSubpartitionView and also ensures that the
buffer is properly freed even if adding the (asynchronous) write operation fails,
covering call sites that did not perform this cleanup themselves. It avoids
duplicating the cleanup code and is more consistent: the writer takes over
responsibility for the given buffer even if an exception is thrown.
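
As a rough sketch of the resulting contract (hypothetical caller code, not part of
this patch; it mirrors the SpillableSubpartitionView change further below):

    import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
    import org.apache.flink.runtime.io.network.buffer.Buffer;

    import java.io.IOException;

    class SpillExample {
        // Before: the caller guarded against a failed enqueue itself, which could
        // recycle a buffer twice when the write request had in fact been added.
        static void spillOld(BufferFileWriter writer, Buffer buffer) throws IOException {
            try {
                writer.writeBlock(buffer);
            } finally {
                buffer.recycle();
            }
        }

        // After: writeBlock() owns the buffer; it is recycled after the asynchronous
        // write completes, or immediately if adding the write request fails.
        static void spillNew(BufferFileWriter writer, Buffer buffer) throws IOException {
            writer.writeBlock(buffer);
        }
    }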

[FLINK-7499][io] complete the idiom of ResultSubpartition#add() taking over ownership of the buffer

The buffer is now always released exactly once and at the right time; the caller
no longer needs to worry about releasing it if a called function throws an exception.
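
A minimal sketch of the caller-side pattern this establishes (a simplified,
hypothetical variant of the ResultPartition#writeBuffer() change below, not the
exact committed code):

    import org.apache.flink.runtime.io.network.buffer.Buffer;
    import org.apache.flink.runtime.io.network.partition.ResultSubpartition;

    import java.io.IOException;

    class AddOwnershipExample {
        // add() owns the buffer reference handed to it, so the caller only balances
        // its own retain() in the finally block and needs no error-path recycle().
        static void writeBuffer(ResultSubpartition subpartition, Buffer buffer) throws IOException {
            // retain so the buffer can still be used (e.g. for statistics) after add()
            buffer.retain();
            boolean success = false;
            try {
                success = subpartition.add(buffer);
            } finally {
                if (success) {
                    // the real method notifies pipelined consumers here
                }
                buffer.recycle(); // releases only the caller's own reference
            }
        }
    }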

This closes #4581.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/622daa44
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/622daa44
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/622daa44

Branch: refs/heads/master
Commit: 622daa447755b984644212f56c5540253a10c149
Parents: 79bcdff
Author: Nico Kruber <n...@data-artisans.com>
Authored: Thu Nov 23 14:58:21 2017 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Jan 5 15:18:25 2018 +0100

----------------------------------------------------------------------
 .../iomanager/AsynchronousBufferFileWriter.java |  20 +-
 .../BlockChannelWriterWithCallback.java         |   4 +-
 .../partition/PipelinedSubpartition.java        |   1 +
 .../io/network/partition/ResultPartition.java   |  12 +-
 .../network/partition/ResultSubpartition.java   |  12 +
 .../partition/SpillableSubpartition.java        |  16 +-
 .../partition/SpillableSubpartitionView.java    |   6 +-
 .../AsynchronousBufferFileWriterTest.java       |  30 +++
 .../IOManagerAsyncWithNoOpBufferFileWriter.java |  53 ++++
 .../network/partition/ResultPartitionTest.java  |  63 ++++-
 .../partition/SpillableSubpartitionTest.java    | 252 ++++++++++++++++++-
 .../partition/SpilledSubpartitionViewTest.java  |   4 +
 12 files changed, 449 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
index 14bb8f7..9a78d0a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.event.NotificationListener;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 
@@ -31,9 +32,26 @@ public class AsynchronousBufferFileWriter extends 
AsynchronousFileIOChannel<Buff
                super(channelID, requestQueue, CALLBACK, true);
        }
 
+       /**
+        * Writes the given block asynchronously.
+        *
+        * @param buffer
+        *              the buffer to be written (will be recycled when done)
+        *
+        * @throws IOException
+        *              thrown if adding the write operation fails
+        */
        @Override
        public void writeBlock(Buffer buffer) throws IOException {
-               addRequest(new BufferWriteRequest(this, buffer));
+               try {
+                       // if successfully added, the buffer will be recycled after the write operation
+                       addRequest(new BufferWriteRequest(this, buffer));
+               } catch (Throwable e) {
+                       // if not added, we need to recycle here
+                       buffer.recycle();
+                       ExceptionUtils.rethrowIOException(e);
+               }
+
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
index f7618e4..5738787 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
@@ -26,8 +26,8 @@ public interface BlockChannelWriterWithCallback<T> extends 
FileIOChannel {
         * Writes the given block. The request may be executed synchronously, 
or asynchronously, depending
         * on the implementation.
         *
-        * @param block The segment to be written.
+        * @param block The segment to be written (transferring ownership to this writer).
         * @throws IOException Thrown, when the writer encounters an I/O error.
         */
        void writeBlock(T block) throws IOException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index ed72b51..c1d6f13 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -67,6 +67,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
                synchronized (buffers) {
                        if (isFinished || isReleased) {
+                               buffer.recycle();
                                return false;
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index be050b3..ea2cca5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -265,6 +265,7 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
 
        @Override
        public void writeBuffer(Buffer buffer, int subpartitionIndex) throws 
IOException {
+               checkNotNull(buffer);
                boolean success = false;
 
                try {
@@ -272,6 +273,8 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
 
                        final ResultSubpartition subpartition = 
subpartitions[subpartitionIndex];
 
+                       // retain for buffer use after add() but also to have a simple path for recycle()
+                       buffer.retain();
                        synchronized (subpartition) {
                                success = subpartition.add(buffer);
 
@@ -279,14 +282,11 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
                                totalNumberOfBuffers++;
                                totalNumberOfBytes += buffer.getSize();
                        }
-               }
-               finally {
+               } finally {
                        if (success) {
                                notifyPipelinedConsumers();
                        }
-                       else {
-                               buffer.recycle();
-                       }
+                       buffer.recycle();
                }
        }
 
@@ -462,7 +462,7 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
 
        // 
------------------------------------------------------------------------
 
-       private void checkInProduceState() {
+       private void checkInProduceState() throws IllegalStateException {
                checkState(!isFinished, "Partition already finished.");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 3b4e3c9..e73082a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -70,6 +70,18 @@ public abstract class ResultSubpartition {
                return parent.getFailureCause();
        }
 
+       /**
+        * Adds the given buffer.
+        *
+        * <p>The request may be executed synchronously, or asynchronously, depending on the
+        * implementation.
+        *
+        * @param buffer
+        *              the buffer to add (transferring ownership to this writer)
+        *
+        * @throws IOException
+        *              thrown in case of errors while adding the buffer
+        */
        abstract public boolean add(Buffer buffer) throws IOException;
 
        abstract public void finish() throws IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 065de8e..4a8e165 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -92,6 +92,7 @@ class SpillableSubpartition extends ResultSubpartition {
 
                synchronized (buffers) {
                        if (isFinished || isReleased) {
+                               buffer.recycle();
                                return false;
                        }
 
@@ -107,10 +108,15 @@ class SpillableSubpartition extends ResultSubpartition {
                }
 
                // Didn't return early => go to disk
-               spillWriter.writeBlock(buffer);
-               synchronized (buffers) {
-                       // See the note above, but only do this if the buffer was correctly added!
-                       updateStatistics(buffer);
+               try {
+                       // retain buffer for updateStatistics() below
+                       spillWriter.writeBlock(buffer.retain());
+                       synchronized (buffers) {
+                               // See the note above, but only do this if the buffer was correctly added!
+                               updateStatistics(buffer);
+                       }
+               } finally {
+                       buffer.recycle();
                }
 
                return true;
@@ -207,7 +213,7 @@ class SpillableSubpartition extends ResultSubpartition {
                        ResultSubpartitionView view = readView;
 
                        if (view != null && view.getClass() == 
SpillableSubpartitionView.class) {
-                               // If there is a spilalble view, it's the responsibility of the
+                               // If there is a spillable view, it's the responsibility of the
                                // view to release memory.
                                SpillableSubpartitionView spillableView = 
(SpillableSubpartitionView) view;
                                return spillableView.releaseMemory();

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index df8de54..6781902 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -108,11 +108,7 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
                                for (int i = 0; i < numBuffers; i++) {
                                        Buffer buffer = buffers.remove();
                                        spilledBytes += buffer.getSize();
-                                       try {
-                                               spillWriter.writeBlock(buffer);
-                                       } finally {
-                                               buffer.recycle();
-                                       }
+                                       spillWriter.writeBlock(buffer);
                                }
 
                                spilledView = new SpilledSubpartitionView(

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index 40f3e32..4c25e0d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.util.TestNotificationListener;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.util.concurrent.Callable;
@@ -39,7 +43,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link AsynchronousBufferFileWriter}.
+ */
 public class AsynchronousBufferFileWriterTest {
+       @Rule
+       public ExpectedException exception = ExpectedException.none();
 
        private static final IOManager ioManager = new IOManagerAsync();
 
@@ -67,6 +76,27 @@ public class AsynchronousBufferFileWriterTest {
        }
 
        @Test
+       public void testAddWithFailingWriter() throws Exception {
+               AsynchronousBufferFileWriter writer =
+                       new 
AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue<>());
+               writer.close();
+
+               exception.expect(IOException.class);
+
+               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+                       FreeingBufferRecycler.INSTANCE);
+               try {
+                       writer.writeBlock(buffer);
+               } finally {
+                       if (!buffer.isRecycled()) {
+                               buffer.recycle();
+                               Assert.fail("buffer not recycled");
+                       }
+                       assertEquals("Shouldn't increment number of outstanding requests.", 0, writer.getNumberOfOutstandingRequests());
+               }
+       }
+
+       @Test
        public void testSubscribe() throws Exception {
                final TestNotificationListener listener = new 
TestNotificationListener();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
new file mode 100644
index 0000000..363e02b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOManagerAsync} that creates {@link BufferFileWriter} instances 
which do nothing in their {@link BufferFileWriter#writeBlock(Object)} method.
+ *
+ * <p>Beware: the passed {@link Buffer} instances must be cleaned up manually!
+ */
+public class IOManagerAsyncWithNoOpBufferFileWriter extends IOManagerAsync {
+       @Override
+       public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
+                       throws IOException {
+               return new NoOpAsynchronousBufferFileWriter(channelID, 
getWriteRequestQueue(channelID));
+       }
+
+       /**
+        * {@link BufferFileWriter} subclass with a no-op in {@link 
#writeBlock(Buffer)}.
+        */
+       private static class NoOpAsynchronousBufferFileWriter extends 
AsynchronousBufferFileWriter {
+
+               private NoOpAsynchronousBufferFileWriter(
+                               ID channelID,
+                               RequestQueue<WriteRequest> requestQueue) throws 
IOException {
+                       super(channelID, requestQueue);
+               }
+
+               @Override
+               public void writeBlock(Buffer buffer) throws IOException {
+                       // do nothing
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 9fb7fd3..5d24b4a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -20,25 +20,41 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Tests for {@link ResultPartition}.
+ */
 public class ResultPartitionTest {
 
+       /** Asynchronous I/O manager. */
+       private static final IOManager ioManager = new IOManagerAsync();
+
+       @AfterClass
+       public static void shutdown() {
+               ioManager.shutdown();
+       }
+
        /**
         * Tests the schedule or update consumers message sending behaviour 
depending on the relevant flags.
         */
@@ -49,7 +65,11 @@ public class ResultPartitionTest {
                        ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
                        ResultPartition partition = createPartition(notifier, 
ResultPartitionType.PIPELINED, true);
                        partition.writeBuffer(TestBufferFactory.createBuffer(), 
0);
-                       verify(notifier, 
times(1)).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
+                       verify(notifier, times(1))
+                               .notifyPartitionConsumable(
+                                       eq(partition.getJobId()),
+                                       eq(partition.getPartitionId()),
+                                       any(TaskActions.class));
                }
 
                {
@@ -180,6 +200,45 @@ public class ResultPartitionTest {
                assertTrue(buffer.isRecycled());
        }
 
+       @Test
+       public void testAddOnPipelinedPartition() throws Exception {
+               testAddOnPartition(ResultPartitionType.PIPELINED);
+       }
+
+       @Test
+       public void testAddOnBlockingPartition() throws Exception {
+               testAddOnPartition(ResultPartitionType.BLOCKING);
+       }
+
+       /**
+        * Tests {@link ResultPartition#writeBuffer(Buffer, int)} on a working 
partition.
+        *
+        * @param pipelined the result partition type to set up
+        */
+       protected void testAddOnPartition(final ResultPartitionType pipelined)
+               throws Exception {
+               ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
+               ResultPartition partition = createPartition(notifier, 
pipelined, true);
+               Buffer buffer = TestBufferFactory.createBuffer();
+               try {
+                       // partition.add() adds the buffer without recycling it 
(if not spilling)
+                       partition.writeBuffer(buffer, 0);
+                       assertFalse("buffer should not be recycled (still in 
the queue)", buffer.isRecycled());
+               } finally {
+                       if (!buffer.isRecycled()) {
+                               buffer.recycle();
+                       }
+                       // should have been notified for pipelined partitions
+                       if (pipelined.isPipelined()) {
+                               verify(notifier, times(1))
+                                       .notifyPartitionConsumable(
+                                               eq(partition.getJobId()),
+                                               eq(partition.getPartitionId()),
+                                               any(TaskActions.class));
+                       }
+               }
+       }
+
        // 
------------------------------------------------------------------------
 
        private static ResultPartition createPartition(
@@ -196,7 +255,7 @@ public class ResultPartitionTest {
                        1,
                        mock(ResultPartitionManager.class),
                        notifier,
-                       mock(IOManager.class),
+                       ioManager,
                        sendScheduleOrUpdateConsumersMessage);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 05a364d..c50b361 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -20,19 +20,26 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+
 import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -56,12 +63,17 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for {@link SpillableSubpartition}.
+ */
 public class SpillableSubpartitionTest extends SubpartitionTestBase {
+       @Rule
+       public ExpectedException exception = ExpectedException.none();
 
-       /** Executor service for concurrent produce/consume tests */
-       private final static ExecutorService executorService = 
Executors.newCachedThreadPool();
+       /** Executor service for concurrent produce/consume tests. */
+       private static final ExecutorService executorService = 
Executors.newCachedThreadPool();
 
-       /** Asynchronous I/O manager */
+       /** Asynchronous I/O manager. */
        private static final IOManager ioManager = new IOManagerAsync();
 
        @AfterClass
@@ -72,6 +84,10 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
 
        @Override
        SpillableSubpartition createSubpartition() {
+               return createSubpartition(ioManager);
+       }
+
+       private static SpillableSubpartition createSubpartition(IOManager 
ioManager) {
                ResultPartition parent = mock(ResultPartition.class);
                BufferProvider bufferProvider = mock(BufferProvider.class);
                when(parent.getBufferProvider()).thenReturn(bufferProvider);
@@ -313,6 +329,218 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertTrue(buffer.isRecycled());
        }
 
+       /**
+        * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable 
finished partition.
+        */
+       @Test
+       public void testAddOnFinishedSpillablePartition() throws Exception {
+               testAddOnFinishedPartition(false);
+       }
+
+       /**
+        * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled 
finished partition.
+        */
+       @Test
+       public void testAddOnFinishedSpilledPartition() throws Exception {
+               testAddOnFinishedPartition(true);
+       }
+
+       /**
+        * Tests {@link SpillableSubpartition#add(Buffer)} with a finished 
partition.
+        *
+        * @param spilled
+        *              whether the partition should be spilled to disk 
(<tt>true</tt>) or not (<tt>false</tt>,
+        *              spillable).
+        */
+       private void testAddOnFinishedPartition(boolean spilled) throws 
Exception {
+               SpillableSubpartition partition = createSubpartition();
+               if (spilled) {
+                       assertEquals(0, partition.releaseMemory());
+               }
+               partition.finish();
+               // finish adds an EndOfPartitionEvent
+               assertEquals(1, partition.getTotalNumberOfBuffers());
+               assertEquals(4, partition.getTotalNumberOfBytes());
+
+               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+                       FreeingBufferRecycler.INSTANCE);
+               try {
+                       partition.add(buffer);
+               } finally {
+                       if (!buffer.isRecycled()) {
+                               buffer.recycle();
+                               Assert.fail("buffer not recycled");
+                       }
+                       // still same statistics
+                       assertEquals(1, partition.getTotalNumberOfBuffers());
+                       assertEquals(4, partition.getTotalNumberOfBytes());
+               }
+       }
+
+       /**
+        * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable 
released partition.
+        */
+       @Test
+       public void testAddOnReleasedSpillablePartition() throws Exception {
+               testAddOnReleasedPartition(false);
+       }
+
+       /**
+        * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled 
released partition.
+        */
+       @Test
+       public void testAddOnReleasedSpilledPartition() throws Exception {
+               testAddOnReleasedPartition(true);
+       }
+
+       /**
+        * Tests {@link SpillableSubpartition#add(Buffer)} with a released 
partition.
+        *
+        * @param spilled
+        *              whether the partition should be spilled to disk 
(<tt>true</tt>) or not (<tt>false</tt>,
+        *              spillable).
+        */
+       private void testAddOnReleasedPartition(boolean spilled) throws 
Exception {
+               SpillableSubpartition partition = createSubpartition();
+               partition.release();
+               if (spilled) {
+                       assertEquals(0, partition.releaseMemory());
+               }
+
+               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+                       FreeingBufferRecycler.INSTANCE);
+               try {
+                       partition.add(buffer);
+               } finally {
+                       if (!buffer.isRecycled()) {
+                               buffer.recycle();
+                               Assert.fail("buffer not recycled");
+                       }
+                       assertEquals(0, partition.getTotalNumberOfBuffers());
+                       assertEquals(0, partition.getTotalNumberOfBytes());
+               }
+       }
+
+       /**
+        * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled partition and a writer
+        * that does not process the write request (simulating a slow writer).
+        */
+       @Test
+       public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
+               // simulate slow writer by a no-op write operation
+               IOManager ioManager = new 
IOManagerAsyncWithNoOpBufferFileWriter();
+               SpillableSubpartition partition = createSubpartition(ioManager);
+               assertEquals(0, partition.releaseMemory());
+
+               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+                       FreeingBufferRecycler.INSTANCE);
+               try {
+                       partition.add(buffer);
+               } finally {
+                       ioManager.shutdown();
+                       if (buffer.isRecycled()) {
+                               Assert.fail("buffer recycled before the write 
operation completed");
+                       }
+                       buffer.recycle();
+                       assertEquals(1, partition.getTotalNumberOfBuffers());
+                       assertEquals(4096, partition.getTotalNumberOfBytes());
+               }
+       }
+
+       /**
+        * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable 
partition without a view
+        * but with a writer that does not do any write to check for correct 
buffer recycling.
+        */
+       @Test
+       public void testReleaseOnSpillablePartitionWithoutViewWithSlowWriter() 
throws Exception {
+               testReleaseOnSpillablePartitionWithSlowWriter(false);
+       }
+
+       /**
+        * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable 
partition which has a
+        * view associated with it and a writer that does not do any write to 
check for correct buffer
+        * recycling.
+        */
+       @Test
+       public void testReleaseOnSpillablePartitionWithViewWithSlowWriter() 
throws Exception {
+               testReleaseOnSpillablePartitionWithSlowWriter(true);
+       }
+
+       /**
+        * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable partition which has a
+        * writer that does not do any write to check for correct buffer recycling.
+        */
+       private void testReleaseOnSpillablePartitionWithSlowWriter(boolean 
createView) throws Exception {
+               // simulate slow writer by a no-op write operation
+               IOManager ioManager = new 
IOManagerAsyncWithNoOpBufferFileWriter();
+               SpillableSubpartition partition = createSubpartition(ioManager);
+
+               Buffer buffer1 = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+                       FreeingBufferRecycler.INSTANCE);
+               Buffer buffer2 = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+                       FreeingBufferRecycler.INSTANCE);
+               try {
+                       // we need two buffers because the view will use one of 
them and not release it
+                       partition.add(buffer1);
+                       partition.add(buffer2);
+                       assertFalse("buffer1 should not be recycled (still in 
the queue)", buffer1.isRecycled());
+                       assertFalse("buffer2 should not be recycled (still in 
the queue)", buffer2.isRecycled());
+                       assertEquals(2, partition.getTotalNumberOfBuffers());
+                       assertEquals(4096 * 2, 
partition.getTotalNumberOfBytes());
+
+                       if (createView) {
+                               // Create a read view
+                               partition.finish();
+                               partition.createReadView(numBuffers -> {});
+                       }
+
+                       // one instance of the buffers is placed in the view's 
nextBuffer and not released
+                       // (if there is no view, there will be no additional 
EndOfPartitionEvent)
+                       assertEquals(2, partition.releaseMemory());
+                       assertFalse("buffer1 should not be recycled (advertised 
as nextBuffer)", buffer1.isRecycled());
+                       assertFalse("buffer2 should not be recycled (not 
written yet)", buffer2.isRecycled());
+               } finally {
+                       ioManager.shutdown();
+                       if (!buffer1.isRecycled()) {
+                               buffer1.recycle();
+                       }
+                       if (!buffer2.isRecycled()) {
+                               buffer2.recycle();
+                       }
+                       // note: a view requires a finished partition which has 
an additional EndOfPartitionEvent
+                       assertEquals(2 + (createView ? 1 : 0), 
partition.getTotalNumberOfBuffers());
+                       assertEquals(4096 * 2 + (createView ? 4 : 0), 
partition.getTotalNumberOfBytes());
+               }
+       }
+
+       /**
+        * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled 
partition where adding the
+        * write request fails with an exception.
+        */
+       @Test
+       public void testAddOnSpilledPartitionWithFailingWriter() throws 
Exception {
+               IOManager ioManager = new 
IOManagerAsyncWithClosedBufferFileWriter();
+               SpillableSubpartition partition = createSubpartition(ioManager);
+               assertEquals(0, partition.releaseMemory());
+
+               exception.expect(IOException.class);
+
+               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+                       FreeingBufferRecycler.INSTANCE);
+               try {
+                       partition.add(buffer);
+               } finally {
+                       ioManager.shutdown();
+
+                       if (!buffer.isRecycled()) {
+                               buffer.recycle();
+                               Assert.fail("buffer not recycled");
+                       }
+                       assertEquals(0, partition.getTotalNumberOfBuffers());
+                       assertEquals(0, partition.getTotalNumberOfBytes());
+               }
+       }
+
        private static class AwaitableBufferAvailablityListener implements 
BufferAvailabilityListener {
 
                private long numNotifiedBuffers;
@@ -333,4 +561,22 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                        }
                }
        }
+
+       /**
+        * An {@link IOManagerAsync} that creates closed {@link 
BufferFileWriter} instances in its
+        * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+        *
+        * <p>These {@link BufferFileWriter} objects will thus throw an 
exception when trying to add
+        * write requests, e.g. by calling {@link 
BufferFileWriter#writeBlock(Object)}.
+        */
+       private static class IOManagerAsyncWithClosedBufferFileWriter extends 
IOManagerAsync {
+               @Override
+               public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
+                               throws IOException {
+                       BufferFileWriter bufferFileWriter = 
super.createBufferFileWriter(channelID);
+                       bufferFileWriter.close();
+                       return bufferFileWriter;
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/622daa44/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index fa62593..b748e1c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -44,6 +44,10 @@ import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for {@link SpillableSubpartitionView}, in addition to indirect tests 
via {@link
+ * SpillableSubpartitionTest}.
+ */
 public class SpilledSubpartitionViewTest {
 
        private static final IOManager IO_MANAGER = new IOManagerAsync();
