[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-14 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r425079339



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##
@@ -94,13 +99,22 @@ public void requestSubpartitionView(
}
 
@Override
-   public void addCredit(int creditDeltas) {
+   public void addCredit(int creditDeltas) throws Exception {
numCreditsAvailable += creditDeltas;
+   requestQueue.enqueueAvailableReader(this, this::isAvailable);
+   }
+
+   @Override
+   public boolean shouldAnnounceBacklog() {
+   return !withoutExclusiveCredits && numCreditsAvailable == 0 && subpartitionView.isAvailable(Integer.MAX_VALUE);
}
 
@Override
-   public void resumeConsumption() {
+   public void resumeConsumption(int availableCredit, boolean hasUnfulfilledBacklog) throws Exception {

Review comment:
   nit: availableCredit -> availableCredits





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-14 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r425078707



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##
@@ -69,6 +73,7 @@
this.receiverId = receiverId;
this.numCreditsAvailable = initialCredit;
this.requestQueue = requestQueue;
+   this.withoutExclusiveCredits = initialCredit > 0;

Review comment:
   This should be `initialCredit == 0`.










[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-14 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r425078522



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##
@@ -163,8 +172,15 @@ boolean hasBuffersAvailable() {
return subpartitionView.isAvailable(Integer.MAX_VALUE);
}
 
-   @Override
-   public BufferAndAvailability getNextBuffer() throws IOException {
+   private AddBacklogMessage getAddBacklogMessage() {
+   int backlog = subpartitionView.getAndResetUnannouncedBacklog();
+   if (backlog > 0) {
+   return new AddBacklogMessage(receiverId, backlog, false);
+   }
+   return null;

Review comment:
   I don't think we should expect a `null` return here. Whenever the code path enters `getAddBacklogMessage`, we should already guarantee that the respective backlog is greater than 0.
   Maybe assert on the backlog instead?
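A minimal Java sketch of this suggestion. `BacklogViewSketch`, `AddBacklogMessageSketch` and `BacklogReaderSketch` are hypothetical stand-ins rather than Flink classes, and the receiver id is simplified to a `String`. Since a plain Java `assert` only fires when the JVM runs with `-ea`, the sketch fails with an explicit `IllegalStateException`, similar in spirit to a `checkState` precondition.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the subpartition view: counts backlog not yet announced.
class BacklogViewSketch {
    private final AtomicInteger unannouncedBacklog = new AtomicInteger();

    void addBacklog(int delta) {
        unannouncedBacklog.addAndGet(delta);
    }

    int getAndResetUnannouncedBacklog() {
        return unannouncedBacklog.getAndSet(0);
    }
}

// Hypothetical simplified message: only the receiver id and the announced backlog.
class AddBacklogMessageSketch {
    final String receiverId;
    final int backlog;

    AddBacklogMessageSketch(String receiverId, int backlog) {
        this.receiverId = receiverId;
        this.backlog = backlog;
    }
}

class BacklogReaderSketch {
    private final String receiverId;
    private final BacklogViewSketch view;

    BacklogReaderSketch(String receiverId, BacklogViewSketch view) {
        this.receiverId = receiverId;
        this.view = view;
    }

    // Only called when an announcement is due, so an empty backlog indicates a bug
    // in the caller; fail loudly instead of returning null.
    AddBacklogMessageSketch getAddBacklogMessage() {
        int backlog = view.getAndResetUnannouncedBacklog();
        if (backlog <= 0) {
            throw new IllegalStateException("Backlog to announce must be positive, but was " + backlog);
        }
        return new AddBacklogMessageSketch(receiverId, backlog);
    }
}
```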









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-14 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r425072084



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##
@@ -328,4 +325,67 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
}
+
+   /**
+* Outbound message to be sent to the client.
+*/
+   public static abstract class ServerOutboundMessage {
+   protected final InputChannelID receiverId;
+   protected final int backlog;
+   private final boolean moreAvailable;
+
+   ServerOutboundMessage(InputChannelID receiverId, int backlog, boolean moreAvailable) {
+   checkArgument(backlog >= 0, "Number of backlog must be non-negative.");
+   this.receiverId = checkNotNull(receiverId);
+   this.backlog = backlog;
+   this.moreAvailable = moreAvailable;
+   }
+
+   abstract Object build();
+
+   public boolean isMoreAvailable() {
+   return moreAvailable;
+   }
+
+   void recycleBufferIfNeeded() {
+   }
+   }
+
+   static class BufferResponseMessage extends ServerOutboundMessage {
+   private final Buffer buffer;
+   private final int sequenceNumber;
+
+   BufferResponseMessage(
+   Buffer buffer,
+   InputChannelID receiverId,
+   int sequenceNumber,
+   int backlog,
+   boolean moreAvailable) {
+   super(receiverId, backlog, moreAvailable);
+   this.buffer = checkNotNull(buffer);
+   this.sequenceNumber = sequenceNumber;
+   }
+
+   @Override
+   Object build() {
+   return new BufferResponse(buffer, sequenceNumber, receiverId, backlog);
+   }
+
+   @Override
+   void recycleBufferIfNeeded() {
+   buffer.recycleBuffer();
+   }
+   }
+
+   static class AddBacklogMessage extends ServerOutboundMessage {
+
+   AddBacklogMessage(InputChannelID receiverId, int backlog, boolean moreAvailable) {

Review comment:
   Remove the `moreAvailable` argument from the constructor, because it seems strange for `AddBacklogMessage`; we can pass `false` in the `super` call below instead.
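A sketch of the suggested constructor shape, using a simplified, hypothetical mirror of the `ServerOutboundMessage` hierarchy from the diff (`InputChannelID` is replaced by a `String`, and `build()` returns a description instead of a Netty message):

```java
import java.util.Objects;

// Hypothetical, simplified mirror of ServerOutboundMessage from the diff.
abstract class OutboundMessageSketch {
    protected final String receiverId;
    protected final int backlog;
    private final boolean moreAvailable;

    OutboundMessageSketch(String receiverId, int backlog, boolean moreAvailable) {
        if (backlog < 0) {
            throw new IllegalArgumentException("Number of backlog must be non-negative.");
        }
        this.receiverId = Objects.requireNonNull(receiverId);
        this.backlog = backlog;
        this.moreAvailable = moreAvailable;
    }

    abstract Object build();

    boolean isMoreAvailable() {
        return moreAvailable;
    }
}

// A backlog announcement carries no data buffer, so "more available" is fixed to
// false in the super call rather than exposed as a constructor parameter.
class AddBacklogOutboundSketch extends OutboundMessageSketch {
    AddBacklogOutboundSketch(String receiverId, int backlog) {
        super(receiverId, backlog, false);
    }

    @Override
    Object build() {
        return "AddBacklog(" + receiverId + ", " + backlog + ")";
    }
}
```

Callers then cannot accidentally construct a backlog announcement that claims more data is available.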

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##
@@ -94,10 +97,11 @@ void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
 * NOTE: Only one thread would trigger the actual enqueue after checking the reader's
 * availability, so there is no race condition here.
 */
-   private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
-   if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
+   void enqueueAvailableReader(final NetworkSequenceViewReader reader, BooleanSupplier condition) throws Exception {

Review comment:
   Adjust the javadoc accordingly
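One possible way to adjust the javadoc, shown on a hypothetical, stripped-down queue (`RequestQueueSketch` and `ViewReaderSketch` are illustrative names only). The sketch assumes the `BooleanSupplier` simply replaces the previous hard-coded `reader.isAvailable()` check, as the diff suggests, and leaves out any Netty channel writing the real method may perform.

```java
import java.util.ArrayDeque;
import java.util.function.BooleanSupplier;

// Hypothetical minimal reader: only the "registered as available" flag matters here.
class ViewReaderSketch {
    boolean registeredAsAvailable;
}

class RequestQueueSketch {
    final ArrayDeque<ViewReaderSketch> availableReaders = new ArrayDeque<>();

    /**
     * Enqueues the given reader if it is not registered as available yet and the
     * supplied {@code condition} holds. The condition lets each caller decide what
     * "available" means, e.g. data availability or a pending backlog announcement.
     *
     * <p>NOTE: Only one thread triggers the actual enqueue after evaluating the
     * condition, so there is no race condition here.
     */
    void enqueueAvailableReader(ViewReaderSketch reader, BooleanSupplier condition) {
        if (reader.registeredAsAvailable || !condition.getAsBoolean()) {
            return;
        }
        reader.registeredAsAvailable = true;
        availableReaders.add(reader);
    }
}
```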









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-14 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424911901



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##
@@ -129,7 +132,6 @@ private boolean shouldContinueRequest(BufferPool bufferPool) {
 */
void requestExclusiveBuffers() throws IOException {
Collection segments = globalPool.requestMemorySegments();
-   checkArgument(!segments.isEmpty(), "The number of exclusive buffers per channel should be larger than 0.");

Review comment:
   Got it. So do you think we should add an `if (initialCredit > 0)` check before calling this method inside `RemoteInputChannel#assignExclusiveSegments`? That way we avoid entering the `synchronized` block below unnecessarily when the segments are empty.
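The guard could look roughly like this. `GlobalPoolSketch` and `RemoteChannelSketch` are hypothetical simplifications of the global pool and `RemoteInputChannel`; the point is only that a channel with `initialCredit == 0` never reaches the pool request or the `synchronized` block.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical global pool handing out a fixed number of segments per request.
class GlobalPoolSketch {
    private final int segmentsPerRequest;
    int numRequests;

    GlobalPoolSketch(int segmentsPerRequest) {
        this.segmentsPerRequest = segmentsPerRequest;
    }

    Collection<Object> requestMemorySegments() {
        numRequests++;
        List<Object> segments = new ArrayList<>();
        for (int i = 0; i < segmentsPerRequest; i++) {
            segments.add(new Object());
        }
        return segments;
    }
}

class RemoteChannelSketch {
    private final int initialCredit;
    private final GlobalPoolSketch globalPool;
    private final Object bufferQueue = new Object();
    private final List<Object> exclusiveBuffers = new ArrayList<>();

    RemoteChannelSketch(int initialCredit, GlobalPoolSketch globalPool) {
        this.initialCredit = initialCredit;
        this.globalPool = globalPool;
    }

    void assignExclusiveSegments() {
        // Channels configured without exclusive buffers skip both the pool request
        // and the synchronized block in requestExclusiveBuffers().
        if (initialCredit > 0) {
            requestExclusiveBuffers();
        }
    }

    private void requestExclusiveBuffers() {
        Collection<Object> segments = globalPool.requestMemorySegments();
        synchronized (bufferQueue) {
            exclusiveBuffers.addAll(segments);
        }
    }

    int getNumExclusiveBuffers() {
        synchronized (bufferQueue) {
            return exclusiveBuffers.size();
        }
    }
}
```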









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-14 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424906972



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##
@@ -313,7 +313,7 @@ int getNumberOfAvailableBuffers() {
}
}
 
-   int unsynchronizedGetExclusiveBuffersUsed() {
+   int unsynchronizedGetExclusiveBuffers() {

Review comment:
   unsynchronizedGetAvailableExclusiveBuffers?









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-14 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424902024



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -418,8 +437,8 @@ public Buffer requestBuffer() {
 *
 * @param backlog The number of unsent buffers in the producer's sub partition.
 */
-   void onSenderBacklog(int backlog) throws IOException {
-   notifyBufferAvailable(bufferManager.requestFloatingBuffers(backlog + initialCredit));
+   public void onSenderBacklog(int backlog) throws IOException {

Review comment:
   nit: also adjust the javadoc of this method accordingly.

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -327,6 +322,14 @@ public void notifyBufferAvailable(int numAvailableBuffers) {
}
}
 
+   @Override
+   public void onCheckpointBarrier(CheckpointBarrier barrier) {

Review comment:
   Make it a private method inside `RemoteInputChannel`, because it is never used outside of it.









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-13 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424866076



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##
@@ -349,7 +373,7 @@ int unsynchronizedGetFloatingBuffersAvailable() {
 */
int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
exclusiveBuffers.add(buffer);
-   if (getAvailableBufferSize() > numRequiredBuffers) {
+   if (numRequiredBuffers == 0) {

Review comment:
   This should not be changed. If `numRequiredBuffers` is 0, `getAvailableBufferSize()` is necessarily greater than it, because the exclusive buffer has just been added, so the original condition already covers that case. If `numRequiredBuffers` is `1` or any other value, we still need to release a floating buffer as long as `getAvailableBufferSize()` is greater than it.
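To illustrate why the original `getAvailableBufferSize() > numRequiredBuffers` check is the right one, here is a simplified, hypothetical model (`AvailableBufferQueueSketch`, with buffers as strings) of the pre-change condition from the diff: a floating buffer is handed back whenever the queue holds more than the required number of buffers, not only when `numRequiredBuffers == 0`. The body after the condition (polling a floating buffer and returning 0 or 1) is an assumption, since the diff does not show it.

```java
import java.util.ArrayDeque;
import java.util.List;

// Hypothetical simplified available-buffer queue; buffers are just names here.
class AvailableBufferQueueSketch {
    final ArrayDeque<String> exclusiveBuffers = new ArrayDeque<>();
    final ArrayDeque<String> floatingBuffers = new ArrayDeque<>();

    int getAvailableBufferSize() {
        return exclusiveBuffers.size() + floatingBuffers.size();
    }

    /**
     * Adds a recycled exclusive buffer. Whenever the queue now holds more buffers than
     * required, one floating buffer (if any) is given back, modeled by adding it to
     * {@code releasedToPool}. Returns how many buffers were effectively gained: 0 if
     * a floating buffer was released in exchange, otherwise 1.
     */
    int addExclusiveBuffer(String buffer, int numRequiredBuffers, List<String> releasedToPool) {
        exclusiveBuffers.add(buffer);
        if (getAvailableBufferSize() > numRequiredBuffers) {
            String floatingBuffer = floatingBuffers.poll();
            if (floatingBuffer != null) {
                releasedToPool.add(floatingBuffer);
                return 0;
            }
        }
        return 1;
    }
}
```

With one floating buffer held and `numRequiredBuffers == 1`, adding an exclusive buffer swaps the floating one out; with `numRequiredBuffers == 5`, the floating buffer is kept because it is still needed.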









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-13 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424863522



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##
@@ -151,23 +149,42 @@ int requestFloatingBuffers(int numRequired) throws IOException {
return numRequestedBuffers;
}
 
-   numRequiredBuffers = numRequired;
+   numRequiredBuffers += numRequired;
+   numRequestedBuffers = internalRequestFloatingBuffers(numRequiredBuffers);
+   numRequiredBuffers -= numRequestedBuffers;
+   }
+   return numRequestedBuffers;
+   }
 
-   while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
-   BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
-   Buffer buffer = bufferPool.requestBuffer();
-   if (buffer != null) {
-   bufferQueue.addFloatingBuffer(buffer);
-   numRequestedBuffers++;
-   } else if (bufferPool.addBufferListener(this)) {
-   isWaitingForFloatingBuffers = true;
-   break;
-   }
+   private int internalRequestFloatingBuffers(int numBuffersToRequest) throws IOException {
+   assert Thread.holdsLock(bufferQueue);
+
+   int numRequestedBuffers = 0;
+   while (numRequestedBuffers < numBuffersToRequest && !isWaitingForFloatingBuffers) {
+   BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
+   Buffer buffer = bufferPool.requestBuffer();
+   if (buffer != null) {
+   bufferQueue.addFloatingBuffer(buffer);
+   numRequestedBuffers++;
+   } else if (bufferPool.addBufferListener(this)) {
+   isWaitingForFloatingBuffers = true;
+   break;
}
}
return numRequestedBuffers;
}
 
+   public void unregisterBufferListenerAndReleaseFloatingBuffers() {

Review comment:
   This method should be placed in the `Buffer recycle` section below.
   
   I think it is better to merge this method with the existing `#releaseFloatingBuffers` into one general method. Otherwise it might be confusing to understand the difference between them, especially since they handle `numRequiredBuffers` differently, which makes them look like customized logic.
   
   The merged version could look like this:
   
   ```
   void releaseFloatingBuffers(boolean isTemporaryRelease) {
       synchronized (bufferQueue) {
           if (isWaitingForFloatingBuffers) {
               inputChannel.inputGate.getBufferPool().removeBufferListener(this);
               isWaitingForFloatingBuffers = false;
           }

           int numReleasedBuffers = bufferQueue.releaseFloatingBuffers();
           if (isTemporaryRelease) {
               numRequiredBuffers += numReleasedBuffers;
           } else {
               numRequiredBuffers = 0;
           }
       }
   }
   ```
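The semantics of the merged `releaseFloatingBuffers(boolean isTemporaryRelease)` can be checked with a self-contained model. Everything below is hypothetical: an `int` counter replaces the local `BufferPool`, and a boolean flag replaces buffer-listener registration. The request path follows the accounting in the diff (add to the outstanding requirement, request, subtract what was obtained). A temporary release adds the released buffers back onto the requirement so they are requested again later, while a permanent release clears it.

```java
// Hypothetical single-class model of the floating-buffer bookkeeping discussed above.
class FloatingBufferManagerSketch {
    private int poolAvailable;
    private int floatingBuffers;
    private int numRequiredBuffers;
    private boolean isWaitingForFloatingBuffers;

    FloatingBufferManagerSketch(int poolAvailable) {
        this.poolAvailable = poolAvailable;
    }

    synchronized int requestFloatingBuffers(int numRequired) {
        numRequiredBuffers += numRequired;
        int numRequestedBuffers = 0;
        while (numRequestedBuffers < numRequiredBuffers && !isWaitingForFloatingBuffers) {
            if (poolAvailable > 0) {
                poolAvailable--;
                floatingBuffers++;
                numRequestedBuffers++;
            } else {
                // The real code registers a buffer listener here and waits for a callback.
                isWaitingForFloatingBuffers = true;
            }
        }
        numRequiredBuffers -= numRequestedBuffers;
        return numRequestedBuffers;
    }

    synchronized void releaseFloatingBuffers(boolean isTemporaryRelease) {
        if (isWaitingForFloatingBuffers) {
            // The real code calls removeBufferListener(this) on the pool here.
            isWaitingForFloatingBuffers = false;
        }
        int numReleasedBuffers = floatingBuffers;
        poolAvailable += numReleasedBuffers;
        floatingBuffers = 0;
        if (isTemporaryRelease) {
            // Remember the released buffers so they are requested again later.
            numRequiredBuffers += numReleasedBuffers;
        } else {
            numRequiredBuffers = 0;
        }
    }

    synchronized int getNumRequiredBuffers() { return numRequiredBuffers; }

    synchronized int getNumFloatingBuffers() { return floatingBuffers; }

    synchronized boolean isWaiting() { return isWaitingForFloatingBuffers; }
}
```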









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-13 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424860940



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
##
@@ -151,23 +150,43 @@ int requestFloatingBuffers(int numRequired) throws IOException {
return numRequestedBuffers;
}
 
-   numRequiredBuffers = numRequired;
+   numRequiredBuffers += numRequired;
+   numRequestedBuffers = internalRequestFloatingBuffers(numRequiredBuffers);
+   numRequiredBuffers -= numRequestedBuffers;
+   }
+   return numRequestedBuffers;
+   }
 
-   while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
-   BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
-   Buffer buffer = bufferPool.requestBuffer();
-   if (buffer != null) {
-   bufferQueue.addFloatingBuffer(buffer);
-   numRequestedBuffers++;
-   } else if (bufferPool.addBufferListener(this)) {
-   isWaitingForFloatingBuffers = true;
-   break;
-   }
+   private int internalRequestFloatingBuffers(int numBuffersToRequest) throws IOException {
+   assert Thread.holdsLock(bufferQueue);
+
+   int numRequestedBuffers = 0;
+   while (numRequestedBuffers < numBuffersToRequest && !isWaitingForFloatingBuffers) {
+   BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
+   Buffer buffer = bufferPool.requestBuffer();
+   if (buffer != null) {
+   bufferQueue.addFloatingBuffer(buffer);
+   numRequestedBuffers++;
+   } else if (bufferPool.addBufferListener(this)) {
+   isWaitingForFloatingBuffers = true;
+   break;
}
}
return numRequestedBuffers;
}
 
+   public void unregisterBufferListenerAndReleaseFloatingBuffers() {
+   synchronized (bufferQueue) {
+   if (isWaitingForFloatingBuffers) {
+   inputChannel.inputGate.getBufferPool().removeBufferListener(this);
+   isWaitingForFloatingBuffers = false;
+   }
+
+   int bufferReleased = bufferQueue.releaseFloatingBuffers();

Review comment:
   nit: bufferReleased -> numReleasedBuffers









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-11 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422972620



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -428,6 +423,23 @@ public void notifyBufferDestroyed() {
// Nothing to do actually.
}
 
+   @Override
+   public void onCheckpointBarrier(CheckpointBarrier barrier) {

Review comment:
   Considering the race condition between this operation and the canceler task, calling `removeBufferListener` and adjusting `isWaitingForFloatingBuffers` here might cause conflicts or even a deadlock. Let me think about whether we can instead remove the listener and adjust `isWaitingForFloatingBuffers` lazily, by reusing the existing `RemoteInputChannel#notifyBufferAvailable` path.









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-11 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422970729



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -428,6 +423,23 @@ public void notifyBufferDestroyed() {
// Nothing to do actually.
}
 
+   @Override
+   public void onCheckpointBarrier(CheckpointBarrier barrier) {

Review comment:
   It might be better to trigger this action in the netty thread as soon as the barrier is received. Otherwise task processing might delay it for a long time, during which the floating buffers cannot be reused by other channels.









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-11 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422950710



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -99,7 +100,7 @@
/** The number of available buffers that have not been announced to the producer yet. */
private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
 
-   /** The number of required buffers that equals to sender's backlog plus initial credit. */
+   /** The number of buffers to requested that equals to unfulfilled sender's backlog. */
@GuardedBy("bufferQueue")
private int numRequiredBuffers;

Review comment:
   nit: maybe rename it to `numRequiredFloatingBuffers` to better reflect the current semantics.









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-11 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422889022



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##
@@ -94,13 +99,27 @@ public void requestSubpartitionView(
}
 
@Override
-   public void addCredit(int creditDeltas) {
+   public boolean addCredit(int creditDeltas) {
numCreditsAvailable += creditDeltas;
+   return shouldAnnounceBacklog();
+   }
+
+   @Override
+   public boolean shouldAnnounceBacklog() {
+   return initialCredit == 0 && numCreditsAvailable == 0 && subpartitionView.isAvailable(Integer.MAX_VALUE);
}
 
@Override
-   public void resumeConsumption() {
+   public boolean resumeConsumption(int availableCredit, int unfulfilledBacklog) {
+   if (initialCredit > 0) {
+   checkState(numCreditsAvailable == availableCredit, "Illegal number of available credit.");

Review comment:
   I think this check is either unnecessary or invalid.









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-11 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422884550



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
##
@@ -305,19 +311,18 @@ protected CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws I
 
/**
 * A combination of a {@link Buffer} and a flag indicating availability of further buffers,
-* and the backlog length indicating how many non-event buffers are available in the
-* subpartition.
+* and the backlog length indicating how many credits the subpartition.
 */
public static final class BufferAndAvailability {
 
private final Buffer buffer;
private final boolean moreAvailable;
-   private final int buffersInBacklog;
+   private final int backlog;
 
-   public BufferAndAvailability(Buffer buffer, boolean moreAvailable, int buffersInBacklog) {
-   this.buffer = checkNotNull(buffer);
+   public BufferAndAvailability(@Nullable Buffer buffer, boolean moreAvailable, int backlog) {

Review comment:
   We can also avoid introducing a `@Nullable` buffer by following this comment: https://github.com/apache/flink/pull/11877/files#r422847423









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-11 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422868983



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##
@@ -94,13 +99,27 @@ public void requestSubpartitionView(
}
 
@Override
-   public void addCredit(int creditDeltas) {
+   public boolean addCredit(int creditDeltas) {
numCreditsAvailable += creditDeltas;
+   return shouldAnnounceBacklog();

Review comment:
   Why do we need to trigger a backlog announcement when adding credit?
   Since the added `creditDeltas` is always greater than zero, I assume we will have the chance to announce the backlog later when sending a `BufferResponse`.









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-11 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422864980



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
##
@@ -158,11 +158,18 @@ public boolean add(BufferConsumer bufferConsumer) throws IOException {
private final Buffer buffer;
private final boolean isDataAvailable;
private final int buffersInBacklog;
+   private final int unannouncedBacklog;

Review comment:
   In the end, only one of `buffersInBacklog` and `unannouncedBacklog` should be retained.









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-11 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422863695



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##
@@ -70,6 +70,10 @@
@GuardedBy("buffers")
private int buffersInBacklog;
 
+   /** The number of non-event buffers to be announced to the downstream. */
+   @GuardedBy("buffers")
+   private int unannouncedBacklog;

Review comment:
   The previous `buffersInBacklog` variable should be replaced by this new 
variable, to avoid maintaining two variables.









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-11 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422847423



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##
@@ -234,11 +236,16 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
registerAvailableReader(reader);
}
 
-   BufferResponse msg = new BufferResponse(
-   next.buffer(),
-   reader.getSequenceNumber(),
-   reader.getReceiverId(),
-   next.buffersInBacklog());
+   Object msg;
+   if (next.buffer() != null) {

Review comment:
   Currently the two message paths are distinguished in both `PartitionRequestQueue` and `CreditBasedSequenceNumberingViewReader`. We can improve this by making the decision in only one place:
   
   1. Introduce a `ServerOutboundMessage` class extending `NettyMessage`, and make both `AddBacklog` and `BufferResponse` extend `ServerOutboundMessage`.
   2. Replace the existing `NetworkSequenceViewReader#getNextBuffer` with a new `NetworkSequenceViewReader#getNextMessage`. The `CreditBasedSequenceNumberingViewReader` implementation can then check the condition to decide which message to return.
   
   ```
   public NettyMessage.ServerOutboundMessage getNextMessage() throws IOException {
       if (numCreditsAvailable == 0 && initialCredit == 0 && !subpartitionView.isAvailable(numCreditsAvailable)) {
           return getBacklogMessage();
       } else {
           return getNextBufferResponse();
       }
   }
   ```
   
   This way we can also avoid the extra conversion from `BufferAndAvailability` to `BufferResponse`.
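A reduced model of the single decision point proposed above. All types are hypothetical; in particular, `SubpartitionViewSketch.isAvailable` assumes that events can be sent without credit while data buffers need at least one, and the method returns an enum instead of building real `NettyMessage` objects.

```java
// Hypothetical view: hasDataBuffer means a data buffer is queued,
// nextIsEvent means the head of the queue is an event.
class SubpartitionViewSketch {
    boolean hasDataBuffer;
    boolean nextIsEvent;

    // Assumption: events need no credit, data buffers need at least one.
    boolean isAvailable(int numCreditsAvailable) {
        return nextIsEvent || (hasDataBuffer && numCreditsAvailable > 0);
    }
}

enum OutboundKind { ADD_BACKLOG, BUFFER_RESPONSE }

class MessageReaderSketch {
    private final int initialCredit;
    private final SubpartitionViewSketch subpartitionView;
    int numCreditsAvailable;

    MessageReaderSketch(int initialCredit, SubpartitionViewSketch subpartitionView) {
        this.initialCredit = initialCredit;
        this.numCreditsAvailable = initialCredit;
        this.subpartitionView = subpartitionView;
    }

    // The only place that decides which kind of message is sent next.
    OutboundKind getNextMessageKind() {
        if (numCreditsAvailable == 0 && initialCredit == 0 && !subpartitionView.isAvailable(numCreditsAvailable)) {
            return OutboundKind.ADD_BACKLOG;
        }
        return OutboundKind.BUFFER_RESPONSE;
    }
}
```

Because the branch lives in the reader, the queue only forwards whatever message it is handed.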









[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers

2020-05-09 Thread GitBox


zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422466188



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -428,6 +423,23 @@ public void notifyBufferDestroyed() {
// Nothing to do actually.
}
 
+   @Override
+   public void onCheckpointBarrier(CheckpointBarrier barrier) {

Review comment:
   Move the following condition out of the `synchronized` block, so that in most cases we do not need to touch the lock at all.
   ```
   CheckpointOptions options = barrier.getCheckpointOptions();
   if (!(initialCredit == 0 && options.isExactlyOnceMode() && !options.isUnalignedCheckpoint())) {
       return;
   }
   ```
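A self-contained sketch of this fast path, with `CheckpointOptionsSketch` and `BarrierChannelSketch` as hypothetical stand-ins for `CheckpointOptions` and `RemoteInputChannel`. The counter only exists to make it observable when the lock is taken; the actual floating-buffer release is omitted.

```java
// Hypothetical stand-in for CheckpointOptions, reduced to the two flags used below.
class CheckpointOptionsSketch {
    private final boolean exactlyOnce;
    private final boolean unaligned;

    CheckpointOptionsSketch(boolean exactlyOnce, boolean unaligned) {
        this.exactlyOnce = exactlyOnce;
        this.unaligned = unaligned;
    }

    boolean isExactlyOnceMode() { return exactlyOnce; }

    boolean isUnalignedCheckpoint() { return unaligned; }
}

class BarrierChannelSketch {
    private final int initialCredit;
    private final Object bufferQueue = new Object();
    private int numLockedBarriers; // how often the slow path took the lock

    BarrierChannelSketch(int initialCredit) {
        this.initialCredit = initialCredit;
    }

    void onCheckpointBarrier(CheckpointOptionsSketch options) {
        // Lock-free fast path: only channels without exclusive credits under an
        // aligned exactly-once checkpoint need the synchronized part.
        if (!(initialCredit == 0 && options.isExactlyOnceMode() && !options.isUnalignedCheckpoint())) {
            return;
        }
        synchronized (bufferQueue) {
            numLockedBarriers++;
            // Releasing floating buffers would happen here (omitted in this sketch).
        }
    }

    int getNumLockedBarriers() {
        synchronized (bufferQueue) {
            return numLockedBarriers;
        }
    }
}
```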




