[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r425079339

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ##

@@ -94,13 +99,22 @@ public void requestSubpartitionView(
     }
     @Override
-    public void addCredit(int creditDeltas) {
+    public void addCredit(int creditDeltas) throws Exception {
         numCreditsAvailable += creditDeltas;
+        requestQueue.enqueueAvailableReader(this, this::isAvailable);
+    }
+
+    @Override
+    public boolean shouldAnnounceBacklog() {
+        return !withoutExclusiveCredits && numCreditsAvailable == 0 && subpartitionView.isAvailable(Integer.MAX_VALUE);
     }
     @Override
-    public void resumeConsumption() {
+    public void resumeConsumption(int availableCredit, boolean hasUnfulfilledBacklog) throws Exception {

Review comment:
nit: rename `availableCredit` to `availableCredits`.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r425078707

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ##

@@ -69,6 +73,7 @@
         this.receiverId = receiverId;
         this.numCreditsAvailable = initialCredit;
         this.requestQueue = requestQueue;
+        this.withoutExclusiveCredits = initialCredit > 0;

Review comment:
This should be `initialCredit == 0`.
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r425078522

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ##

@@ -163,8 +172,15 @@ boolean hasBuffersAvailable() {
         return subpartitionView.isAvailable(Integer.MAX_VALUE);
     }
-    @Override
-    public BufferAndAvailability getNextBuffer() throws IOException {
+    private AddBacklogMessage getAddBacklogMessage() {
+        int backlog = subpartitionView.getAndResetUnannouncedBacklog();
+        if (backlog > 0) {
+            return new AddBacklogMessage(receiverId, backlog, false);
+        }
+        return null;

Review comment:
I guess we should not expect a `null` return here. Once the code path enters `getAddBacklogMessage`, we should guarantee that the respective backlog is greater than 0. Maybe assert on the backlog instead?
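The suggestion above could be sketched roughly as follows. This is a minimal, self-contained illustration and not the PR's code: `BacklogSource`, `BacklogAnnouncement` and `BacklogAnnouncer` are hypothetical stand-ins for the subpartition view, `AddBacklogMessage` and the reader, and throwing `IllegalStateException` is only one assumed way to enforce the precondition.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the subpartition view; not Flink's actual class. */
final class BacklogSource {
    private final AtomicInteger unannounced = new AtomicInteger();

    void add(int buffers) {
        unannounced.addAndGet(buffers);
    }

    /** Returns the backlog that has not been announced yet and resets it to zero. */
    int getAndResetUnannouncedBacklog() {
        return unannounced.getAndSet(0);
    }
}

/** Hypothetical stand-in for AddBacklogMessage. */
final class BacklogAnnouncement {
    final int backlog;

    BacklogAnnouncement(int backlog) {
        this.backlog = backlog;
    }
}

final class BacklogAnnouncer {
    /**
     * Builds an announcement and treats a non-positive backlog as a programming
     * error instead of signalling it with a null return value.
     */
    static BacklogAnnouncement nextAnnouncement(BacklogSource source) {
        int backlog = source.getAndResetUnannouncedBacklog();
        if (backlog <= 0) {
            throw new IllegalStateException("Expected a positive backlog but got " + backlog);
        }
        return new BacklogAnnouncement(backlog);
    }
}
```

With this shape the caller never has to handle `null`; a violated precondition surfaces immediately at the place where the announcement is built.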
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r425072084

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ##

@@ -328,4 +325,67 @@ public void operationComplete(ChannelFuture future) throws Exception {
             }
         }
     }
+
+    /**
+     * Outbound message to be sent to the client.
+     */
+    public static abstract class ServerOutboundMessage {
+        protected final InputChannelID receiverId;
+        protected final int backlog;
+        private final boolean moreAvailable;
+
+        ServerOutboundMessage(InputChannelID receiverId, int backlog, boolean moreAvailable) {
+            checkArgument(backlog >= 0, "Number of backlog must be non-negative.");
+            this.receiverId = checkNotNull(receiverId);
+            this.backlog = backlog;
+            this.moreAvailable = moreAvailable;
+        }
+
+        abstract Object build();
+
+        public boolean isMoreAvailable() {
+            return moreAvailable;
+        }
+
+        void recycleBufferIfNeeded() {
+        }
+    }
+
+    static class BufferResponseMessage extends ServerOutboundMessage {
+        private final Buffer buffer;
+        private final int sequenceNumber;
+
+        BufferResponseMessage(
+                Buffer buffer,
+                InputChannelID receiverId,
+                int sequenceNumber,
+                int backlog,
+                boolean moreAvailable) {
+            super(receiverId, backlog, moreAvailable);
+            this.buffer = checkNotNull(buffer);
+            this.sequenceNumber = sequenceNumber;
+        }
+
+        @Override
+        Object build() {
+            return new BufferResponse(buffer, sequenceNumber, receiverId, backlog);
+        }
+
+        @Override
+        void recycleBufferIfNeeded() {
+            buffer.recycleBuffer();
+        }
+    }
+
+    static class AddBacklogMessage extends ServerOutboundMessage {
+
+        AddBacklogMessage(InputChannelID receiverId, int backlog, boolean moreAvailable) {

Review comment:
Remove the `moreAvailable` argument from the constructor, because it seems strange for `AddBacklogMessage`. We can then pass `false` to `super` below instead.

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ##

@@ -94,10 +97,11 @@ void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
      * NOTE: Only one thread would trigger the actual enqueue after checking the reader's
      * availability, so there is no race condition here.
      */
-    private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
-        if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
+    void enqueueAvailableReader(final NetworkSequenceViewReader reader, BooleanSupplier condition) throws Exception {

Review comment:
Adjust the javadoc accordingly.
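A rough sketch of the first suggestion, dropping `moreAvailable` from the backlog message constructor. The classes below are simplified, hypothetical stand-ins (`OutboundMessageSketch`, `AddBacklogSketch`) rather than the PR's classes: the receiver id is a plain `String`, `getBacklog()` is an added accessor, and the precondition helpers are replaced by explicit checks.

```java
/** Simplified stand-in for ServerOutboundMessage; receiverId is a plain String here. */
abstract class OutboundMessageSketch {
    protected final String receiverId;
    protected final int backlog;
    private final boolean moreAvailable;

    OutboundMessageSketch(String receiverId, int backlog, boolean moreAvailable) {
        if (receiverId == null) {
            throw new NullPointerException("receiverId must not be null");
        }
        if (backlog < 0) {
            throw new IllegalArgumentException("Number of backlog must be non-negative.");
        }
        this.receiverId = receiverId;
        this.backlog = backlog;
        this.moreAvailable = moreAvailable;
    }

    boolean isMoreAvailable() {
        return moreAvailable;
    }

    int getBacklog() {
        return backlog;
    }
}

/** Backlog-only message: callers no longer pass a moreAvailable flag. */
final class AddBacklogSketch extends OutboundMessageSketch {
    AddBacklogSketch(String receiverId, int backlog) {
        // The flag is fixed to false in the super call, as the review comment proposes.
        super(receiverId, backlog, false);
    }
}
```

Fixing the flag inside the subclass keeps call sites from passing a value that has no meaning for a message without a buffer.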
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424911901

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java ##

@@ -129,7 +132,6 @@ private boolean shouldContinueRequest(BufferPool bufferPool) {
      */
     void requestExclusiveBuffers() throws IOException {
         Collection segments = globalPool.requestMemorySegments();
-        checkArgument(!segments.isEmpty(), "The number of exclusive buffers per channel should be larger than 0.");

Review comment:
Got it. So do you think we should add an `if (initialCredit > 0)` check before calling this method inside `RemoteInputChannel#assignExclusiveSegments`? That would just avoid entering the `synchronized` block below unnecessarily for empty segments.
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424906972

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java ##

@@ -313,7 +313,7 @@ int getNumberOfAvailableBuffers() {
         }
     }
-    int unsynchronizedGetExclusiveBuffersUsed() {
+    int unsynchronizedGetExclusiveBuffers() {

Review comment:
How about `unsynchronizedGetAvailableExclusiveBuffers`?
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424902024

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##

@@ -418,8 +437,8 @@ public Buffer requestBuffer() {
      *
      * @param backlog The number of unsent buffers in the producer's sub partition.
      */
-    void onSenderBacklog(int backlog) throws IOException {
-        notifyBufferAvailable(bufferManager.requestFloatingBuffers(backlog + initialCredit));
+    public void onSenderBacklog(int backlog) throws IOException {

Review comment:
nit: also adjust the javadoc of this method accordingly.

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##

@@ -327,6 +322,14 @@ public void notifyBufferAvailable(int numAvailableBuffers) {
         }
     }
+    @Override
+    public void onCheckpointBarrier(CheckpointBarrier barrier) {

Review comment:
Make it a private method inside `RemoteInputChannel`, because it is never used outside.
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424866076

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java ##

@@ -349,7 +373,7 @@ int unsynchronizedGetFloatingBuffersAvailable() {
      */
     int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
         exclusiveBuffers.add(buffer);
-        if (getAvailableBufferSize() > numRequiredBuffers) {
+        if (numRequiredBuffers == 0) {

Review comment:
This should not be changed. If `numRequiredBuffers` is 0, `getAvailableBufferSize()` must be greater than it. If `numRequiredBuffers` is `1` or any other value, we also need to release a floating buffer as long as `getAvailableBufferSize()` is greater than it.
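To illustrate why the original comparison matters, here is a small sketch under simplifying assumptions. `AvailableBufferQueueSketch` is a hypothetical class, buffers are plain strings, and the boolean return value is an invention of this sketch, not the return contract of the real `addExclusiveBuffer`.

```java
import java.util.ArrayDeque;

/** Hypothetical, simplified buffer queue; buffers are represented by plain strings. */
final class AvailableBufferQueueSketch {
    private final ArrayDeque<String> exclusiveBuffers = new ArrayDeque<>();
    private final ArrayDeque<String> floatingBuffers = new ArrayDeque<>();

    void addFloatingBuffer(String buffer) {
        floatingBuffers.add(buffer);
    }

    int getAvailableBufferSize() {
        return exclusiveBuffers.size() + floatingBuffers.size();
    }

    /**
     * Adds an exclusive buffer and, if the queue now holds more buffers than
     * required, hands one floating buffer back. Returns true if a floating
     * buffer was released.
     */
    boolean addExclusiveBuffer(String buffer, int numRequiredBuffers) {
        exclusiveBuffers.add(buffer);
        if (getAvailableBufferSize() > numRequiredBuffers) {
            // poll() returns null when no floating buffer is queued.
            return floatingBuffers.poll() != null;
        }
        return false;
    }
}
```

With one floating buffer queued and `numRequiredBuffers` equal to 1, adding an exclusive buffer makes two buffers available, so the surplus floating buffer is released. A check of `numRequiredBuffers == 0` would miss that case.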
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424863522

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java ##

@@ -151,23 +149,42 @@ int requestFloatingBuffers(int numRequired) throws IOException {
                 return numRequestedBuffers;
             }
-            numRequiredBuffers = numRequired;
+            numRequiredBuffers += numRequired;
+            numRequestedBuffers = internalRequestFloatingBuffers(numRequiredBuffers);
+            numRequiredBuffers -= numRequestedBuffers;
+        }
+        return numRequestedBuffers;
+    }
-            while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
-                BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
-                Buffer buffer = bufferPool.requestBuffer();
-                if (buffer != null) {
-                    bufferQueue.addFloatingBuffer(buffer);
-                    numRequestedBuffers++;
-                } else if (bufferPool.addBufferListener(this)) {
-                    isWaitingForFloatingBuffers = true;
-                    break;
-                }
+    private int internalRequestFloatingBuffers(int numBuffersToRequest) throws IOException {
+        assert Thread.holdsLock(bufferQueue);
+
+        int numRequestedBuffers = 0;
+        while (numRequestedBuffers < numBuffersToRequest && !isWaitingForFloatingBuffers) {
+            BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
+            Buffer buffer = bufferPool.requestBuffer();
+            if (buffer != null) {
+                bufferQueue.addFloatingBuffer(buffer);
+                numRequestedBuffers++;
+            } else if (bufferPool.addBufferListener(this)) {
+                isWaitingForFloatingBuffers = true;
+                break;
             }
         }
         return numRequestedBuffers;
     }
+    public void unregisterBufferListenerAndReleaseFloatingBuffers() {

Review comment:
This method should be placed in the `Buffer recycle` section below.

I think it is better to integrate this method with the existing `#releaseFloatingBuffers` to provide a general one. Otherwise the difference between them might be confusing, especially the different handling of `numRequiredBuffers`, which makes them look like customized logic. The integration could look as below:

```
void releaseFloatingBuffers(boolean isTemporaryRelease) {
    synchronized (bufferQueue) {
        if (isWaitingForFloatingBuffers) {
            inputChannel.inputGate.getBufferPool().removeBufferListener(this);
            isWaitingForFloatingBuffers = false;
        }

        int numReleasedBuffers = bufferQueue.releaseFloatingBuffers();
        if (isTemporaryRelease) {
            numRequiredBuffers += numReleasedBuffers;
        } else {
            numRequiredBuffers = 0;
        }
    }
}
```
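The bookkeeping in the proposed `releaseFloatingBuffers(boolean isTemporaryRelease)` can be tried out with a self-contained sketch. Everything here is a simplified assumption: `FloatingBufferManagerSketch` and `markWaiting` are hypothetical, buffers are strings, and the buffer pool listener is reduced to a boolean flag.

```java
import java.util.ArrayDeque;

/** Hypothetical, simplified manager; the pool listener is modelled as a flag. */
final class FloatingBufferManagerSketch {
    private final Object bufferQueue = new Object();
    private final ArrayDeque<String> floatingBuffers = new ArrayDeque<>();
    private boolean isWaitingForFloatingBuffers;
    private int numRequiredBuffers;

    void addFloatingBuffer(String buffer) {
        synchronized (bufferQueue) {
            floatingBuffers.add(buffer);
        }
    }

    /** Simulates a request that could not be fully served by the pool. */
    void markWaiting(int unfulfilled) {
        synchronized (bufferQueue) {
            isWaitingForFloatingBuffers = true;
            numRequiredBuffers = unfulfilled;
        }
    }

    void releaseFloatingBuffers(boolean isTemporaryRelease) {
        synchronized (bufferQueue) {
            // Stand-in for removing the listener from the buffer pool.
            isWaitingForFloatingBuffers = false;

            int numReleasedBuffers = floatingBuffers.size();
            floatingBuffers.clear();

            if (isTemporaryRelease) {
                // Remember the released buffers so they can be requested again later.
                numRequiredBuffers += numReleasedBuffers;
            } else {
                numRequiredBuffers = 0;
            }
        }
    }

    int getNumRequiredBuffers() {
        synchronized (bufferQueue) {
            return numRequiredBuffers;
        }
    }

    boolean isWaiting() {
        synchronized (bufferQueue) {
            return isWaitingForFloatingBuffers;
        }
    }
}
```

A temporary release keeps the demand (unfulfilled plus released buffers) so it can be re-requested, while a final release resets it to zero. That is the only difference between the two call modes.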
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r424860940

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java ##

@@ -151,23 +150,43 @@ int requestFloatingBuffers(int numRequired) throws IOException {
                 return numRequestedBuffers;
             }
-            numRequiredBuffers = numRequired;
+            numRequiredBuffers += numRequired;
+            numRequestedBuffers = internalRequestFloatingBuffers(numRequiredBuffers);
+            numRequiredBuffers -= numRequestedBuffers;
+        }
+        return numRequestedBuffers;
+    }
-            while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
-                BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
-                Buffer buffer = bufferPool.requestBuffer();
-                if (buffer != null) {
-                    bufferQueue.addFloatingBuffer(buffer);
-                    numRequestedBuffers++;
-                } else if (bufferPool.addBufferListener(this)) {
-                    isWaitingForFloatingBuffers = true;
-                    break;
-                }
+    private int internalRequestFloatingBuffers(int numBuffersToRequest) throws IOException {
+        assert Thread.holdsLock(bufferQueue);
+
+        int numRequestedBuffers = 0;
+        while (numRequestedBuffers < numBuffersToRequest && !isWaitingForFloatingBuffers) {
+            BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
+            Buffer buffer = bufferPool.requestBuffer();
+            if (buffer != null) {
+                bufferQueue.addFloatingBuffer(buffer);
+                numRequestedBuffers++;
+            } else if (bufferPool.addBufferListener(this)) {
+                isWaitingForFloatingBuffers = true;
+                break;
             }
         }
         return numRequestedBuffers;
     }
+    public void unregisterBufferListenerAndReleaseFloatingBuffers() {
+        synchronized (bufferQueue) {
+            if (isWaitingForFloatingBuffers) {
+                inputChannel.inputGate.getBufferPool().removeBufferListener(this);
+                isWaitingForFloatingBuffers = false;
+            }
+
+            int bufferReleased = bufferQueue.releaseFloatingBuffers();

Review comment:
nit: rename `bufferReleased` to `numReleasedBuffers`.
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422972620

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##

@@ -428,6 +423,23 @@ public void notifyBufferDestroyed() {
         // Nothing to do actually.
     }
+    @Override
+    public void onCheckpointBarrier(CheckpointBarrier barrier) {

Review comment:
Considering the race condition between this operation and the canceler task, calling `removeBufferListener` and adjusting `isWaitingForFloatingBuffers` here might introduce potential conflicts and deadlocks. Let me think about whether we can lazily remove the listener and adjust `isWaitingForFloatingBuffers` by reusing the existing process in `RemoteInputChannel#notifyBufferAvailable`.
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422970729

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##

@@ -428,6 +423,23 @@ public void notifyBufferDestroyed() {
         // Nothing to do actually.
     }
+    @Override
+    public void onCheckpointBarrier(CheckpointBarrier barrier) {

Review comment:
It might be better to trigger this action from the netty thread immediately when the barrier is received. Otherwise the task processing might take a long time to reach it, which delays the reuse of the floating buffers by other channels.
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422950710

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##

@@ -99,7 +100,7 @@
     /** The number of available buffers that have not been announced to the producer yet. */
     private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
-    /** The number of required buffers that equals to sender's backlog plus initial credit. */
+    /** The number of buffers to requested that equals to unfulfilled sender's backlog. */
     @GuardedBy("bufferQueue")
     private int numRequiredBuffers;

Review comment:
nit: this might be renamed to `numRequiredFloatingBuffers` to better reflect the current semantics.
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [FLINK-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422889022

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ##

@@ -94,13 +99,27 @@ public void requestSubpartitionView(
     }
     @Override
-    public void addCredit(int creditDeltas) {
+    public boolean addCredit(int creditDeltas) {
         numCreditsAvailable += creditDeltas;
+        return shouldAnnounceBacklog();
+    }
+
+    @Override
+    public boolean shouldAnnounceBacklog() {
+        return initialCredit == 0 && numCreditsAvailable == 0 && subpartitionView.isAvailable(Integer.MAX_VALUE);
     }
     @Override
-    public void resumeConsumption() {
+    public boolean resumeConsumption(int availableCredit, int unfulfilledBacklog) {
+        if (initialCredit > 0) {
+            checkState(numCreditsAvailable == availableCredit, "Illegal number of available credit.");

Review comment:
I guess this check is either unnecessary or invalid.
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422884550

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java ##

@@ -305,19 +311,18 @@ protected CheckpointBarrier parseCheckpointBarrierOrNull(Buffer buffer) throws I
     /**
      * A combination of a {@link Buffer} and a flag indicating availability of further buffers,
-     * and the backlog length indicating how many non-event buffers are available in the
-     * subpartition.
+     * and the backlog length indicating how many credits the subpartition.
      */
     public static final class BufferAndAvailability {
         private final Buffer buffer;
         private final boolean moreAvailable;
-        private final int buffersInBacklog;
+        private final int backlog;
-        public BufferAndAvailability(Buffer buffer, boolean moreAvailable, int buffersInBacklog) {
-            this.buffer = checkNotNull(buffer);
+        public BufferAndAvailability(@Nullable Buffer buffer, boolean moreAvailable, int backlog) {

Review comment:
We can also avoid introducing a `nullable` buffer by following this comment: https://github.com/apache/flink/pull/11877/files#r422847423
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422868983

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ##

@@ -94,13 +99,27 @@ public void requestSubpartitionView(
     }
     @Override
-    public void addCredit(int creditDeltas) {
+    public boolean addCredit(int creditDeltas) {
         numCreditsAvailable += creditDeltas;
+        return shouldAnnounceBacklog();

Review comment:
Why do we need to trigger announcing the backlog while adding credit? I assume that since the added `creditDeltas` is always greater than zero, we have the chance to announce the backlog later when sending a `BufferResponse`.
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422864980

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ##

@@ -158,11 +158,18 @@ public boolean add(BufferConsumer bufferConsumer) throws IOException {
         private final Buffer buffer;
         private final boolean isDataAvailable;
         private final int buffersInBacklog;
+        private final int unannouncedBacklog;

Review comment:
Only one of `buffersInBacklog` and `unannouncedBacklog` should be retained in the end.
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422863695

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ##

@@ -70,6 +70,10 @@
     @GuardedBy("buffers")
     private int buffersInBacklog;
+    /** The number of non-event buffers to be announced to the downstream. */
+    @GuardedBy("buffers")
+    private int unannouncedBacklog;

Review comment:
The previous `buffersInBacklog` variable should be replaced by this new variable, to avoid maintaining two variables.
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422847423

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ##

@@ -234,11 +236,16 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
                 registerAvailableReader(reader);
             }
-            BufferResponse msg = new BufferResponse(
-                next.buffer(),
-                reader.getSequenceNumber(),
-                reader.getReceiverId(),
-                next.buffersInBacklog());
+            Object msg;
+            if (next.buffer() != null) {

Review comment:
The message path is now distinguished in both `PartitionRequestQueue` and `CreditBasedSequenceNumberingViewReader`. We can improve this so that the decision is made in only one place.

1. Introduce a `ServerOutboundMessage` class that extends `NettyMessage`, and make `AddBacklog` and `BufferResponse` both extend `ServerOutboundMessage`.
2. Introduce `NetworkSequenceViewReader#getNextMessage` in place of the existing `NetworkSequenceViewReader#getNextBuffer`. Inside the `CreditBasedSequenceNumberingViewReader` implementation we can then evaluate the condition that distinguishes the two paths.

```
public NettyMessage.ServerOutboundMessage getNextMessage() throws IOException {
    if (numCreditsAvailable == 0 && initialCredit == 0 && !subpartitionView.isAvailable(numCreditsAvailable)) {
        return getBacklogMessage();
    } else {
        return getNextBufferResponse();
    }
}
```

By doing so we can also reduce the transformation needed between `BufferAndAvailability` and `BufferResponse`.
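The idea of deciding the message type in a single place can be sketched as below. This is a hedged illustration, not the proposed Flink code: all type names are hypothetical, buffers are strings, and the condition used here (no exclusive credits, zero available credits, data queued) follows the `shouldAnnounceBacklog` form quoted elsewhere in this thread rather than the snippet above. The real code would also reset the unannounced backlog after announcing it, which this sketch leaves out.

```java
import java.util.ArrayDeque;

/** Marker type for everything the reader can hand to the send queue. */
interface OutboundSketch {}

/** Announces only the backlog; used when the receiver holds no credit. */
final class BacklogOnlySketch implements OutboundSketch {
    final int backlog;

    BacklogOnlySketch(int backlog) {
        this.backlog = backlog;
    }
}

/** Carries one buffer (a String here) plus the remaining backlog. */
final class BufferWithBacklogSketch implements OutboundSketch {
    final String payload;
    final int backlog;

    BufferWithBacklogSketch(String payload, int backlog) {
        this.payload = payload;
        this.backlog = backlog;
    }
}

final class ViewReaderSketch {
    private final ArrayDeque<String> queued = new ArrayDeque<>();
    private final int initialCredit;
    private int numCreditsAvailable;

    ViewReaderSketch(int initialCredit) {
        this.initialCredit = initialCredit;
        this.numCreditsAvailable = initialCredit;
    }

    void enqueue(String payload) {
        queued.add(payload);
    }

    void addCredit(int creditDeltas) {
        numCreditsAvailable += creditDeltas;
    }

    /** The only place that decides which message goes out; null means nothing to send. */
    OutboundSketch getNextMessage() {
        if (queued.isEmpty()) {
            return null;
        }
        if (numCreditsAvailable == 0) {
            // Without exclusive credits the receiver must learn about the backlog first.
            return initialCredit == 0 ? new BacklogOnlySketch(queued.size()) : null;
        }
        numCreditsAvailable--;
        String payload = queued.poll();
        return new BufferWithBacklogSketch(payload, queued.size());
    }
}
```

Because the reader returns a common message type, the queue that writes to the channel does not need to inspect a nullable buffer to find out which path it is on.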
[GitHub] [flink] zhijiangW commented on a change in pull request #11877: [Flink-16641][network] Announce sender's backlog to solve the deadlock issue without exclusive buffers
zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422466188

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##

@@ -428,6 +423,23 @@ public void notifyBufferDestroyed() {
         // Nothing to do actually.
     }
+    @Override
+    public void onCheckpointBarrier(CheckpointBarrier barrier) {

Review comment:
Move the following condition out of the `synchronized` block, so that we do not need to touch the lock in most cases.

```
CheckpointOptions options = barrier.getCheckpointOptions();
if (!(initialCredit == 0 && options.isExactlyOnceMode() && !options.isUnalignedCheckpoint())) {
    return;
}
```
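A small sketch of the early-return pattern described above, under stated assumptions: `BarrierHandlingSketch` is hypothetical, the two boolean parameters stand in for the values read from `CheckpointOptions`, and the guarded work is reduced to a counter.

```java
/** Hypothetical, simplified channel; the two booleans stand in for CheckpointOptions. */
final class BarrierHandlingSketch {
    private final Object bufferQueue = new Object();
    private final int initialCredit;
    private int numLockedSections;

    BarrierHandlingSketch(int initialCredit) {
        this.initialCredit = initialCredit;
    }

    /** Returns true if the synchronized section ran. */
    boolean onCheckpointBarrier(boolean exactlyOnceMode, boolean unalignedCheckpoint) {
        // Only immutable state and the arguments are read here, so no lock is needed.
        if (!(initialCredit == 0 && exactlyOnceMode && !unalignedCheckpoint)) {
            return false;
        }
        synchronized (bufferQueue) {
            // Placeholder for the guarded work, for example releasing floating buffers.
            numLockedSections++;
        }
        return true;
    }

    int getNumLockedSections() {
        synchronized (bufferQueue) {
            return numLockedSections;
        }
    }
}
```

Channels with exclusive credits, or checkpoints that are not aligned exactly-once, return before the monitor is ever acquired, which is the common case the comment is aiming at.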