[ https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16597913#comment-16597913 ]

ASF GitHub Bot commented on FLINK-10142:
----------------------------------------

NicoK closed pull request #6555: [FLINK-10142][network] reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 27d341aca40..9c9deaa2542 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -171,10 +171,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
        }
 
        public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
-               // We should skip the notification if the client is already 
closed.
-               if (!closeReferenceCounter.isDisposed()) {
-                       clientHandler.notifyCreditAvailable(inputChannel);
-               }
+               clientHandler.notifyCreditAvailable(inputChannel);
        }
 
        public void close(RemoteInputChannel inputChannel) throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 28f30209892..6738abd7f9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -191,16 +191,16 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, Int
                checkError();
 
                final Buffer next;
-               final int remaining;
+               final boolean moreAvailable;
 
                synchronized (receivedBuffers) {
                        next = receivedBuffers.poll();
-                       remaining = receivedBuffers.size();
+                       moreAvailable = !receivedBuffers.isEmpty();
                }
 
                numBytesIn.inc(next.getSizeUnsafe());
                numBuffersIn.inc();
-               return Optional.of(new BufferAndAvailability(next, remaining > 
0, getSenderBacklog()));
+               return Optional.of(new BufferAndAvailability(next, 
moreAvailable, getSenderBacklog()));
        }
 
        // 
------------------------------------------------------------------------
@@ -289,10 +289,7 @@ public String toString() {
        private void notifyCreditAvailable() {
                checkState(partitionRequestClient != null, "Tried to send task 
event to producer before requesting a queue.");
 
-               // We should skip the notification if this channel is already 
released.
-               if (!isReleased.get()) {
-                       partitionRequestClient.notifyCreditAvailable(this);
-               }
+               partitionRequestClient.notifyCreditAvailable(this);
        }
 
        /**
@@ -306,8 +303,8 @@ public void recycle(MemorySegment segment) {
                int numAddedBuffers;
 
                synchronized (bufferQueue) {
-                       // Important: check the isReleased state inside 
synchronized block, so there is no
-                       // race condition when recycle and releaseAllResources 
running in parallel.
+                       // Similar to notifyBufferAvailable(), make sure that 
we never add a buffer
+                       // after releaseAllResources() released all buffers 
(see below for details).
                        if (isReleased.get()) {
                                try {
                                        
inputGate.returnExclusiveSegments(Collections.singletonList(segment));
@@ -354,13 +351,6 @@ boolean isWaitingForFloatingBuffers() {
         */
        @Override
        public boolean notifyBufferAvailable(Buffer buffer) {
-               // Check the isReleased state outside synchronized block first 
to avoid
-               // deadlock with releaseAllResources running in parallel.
-               if (isReleased.get()) {
-                       buffer.recycleBuffer();
-                       return false;
-               }
-
                boolean recycleBuffer = true;
                try {
                        boolean needMoreBuffers = false;
@@ -368,8 +358,13 @@ public boolean notifyBufferAvailable(Buffer buffer) {
                                checkState(isWaitingForFloatingBuffers,
                                        "This channel should be waiting for 
floating buffers.");
 
-                               // Important: double check the isReleased state 
inside synchronized block, so there is no
-                               // race condition when notifyBufferAvailable 
and releaseAllResources running in parallel.
+                               // Important: make sure that we never add a 
buffer after releaseAllResources()
+                               // released all buffers. Following scenarios 
exist:
+                               // 1) releaseAllResources() already released 
buffers inside bufferQueue
+                               // -> then isReleased is set correctly
+                               // 2) releaseAllResources() did not yet release 
buffers from bufferQueue
+                               // -> we may or may not have set isReleased yet 
but will always wait for the
+                               //    lock on bufferQueue to release buffers
                                if (isReleased.get() || 
bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
                                        isWaitingForFloatingBuffers = false;
                                        recycleBuffer = false; // just in case
@@ -385,10 +380,10 @@ public boolean notifyBufferAvailable(Buffer buffer) {
                                } else {
                                        needMoreBuffers = true;
                                }
+                       }
 
-                               if (unannouncedCredit.getAndAdd(1) == 0) {
-                                       notifyCreditAvailable();
-                               }
+                       if (unannouncedCredit.getAndAdd(1) == 0) {
+                               notifyCreditAvailable();
                        }
 
                        return needMoreBuffers;
@@ -484,8 +479,8 @@ void onSenderBacklog(int backlog) throws IOException {
                int numRequestedBuffers = 0;
 
                synchronized (bufferQueue) {
-                       // Important: check the isReleased state inside 
synchronized block, so there is no
-                       // race condition when onSenderBacklog and 
releaseAllResources running in parallel.
+                       // Similar to notifyBufferAvailable(), make sure that 
we never add a buffer
+                       // after releaseAllResources() released all buffers 
(see above for details).
                        if (isReleased.get()) {
                                return;
                        }
@@ -510,33 +505,40 @@ void onSenderBacklog(int backlog) throws IOException {
        }
 
        public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) 
throws IOException {
-               boolean success = false;
+               boolean recycleBuffer = true;
 
                try {
+
+                       final boolean wasEmpty;
                        synchronized (receivedBuffers) {
-                               if (!isReleased.get()) {
-                                       if (expectedSequenceNumber == 
sequenceNumber) {
-                                               int available = 
receivedBuffers.size();
+                               // Similar to notifyBufferAvailable(), make 
sure that we never add a buffer
+                               // after releaseAllResources() released all 
buffers from receivedBuffers
+                               // (see above for details).
+                               if (isReleased.get()) {
+                                       return;
+                               }
 
-                                               receivedBuffers.add(buffer);
-                                               expectedSequenceNumber++;
+                               if (expectedSequenceNumber != sequenceNumber) {
+                                       onError(new 
BufferReorderingException(expectedSequenceNumber, sequenceNumber));
+                                       return;
+                               }
 
-                                               if (available == 0) {
-                                                       notifyChannelNonEmpty();
-                                               }
+                               wasEmpty = receivedBuffers.isEmpty();
+                               receivedBuffers.add(buffer);
+                               recycleBuffer = false;
+                       }
 
-                                               success = true;
-                                       } else {
-                                               onError(new 
BufferReorderingException(expectedSequenceNumber, sequenceNumber));
-                                       }
-                               }
+                       ++expectedSequenceNumber;
+
+                       if (wasEmpty) {
+                               notifyChannelNonEmpty();
                        }
 
-                       if (success && backlog >= 0) {
+                       if (backlog >= 0) {
                                onSenderBacklog(backlog);
                        }
                } finally {
-                       if (!success) {
+                       if (recycleBuffer) {
                                buffer.recycleBuffer();
                        }
                }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 2e7d076f3f8..f51dc7417ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -545,7 +545,7 @@ public void requestPartitions() throws IOException, InterruptedException {
 
                                currentChannel = inputChannelsWithData.remove();
                                
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
-                               moreAvailable = inputChannelsWithData.size() > 
0;
+                               moreAvailable = 
!inputChannelsWithData.isEmpty();
                        }
 
                        result = currentChannel.getNextBuffer();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 63054923498..9141b36d445 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -106,10 +106,33 @@ public void testExceptionOnReordering() throws Exception {
 
        @Test
        public void testConcurrentOnBufferAndRelease() throws Exception {
-               // Config
-               // Repeatedly spawn two tasks: one to queue buffers and the 
other to release the channel
-               // concurrently. We do this repeatedly to provoke races.
-               final int numberOfRepetitions = 8192;
+               testConcurrentReleaseAndSomething(8192, (inputChannel, buffer, 
j) -> {
+                       inputChannel.onBuffer(buffer, j, -1);
+                       return null;
+               });
+       }
+
+       @Test
+       public void testConcurrentNotifyBufferAvailableAndRelease() throws 
Exception {
+               testConcurrentReleaseAndSomething(1024, (inputChannel, buffer, 
j) ->
+                       inputChannel.notifyBufferAvailable(buffer)
+               );
+       }
+
+       private interface TriFunction<T, U, V, R> {
+               R apply(T t, U u, V v) throws Exception;
+       }
+
+       /**
+        * Repeatedly spawns two tasks: one to call <tt>function</tt> and the 
other to release the
+        * channel concurrently. We do this repeatedly to provoke races.
+        *
+        * @param numberOfRepetitions how often to repeat the test
+        * @param function function to call concurrently to {@link 
RemoteInputChannel#releaseAllResources()}
+        */
+       private void testConcurrentReleaseAndSomething(
+                       final int numberOfRepetitions,
+                       TriFunction<RemoteInputChannel, Buffer, Integer, 
Object> function) throws Exception {
 
                // Setup
                final ExecutorService executor = 
Executors.newFixedThreadPool(2);
@@ -122,30 +145,23 @@ public void testConcurrentOnBufferAndRelease() throws Exception {
                        for (int i = 0; i < numberOfRepetitions; i++) {
                                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
 
-                               final Callable<Void> enqueueTask = new 
Callable<Void>() {
-                                       @Override
-                                       public Void call() throws Exception {
-                                               while (true) {
-                                                       for (int j = 0; j < 
128; j++) {
-                                                               // this is the 
same buffer over and over again which will be
-                                                               // recycled by 
the RemoteInputChannel
-                                                               
inputChannel.onBuffer(buffer.retainBuffer(), j, -1);
-                                                       }
+                               final Callable<Void> enqueueTask = () -> {
+                                       while (true) {
+                                               for (int j = 0; j < 128; j++) {
+                                                       // this is the same 
buffer over and over again which will be
+                                                       // recycled by the 
RemoteInputChannel
+                                                       
function.apply(inputChannel, buffer.retainBuffer(), j);
+                                               }
 
-                                                       if 
(inputChannel.isReleased()) {
-                                                               return null;
-                                                       }
+                                               if (inputChannel.isReleased()) {
+                                                       return null;
                                                }
                                        }
                                };
 
-                               final Callable<Void> releaseTask = new 
Callable<Void>() {
-                                       @Override
-                                       public Void call() throws Exception {
-                                               
inputChannel.releaseAllResources();
-
-                                               return null;
-                                       }
+                               final Callable<Void> releaseTask = () -> {
+                                       inputChannel.releaseAllResources();
+                                       return null;
                                };
 
                                // Submit tasks and wait to finish
@@ -158,8 +174,8 @@ public Void call() throws Exception {
                                        result.get();
                                }
 
-                               assertEquals("Resource leak during concurrent 
release and enqueue.",
-                                               0, 
inputChannel.getNumberOfQueuedBuffers());
+                               assertEquals("Resource leak during concurrent 
release and notifyBufferAvailable.",
+                                       0, 
inputChannel.getNumberOfQueuedBuffers());
                        }
                }
                finally {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce synchronization overhead for credit notifications
> --------------------------------------------------------
>
>                 Key: FLINK-10142
>                 URL: https://issues.apache.org/jira/browse/FLINK-10142
>             Project: Flink
>          Issue Type: Bug
>          Components: Network
>    Affects Versions: 1.5.2, 1.6.0, 1.7.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Major
>              Labels: pull-request-available
>
> When credit-based flow control was introduced, we also added some checks and 
> optimisations for uncommon code paths that make common code paths 
> unnecessarily more expensive, e.g. checking whether a channel was released 
> before forwarding a credit notification to Netty. Such checks would have to 
> be confirmed by the Netty thread anyway and thus only add additional load for 
> something that happens only once (per channel).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to