Repository: flink
Updated Branches:
  refs/heads/master a803dc7e7 -> 8706c6f44


[FLINK-7394][core] Manage exclusive buffers in RemoteInputChannel

This closes #4499.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3cbba5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3cbba5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3cbba5a

Branch: refs/heads/master
Commit: d3cbba5ab4be498526ae028c2b3a5d5b8dfe4bbd
Parents: 064a1e6
Author: Zhijiang <[email protected]>
Authored: Mon Aug 14 14:30:47 2017 +0800
Committer: zentol <[email protected]>
Committed: Tue Oct 10 16:53:19 2017 +0200

----------------------------------------------------------------------
 .../partition/consumer/RemoteInputChannel.java  | 107 ++++++++++++++++++-
 .../consumer/RemoteInputChannelTest.java        |  79 ++++++++++++++
 2 files changed, 182 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d3cbba5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 58c9484..ee6bfda 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -24,23 +24,29 @@ import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An input channel, which requests a remote partition queue.
  */
-public class RemoteInputChannel extends InputChannel {
+public class RemoteInputChannel extends InputChannel implements BufferRecycler 
{
 
        /** ID to distinguish this channel from other channels sharing the same 
TCP connection. */
        private final InputChannelID id = new InputChannelID();
@@ -72,6 +78,15 @@ public class RemoteInputChannel extends InputChannel {
         */
        private int expectedSequenceNumber = 0;
 
+       /** The initial number of exclusive buffers assigned to this channel. */
+       private int initialCredit;
+
+       /** The current available buffers including both exclusive buffers and 
requested floating buffers. */
+       private final ArrayDeque<Buffer> availableBuffers = new ArrayDeque<>();
+
+       /** The number of available buffers that have not been announced to the 
producer yet. */
+       private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
+
        public RemoteInputChannel(
                SingleInputGate inputGate,
                int channelIndex,
@@ -99,8 +114,24 @@ public class RemoteInputChannel extends InputChannel {
                this.connectionManager = checkNotNull(connectionManager);
        }
 
+       /**
+        * Assigns exclusive buffers to this input channel, and this method 
should be called only once
+        * after this input channel is created.
+        */
        void assignExclusiveSegments(List<MemorySegment> segments) {
-               // TODO in next PR
+               checkState(this.initialCredit == 0, "Bug in input channel setup 
logic: exclusive buffers have " +
+                       "already been set for this input channel.");
+
+               checkNotNull(segments);
+               checkArgument(segments.size() > 0, "The number of exclusive 
buffers per channel should be larger than 0.");
+
+               this.initialCredit = segments.size();
+
+               synchronized(availableBuffers) {
+                       for (MemorySegment segment : segments) {
+                               availableBuffers.add(new Buffer(segment, this));
+                       }
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -183,18 +214,41 @@ public class RemoteInputChannel extends InputChannel {
        }
 
        /**
-        * Releases all received buffers and closes the partition request 
client.
+        * Releases all exclusive and floating buffers and closes the partition 
request client.
         */
        @Override
        void releaseAllResources() throws IOException {
                if (isReleased.compareAndSet(false, true)) {
+
+                       // Gather all exclusive buffers and recycle them to 
global pool in batch
+                       final List<MemorySegment> exclusiveRecyclingSegments = 
new ArrayList<>();
+
                        synchronized (receivedBuffers) {
                                Buffer buffer;
                                while ((buffer = receivedBuffers.poll()) != 
null) {
-                                       buffer.recycle();
+                                       if (buffer.getRecycler() == this) {
+                                               
exclusiveRecyclingSegments.add(buffer.getMemorySegment());
+                                       } else {
+                                               buffer.recycle();
+                                       }
+                               }
+                       }
+
+                       synchronized (availableBuffers) {
+                               Buffer buffer;
+                               while ((buffer = availableBuffers.poll()) != 
null) {
+                                       if (buffer.getRecycler() == this) {
+                                               
exclusiveRecyclingSegments.add(buffer.getMemorySegment());
+                                       } else {
+                                               buffer.recycle();
+                                       }
                                }
                        }
 
+                       if (exclusiveRecyclingSegments.size() > 0) {
+                               
inputGate.returnExclusiveSegments(exclusiveRecyclingSegments);
+                       }
+
                        // The released flag has to be set before closing the 
connection to ensure that
                        // buffers received concurrently with closing are 
properly recycled.
                        if (partitionRequestClient != null) {
@@ -215,6 +269,51 @@ public class RemoteInputChannel extends InputChannel {
        }
 
        // 
------------------------------------------------------------------------
+       // Credit-based
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Enqueue this input channel in the pipeline for sending unannounced 
credits to producer.
+        */
+       void notifyCreditAvailable() {
+               //TODO in next PR
+       }
+
+       /**
+        * Exclusive buffer is recycled to this input channel directly and it 
may trigger
+        * a credit notification to the producer.
+        *
+        * @param segment The exclusive segment of this channel.
+        */
+       @Override
+       public void recycle(MemorySegment segment) {
+               synchronized (availableBuffers) {
+                       // Important: the isReleased check should be inside the 
synchronized block.
+                       // That way the segment can also be returned to the global 
pool after being added to
+                       // the available queue while all resources are being released.
+                       if (isReleased.get()) {
+                               try {
+                                       
inputGate.returnExclusiveSegments(Arrays.asList(segment));
+                                       return;
+                               } catch (Throwable t) {
+                                       ExceptionUtils.rethrow(t);
+                               }
+                       }
+                       availableBuffers.add(new Buffer(segment, this));
+               }
+
+               if (unannouncedCredit.getAndAdd(1) == 0) {
+                       notifyCreditAvailable();
+               }
+       }
+
+       public int getNumberOfAvailableBuffers() {
+               synchronized (availableBuffers) {
+                       return availableBuffers.size();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
        // Network I/O notifications (called by network I/O thread)
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d3cbba5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 08bf5eb..bced9ce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -34,6 +36,7 @@ import org.junit.Test;
 import scala.Tuple2;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -45,8 +48,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -295,6 +300,80 @@ public class RemoteInputChannelTest {
                ch.getNextBuffer();
        }
 
+       /**
+        * Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
+        * recycled to available buffers directly and it triggers notification of 
unannounced credit.
+        */
+       @Test
+       public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
+               final SingleInputGate inputGate = mock(SingleInputGate.class);
+               final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+
+               // Recycle exclusive segment
+               
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+
+               assertEquals("There should be one buffer available after 
recycle.",
+                       1, inputChannel.getNumberOfAvailableBuffers());
+               verify(inputChannel, times(1)).notifyCreditAvailable();
+
+               
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+
+               assertEquals("There should be two buffers available after 
recycle.",
+                       2, inputChannel.getNumberOfAvailableBuffers());
+               // It should be called only once when increased from zero.
+               verify(inputChannel, times(1)).notifyCreditAvailable();
+       }
+
+       /**
+        * Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
+        * recycled to global pool via input gate when channel is released.
+        */
+       @Test
+       public void testRecycleExclusiveBufferAfterReleased() throws Exception {
+               // Setup
+               final SingleInputGate inputGate = mock(SingleInputGate.class);
+               final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+
+               inputChannel.releaseAllResources();
+
+               // Recycle exclusive segment after channel released
+               
inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+
+               assertEquals("Resource leak during recycling buffer after 
channel is released.",
+                       0, inputChannel.getNumberOfAvailableBuffers());
+               verify(inputChannel, times(0)).notifyCreditAvailable();
+               verify(inputGate, 
times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class));
+       }
+
+       /**
+        * Tests {@link RemoteInputChannel#releaseAllResources()}, verifying 
the exclusive segments are
+        * recycled to global pool via input gate and no resource leak.
+        */
+       @Test
+       public void testReleaseExclusiveBuffers() throws Exception {
+               // Setup
+               final SingleInputGate inputGate = mock(SingleInputGate.class);
+               final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+
+               // Assign exclusive segments to channel
+               final List<MemorySegment> exclusiveSegments = new ArrayList<>();
+               final int numExclusiveBuffers = 2;
+               for (int i = 0; i < numExclusiveBuffers; i++) {
+                       
exclusiveSegments.add(MemorySegmentFactory.allocateUnpooledSegment(1024, 
inputChannel));
+               }
+               inputChannel.assignExclusiveSegments(exclusiveSegments);
+
+               assertEquals("The number of available buffers is not equal to 
the assigned amount.",
+                       numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+
+               // Release this channel
+               inputChannel.releaseAllResources();
+
+               assertEquals("Resource leak after channel is released.",
+                       0, inputChannel.getNumberOfAvailableBuffers());
+               verify(inputGate, 
times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class));
+       }
+
        // 
---------------------------------------------------------------------------------------------
 
        private RemoteInputChannel createRemoteInputChannel(SingleInputGate 
inputGate)

Reply via email to