[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r544876522

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java ##
@@ -84,6 +84,24 @@ public void testChannelResetOnNewBarrier() throws Exception {
         assertFalse(stateWriter.getAddedInput().isEmpty());
     }
+    /**
+     * If a checkpoint announcement was processed and then UC-barrier arrives (from the upstream)
+     * then it should be processed by the UC controller.
+     */
+    @Test
+    public void testSwitchToUnalignedByUpstream() throws Exception {
+        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
+        inputGate.setInputChannels(new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1));
+        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+        SingleCheckpointBarrierHandler barrierHandler = barrierHandler(inputGate, target);
+        CheckpointedInputGate gate = buildGate(target, 2);
+
+        CheckpointBarrier aligned = new CheckpointBarrier(1, System.currentTimeMillis(), alignedWithTimeout(getDefault(), Integer.MAX_VALUE));
+
+        send(toBuffer(new EventAnnouncement(aligned, 0), true), 0, gate); // process announcement but not the barrier
+        send(toBuffer(aligned.asUnaligned(), true), 1, gate); // pretend it came from upstream before the first (AC) barrier was picked up
+    }

Review comment:
The expectation is that it just won't fail (without the fix, it will).

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r544868039

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java ##
@@ -68,6 +68,22 @@
  */
 public class AlternatingControllerTest {
+    /**
+     * Upon subsuming (or canceling) a checkpoint, channels should be notified regardless of whether UC controller is
+     * currently being used or not. Otherwise, channels may capture in-flight buffers from an older checkpoint.

Review comment:
Sorry, my bad, I didn't update the comment after updating the test. It checks whether the buffer (on the other channel) was captured or not. I'll update the comment.
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r544865125

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##
@@ -451,11 +451,20 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
             } else {
                 receivedBuffers.add(sequenceBuffer);
-                channelStatePersister.maybePersist(buffer);
                 if (dataType.requiresAnnouncement()) {
                     firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                 }
             }
+            channelStatePersister
+                .checkForBarrier(sequenceBuffer.buffer)
+                .filter(id -> id > lastBarrierId)
+                .ifPresent(id -> {
+                    // checkpoint was not yet started by task thread,
+                    // so remember the numbers of buffers to spill for the time when it will be started
+                    lastBarrierId = id;
+                    lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
+                });
+            channelStatePersister.maybePersist(buffer);

Review comment:
> Side question, shouldn't those two fixes be separate commits?

They were in the beginning, but it seemed to me just way too many commits :) So I tried to group some related changes. I can extract it if you prefer.

> And what about test coverage for those changes?

As [noted](https://github.com/apache/flink/pull/14057#discussion_r543678627) above:
> I didn't add a unit test as after the other fixes in master (#14052) this change is not strictly necessary
> (though I think it's still less error-prone to not update SQN unnecessarily).
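The `.filter(id -> id > lastBarrierId)` step in the second revision means a barrier only updates the remembered sequence number when its checkpoint ID is newer than the last one recorded, so a stale barrier cannot overwrite `lastBarrierSequenceNumber`. The sketch below isolates that `Optional` pattern under stated assumptions: `BarrierSequenceTracker` and `onBarrierSeen` are hypothetical names, and the `Optional<Long>` argument merely stands in for whatever `checkForBarrier` returns in the real channel.

```java
import java.util.Optional;

/** Hypothetical model of the "remember the last barrier's sequence number" bookkeeping. */
class BarrierSequenceTracker {
    private long lastBarrierId = -1L;          // -1: no barrier seen yet
    private int lastBarrierSequenceNumber = -1;

    /**
     * @param maybeBarrierId checkpoint ID if the received buffer is a barrier (stand-in for checkForBarrier)
     * @param sequenceNumber sequence number of the received buffer
     */
    void onBarrierSeen(Optional<Long> maybeBarrierId, int sequenceNumber) {
        maybeBarrierId
                .filter(id -> id > lastBarrierId)   // skip barriers of older or already-recorded checkpoints
                .ifPresent(id -> {
                    lastBarrierId = id;
                    lastBarrierSequenceNumber = sequenceNumber;
                });
    }

    long getLastBarrierId() {
        return lastBarrierId;
    }

    int getLastBarrierSequenceNumber() {
        return lastBarrierSequenceNumber;
    }
}
```

In this model, seeing barrier 2 at SQN 10 and then barrier 1 at SQN 12 leaves the recorded SQN at 10; without the filter it would move to 12 even though checkpoint 2 is the newer one.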
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543678627

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##
@@ -451,11 +451,17 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
             } else {
                 receivedBuffers.add(sequenceBuffer);
-                channelStatePersister.maybePersist(buffer);
                 if (dataType.requiresAnnouncement()) {
                     firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                 }
             }
+            channelStatePersister.checkForBarrier(sequenceBuffer.buffer).ifPresent(id -> {
+                // checkpoint was not yet started by task thread,
+                // so remember the numbers of buffers to spill for the time when it will be started
+                lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
+                lastBarrierId = id;
+            });
+            channelStatePersister.maybePersist(buffer);

Review comment:
I didn't add a unit test as after the other fixes in master (#14052) this change is not strictly necessary (though I think it's still less error-prone to not update SQN unnecessarily).
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543676863

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ##
@@ -229,11 +229,8 @@ public void run() {
             numBytesIn.inc(buffer.getSize());
             numBuffersIn.inc();
-            if (buffer.getDataType().hasPriority()) {
-                channelStatePersister.checkForBarrier(buffer);
-            } else {
-                channelStatePersister.maybePersist(buffer);
-            }
+            channelStatePersister.checkForBarrier(buffer);
+            channelStatePersister.maybePersist(buffer);

Review comment:
I've added `LocalInputChannelTest.testNoDataPersistedAfterReceivingAlignedBarrier`.
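To illustrate why the unconditional `checkForBarrier` call matters, here is a hypothetical, simplified model (not Flink's `ChannelStatePersister`): a persister records data buffers while a checkpoint is pending and stops once that checkpoint's barrier passes. With the old branching, a non-priority (aligned) barrier only reaches `maybePersist`, so data after it is still recorded. `SimpleBuffer`, `SimplePersister` and `ChannelLoop` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical buffer: either data or a checkpoint barrier, with or without the priority flag. */
final class SimpleBuffer {
    final String payload;
    final OptionalLong barrierId;   // present if this buffer is a checkpoint barrier
    final boolean priority;

    private SimpleBuffer(String payload, OptionalLong barrierId, boolean priority) {
        this.payload = payload;
        this.barrierId = barrierId;
        this.priority = priority;
    }

    static SimpleBuffer data(String payload) {
        return new SimpleBuffer(payload, OptionalLong.empty(), false);
    }

    static SimpleBuffer barrier(long checkpointId, boolean priority) {
        return new SimpleBuffer("barrier-" + checkpointId, OptionalLong.of(checkpointId), priority);
    }
}

/** Hypothetical persister: records data buffers until the barrier of the pending checkpoint passes. */
final class SimplePersister {
    private long pendingCheckpointId = -1;
    private boolean persisting;
    final List<String> persisted = new ArrayList<>();

    void startPersisting(long checkpointId) {
        pendingCheckpointId = checkpointId;
        persisting = true;
    }

    void checkForBarrier(SimpleBuffer buffer) {
        // any barrier of the pending (or a newer) checkpoint ends persistence, priority or not
        if (buffer.barrierId.isPresent() && buffer.barrierId.getAsLong() >= pendingCheckpointId) {
            persisting = false;
        }
    }

    void maybePersist(SimpleBuffer buffer) {
        if (persisting && !buffer.barrierId.isPresent()) {
            persisted.add(buffer.payload);
        }
    }
}

final class ChannelLoop {
    /** Feeds buffers through the persister using either the old or the new call pattern. */
    static List<String> run(List<SimpleBuffer> input, long checkpointId, boolean oldBranching) {
        SimplePersister persister = new SimplePersister();
        persister.startPersisting(checkpointId);
        for (SimpleBuffer buffer : input) {
            if (oldBranching) {
                // old code: barriers were only checked for priority buffers
                if (buffer.priority) {
                    persister.checkForBarrier(buffer);
                } else {
                    persister.maybePersist(buffer);
                }
            } else {
                // new code: always check for a barrier, then maybe persist
                persister.checkForBarrier(buffer);
                persister.maybePersist(buffer);
            }
        }
        return persister.persisted;
    }
}
```

With the input `data a, aligned barrier 1, data b`, the new pattern persists only `a`, while the old branching also persists `b`, which arrived after the barrier.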
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543676375

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ##
@@ -569,14 +569,23 @@ public void convertToPriorityEvent(int sequenceNumber) throws IOException {
                 "Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]",
                 toPrioritize, numPriorityElementsBeforeRemoval);
+            // set the priority flag (checked on poll)
+            // don't convert the barrier itself (barrier controller might not have been switched yet)
+            AbstractEvent e = EventSerializer.fromBuffer(toPrioritize.buffer, this.getClass().getClassLoader());
+            toPrioritize.buffer.setReaderIndex(0);
+            toPrioritize = new SequenceBuffer(EventSerializer.toBuffer(e, true), toPrioritize.sequenceNumber);
             firstPriorityEvent = addPriorityBuffer(toPrioritize); // note that only position of the element is changed
             // converting the event itself would require switching the controller sooner
         }
         if (firstPriorityEvent) {
-            notifyPriorityEvent(sequenceNumber);
+            notifyPriorityEventForce(); // use force here because the barrier SQN might be seen by gate during the announcement

Review comment:
Rephrased as:
```
// forcibly notify about the priority event
// instead of passing barrier SQN to be checked
// because this SQN might have been seen by the input gate during the announcement
```
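The idea behind `convertToPriorityEvent` (move an already-queued event into the priority part of the queue, keep its sequence number, and report whether it became the first priority element) can be sketched with two `ArrayDeque`s. This is a hypothetical model, not Flink's `PrioritizedDeque`; `SeqBuffer`, `SimplePrioritizedQueue` and their methods are illustrative names, and notifying the gate is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Hypothetical queued element: sequence number, event name, and priority flag. */
final class SeqBuffer {
    final int sequenceNumber;
    final String event;
    final boolean priority;

    SeqBuffer(int sequenceNumber, String event, boolean priority) {
        this.sequenceNumber = sequenceNumber;
        this.event = event;
        this.priority = priority;
    }
}

/** Hypothetical two-part queue: priority elements are always polled before regular ones. */
final class SimplePrioritizedQueue {
    private final Deque<SeqBuffer> priorityPart = new ArrayDeque<>();
    private final Deque<SeqBuffer> regularPart = new ArrayDeque<>();

    void add(SeqBuffer buffer) {
        if (buffer.priority) {
            priorityPart.add(buffer);
        } else {
            regularPart.add(buffer);
        }
    }

    /**
     * Moves the regular element with the given sequence number to the priority part, re-created
     * with the priority flag set but the same sequence number.
     *
     * @return true if it is now the only priority element (the case that needs a notification)
     */
    boolean convertToPriority(int sequenceNumber) {
        Iterator<SeqBuffer> it = regularPart.iterator();
        while (it.hasNext()) {
            SeqBuffer candidate = it.next();
            if (candidate.sequenceNumber == sequenceNumber) {
                it.remove();
                priorityPart.add(new SeqBuffer(candidate.sequenceNumber, candidate.event, true));
                return priorityPart.size() == 1;
            }
        }
        throw new IllegalStateException("No regular element with sequence number " + sequenceNumber);
    }

    SeqBuffer poll() {
        return priorityPart.isEmpty() ? regularPart.poll() : priorityPart.poll();
    }
}
```

Keeping the original sequence number is what lets a consumer relate the prioritized element back to the announcement it saw earlier, which is also why the notification in the diff is forced rather than gated on that SQN.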
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543675851

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java ##
@@ -114,6 +114,7 @@ public void barrierAnnouncement(
             lastSeenBarrier = barrier.getId();
             firstBarrierArrivalTime = getArrivalTime(barrier);
         }
+        activeController = chooseController(barrier);

Review comment:
I've added `AlternatingControllerTest.testSwitchToUnalignedByUpstream`.
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543675153

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java ##
@@ -120,6 +122,12 @@ public void obsoleteBarrierReceived(
         resumeConsumption(channelInfo);
     }
+    protected void resetPendingCheckpoint(long cancelledId) {
+        for (final CheckpointableInput input : inputs) {
+            input.checkpointStopped(cancelledId);
+        }
+    }
+

Review comment:
I've added `AlternatingControllerTest.testChannelResetOnNewBarrier`.
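A minimal sketch of the reset-on-subsumption idea behind `resetPendingCheckpoint`, assuming a hypothetical `StoppableInput` interface in place of `CheckpointableInput`: when a barrier of a newer checkpoint arrives, every input is told that the previous checkpoint stopped, independent of which (aligned or unaligned) controller is currently active, so no channel keeps capturing in-flight buffers for the older checkpoint. `RecordingInput` and `ResettingController` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for CheckpointableInput, reduced to the callback used here. */
interface StoppableInput {
    void checkpointStopped(long cancelledCheckpointId);
}

/** Records the checkpoint IDs it was told to stop (e.g. to stop capturing in-flight buffers). */
final class RecordingInput implements StoppableInput {
    final List<Long> stopped = new ArrayList<>();

    @Override
    public void checkpointStopped(long cancelledCheckpointId) {
        stopped.add(cancelledCheckpointId);
    }
}

/** Hypothetical controller: a newer barrier subsumes the pending checkpoint and resets every input. */
final class ResettingController {
    private final List<? extends StoppableInput> inputs;
    private long pendingCheckpointId = -1;

    ResettingController(List<? extends StoppableInput> inputs) {
        this.inputs = inputs;
    }

    void barrierReceived(long checkpointId) {
        if (checkpointId > pendingCheckpointId) {
            if (pendingCheckpointId >= 0) {
                // done unconditionally, i.e. also while the aligned controller is active
                resetPendingCheckpoint(pendingCheckpointId);
            }
            pendingCheckpointId = checkpointId;
        }
    }

    private void resetPendingCheckpoint(long cancelledId) {
        for (StoppableInput input : inputs) {
            input.checkpointStopped(cancelledId);
        }
    }
}
```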
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543283061

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java ##
@@ -146,28 +146,7 @@ private void switchToUnaligned(
     @Override
     public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
-        Optional<CheckpointBarrier> maybeTimeOut = asTimedOut(barrier);
-        if (maybeTimeOut.isPresent() && activeController == alignedController) {
-            switchToUnaligned(channelInfo, maybeTimeOut.get());
-            checkState(activeController == unalignedController);
-            checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-            return maybeTimeOut;
-        }
-
-        barrier = maybeTimeOut.orElse(barrier);
-        if (barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
-            checkState(activeController == unalignedController);
-            checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-            return Optional.empty();
-        }
-        else {
-            checkState(activeController == alignedController);
-            Optional<CheckpointBarrier> triggerResult = activeController.postProcessLastBarrier(
-                channelInfo,
-                barrier);
-            checkState(triggerResult.isPresent());
-            return triggerResult;
-        }

Review comment:
Isn't it enough to just convert the barrier upon triggering the checkpoint in the subtask? This is what I've done in `[FLINK-19681][checkpointing] Use converted barrier after disabling alignment`. It is needed anyway for correct handling in `SubtaskCheckpointCoordinatorImpl`.
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543280004

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java ##
@@ -114,6 +114,7 @@ public void barrierAnnouncement(
             lastSeenBarrier = barrier.getId();
             firstBarrierArrivalTime = getArrivalTime(barrier);
         }
+        activeController = chooseController(barrier);

Review comment:
But there is a `preProcessFirstBarrier` before the switch in `barrierReceived`.
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543278831

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java ##
@@ -193,6 +202,11 @@ private CheckpointBarrierBehaviourController chooseController(CheckpointBarrier
     private boolean canTimeout(CheckpointBarrier barrier) {
         return barrier.getCheckpointOptions().isTimeoutable() &&
-            barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
+            barrier.getId() <= lastSeenBarrier &&
+            barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000 < (System.nanoTime() - firstBarrierArrivalTime);
+    }
+
+    private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+        return announcedBarrier.getCheckpointOptions().isTimeoutable() ? System.nanoTime() : Long.MAX_VALUE;

Review comment:
With a single input, this is not an alignment issue but a (related) back-pressure issue. And I think this is the right way to solve it:
> Active timeout would alleviate this problem though.
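The revised `canTimeout` measures alignment time locally, from the arrival of the first barrier or announcement at this task, using the monotonic `System.nanoTime()` instead of comparing wall-clock time with the barrier's trigger timestamp. The sketch below is a hypothetical restatement with two assumed deviations: an injectable `LongSupplier` clock (so it can be tested), and an `OptionalLong` instead of the `Long.MAX_VALUE` sentinel, which avoids subtracting a sentinel from a `nanoTime` value (that subtraction can overflow, since `nanoTime` values may be negative). `AlignmentTimer` and its methods are illustrative names.

```java
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical model of the local alignment timeout: time is measured from the arrival of the
 * first barrier (or announcement) of a checkpoint at this task, using a monotonic clock.
 */
final class AlignmentTimer {
    private final LongSupplier nanoClock;   // System::nanoTime in production, a fake clock in tests
    private long lastSeenBarrier = -1;
    private OptionalLong firstBarrierArrivalNanos = OptionalLong.empty();

    AlignmentTimer(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    /** Called when the first barrier or announcement of a checkpoint is seen. */
    void onFirstBarrier(long checkpointId, boolean timeoutable) {
        if (checkpointId > lastSeenBarrier) {
            lastSeenBarrier = checkpointId;
            // no arrival time for non-timeoutable barriers, instead of a Long.MAX_VALUE sentinel
            firstBarrierArrivalNanos =
                    timeoutable ? OptionalLong.of(nanoClock.getAsLong()) : OptionalLong.empty();
        }
    }

    boolean canTimeout(long checkpointId, long alignmentTimeoutMillis) {
        if (checkpointId > lastSeenBarrier || !firstBarrierArrivalNanos.isPresent()) {
            return false;
        }
        long elapsedNanos = nanoClock.getAsLong() - firstBarrierArrivalNanos.getAsLong();
        // TimeUnit.toNanos saturates at Long.MAX_VALUE instead of overflowing
        return TimeUnit.MILLISECONDS.toNanos(alignmentTimeoutMillis) < elapsedNanos;
    }
}
```

In this model, a task with a single input channel receives its first and last barrier at once, so no alignment time accumulates; any delay before the barrier arrives (for example, due to back-pressure) is simply not counted.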
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r540446583

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java ##
@@ -119,6 +119,6 @@ public boolean isCheckpoint() {
     }
     public CheckpointBarrier asUnaligned() {
-        return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().asTimedOut());
+        return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().toUnaligned());

Review comment:
No, I think it belongs to the right commit. With `CheckpointOptions` hotfix/refactoring, this method converts it to `unaligned`, which can't be timed out.
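In terms of a hypothetical, simplified model (the classes `Options` and `Barrier`, and encoding "no timeout" as `Long.MAX_VALUE`, are assumptions of this sketch, not Flink's `CheckpointOptions` API), the point is that `asUnaligned()` now yields options whose alignment type is unaligned, and unaligned options are never timeoutable:

```java
/** Hypothetical, simplified checkpoint options; not Flink's CheckpointOptions. */
final class Options {
    enum Alignment { ALIGNED, UNALIGNED }

    static final long NO_TIMEOUT = Long.MAX_VALUE;   // assumption of this sketch

    final Alignment alignment;
    final long alignmentTimeoutMillis;

    Options(Alignment alignment, long alignmentTimeoutMillis) {
        this.alignment = alignment;
        this.alignmentTimeoutMillis = alignmentTimeoutMillis;
    }

    boolean isUnaligned() {
        return alignment == Alignment.UNALIGNED;
    }

    /** Only aligned checkpoints with a timeout can be switched to unaligned. */
    boolean isTimeoutable() {
        return alignment == Alignment.ALIGNED && alignmentTimeoutMillis != NO_TIMEOUT;
    }

    Options toUnaligned() {
        return isUnaligned() ? this : new Options(Alignment.UNALIGNED, alignmentTimeoutMillis);
    }
}

/** Hypothetical barrier mirroring the shape of asUnaligned() in the diff. */
final class Barrier {
    final long id;
    final long timestamp;
    final Options options;

    Barrier(long id, long timestamp, Options options) {
        this.id = id;
        this.timestamp = timestamp;
        this.options = options;
    }

    Barrier asUnaligned() {
        return options.isUnaligned() ? this : new Barrier(id, timestamp, options.toUnaligned());
    }
}
```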
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r54075

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java ##
@@ -146,28 +146,7 @@ private void switchToUnaligned(
     @Override
     public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
-        Optional<CheckpointBarrier> maybeTimeOut = asTimedOut(barrier);
-        if (maybeTimeOut.isPresent() && activeController == alignedController) {
-            switchToUnaligned(channelInfo, maybeTimeOut.get());
-            checkState(activeController == unalignedController);
-            checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-            return maybeTimeOut;
-        }
-
-        barrier = maybeTimeOut.orElse(barrier);
-        if (barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
-            checkState(activeController == unalignedController);
-            checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-            return Optional.empty();
-        }
-        else {
-            checkState(activeController == alignedController);
-            Optional<CheckpointBarrier> triggerResult = activeController.postProcessLastBarrier(
-                channelInfo,
-                barrier);
-            checkState(triggerResult.isPresent());
-            return triggerResult;
-        }

Review comment:
Yes, with `switchToUnaligned` in `barrierReceived` it seems unnecessary. With this code, `UnalignedCheckpointITCase` fails: some tests fail on `checkState` (line 164 above), and some hang.
Besides, why time out the alignment if it's the last barrier? That essentially means that alignment is done.
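The argument in the last paragraph can be stated as a small invariant: switching to unaligned only makes sense while some barriers are still missing, because once the last barrier has arrived the alignment is complete. A hypothetical sketch (`AlignmentProgress` is an illustrative name, not a Flink class):

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical per-checkpoint alignment progress over numChannels input channels. */
final class AlignmentProgress {
    private final int numChannels;
    private final Set<Integer> channelsWithBarrier = new HashSet<>();

    AlignmentProgress(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Records a barrier; returns true if it was the last missing one. */
    boolean barrierReceived(int channel) {
        channelsWithBarrier.add(channel);
        return isAligned();
    }

    boolean isAligned() {
        return channelsWithBarrier.size() == numChannels;
    }

    /** A timeout only matters while the alignment is still in progress. */
    boolean shouldSwitchToUnaligned(boolean timeoutElapsed) {
        return timeoutElapsed && !isAligned();
    }
}
```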
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r540438232

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java ##
@@ -193,6 +202,11 @@ private CheckpointBarrierBehaviourController chooseController(CheckpointBarrier
     private boolean canTimeout(CheckpointBarrier barrier) {
         return barrier.getCheckpointOptions().isTimeoutable() &&
-            barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
+            barrier.getId() <= lastSeenBarrier &&
+            barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000 < (System.nanoTime() - firstBarrierArrivalTime);
+    }
+
+    private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+        return announcedBarrier.getCheckpointOptions().isTimeoutable() ? System.nanoTime() : Long.MAX_VALUE;

Review comment:
> easier to understand

I agree that it might be true for some users, but not for all. During the [previous discussion](https://github.com/apache/flink/pull/13827#discussion_r527794600), and also the one before, the consensus was that it's **not** easier to understand. However, we can discuss it again. (There are also some more technical advantages of "local" timeouts.)

> Secondly your proposed change will not work with single input tasks without active timeouts?

Why, could you explain?
rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r540428875

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java ##
@@ -114,6 +114,7 @@ public void barrierAnnouncement(
             lastSeenBarrier = barrier.getId();
             firstBarrierArrivalTime = getArrivalTime(barrier);
         }
+        activeController = chooseController(barrier);

Review comment:
Consider a checkpoint with two barriers from two channels, one UC and one AC:
1. Process the AC announcement. This sets `SingleCheckpointBarrierHandler.currentCheckpointId`. Don't time out.
2. Process the UC barrier (no announcements). Because `currentCheckpointId` was already set, `checkSubsumedCheckpoint` does nothing, i.e. `preProcessFirstBarrierOrAnnouncement` is not called and the controller is not switched, so `activeController` == AC.
3. Further process the (UC) barrier: now the AC controller is processing a UC barrier.

> (as in other places: a unit test would be helpful)

I agree, will add it.
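The three steps can be reproduced with a hypothetical, heavily simplified model (not Flink's `AlternatingController` or `SingleCheckpointBarrierHandler`): the first barrier or announcement of a checkpoint picks the controller, and a later barrier for the same checkpoint ID skips that step. The `rechooseOnUnalignedBarrier` flag is an illustrative stand-in for a fix that re-selects the controller when a UC barrier shows up; the PR's actual change (calling `chooseController` in `barrierAnnouncement`) differs in detail.

```java
/**
 * Hypothetical, heavily simplified model of the scenario above; not Flink's AlternatingController
 * or SingleCheckpointBarrierHandler.
 */
final class ControllerSwitchModel {
    enum Kind { ALIGNED, UNALIGNED }

    private final boolean rechooseOnUnalignedBarrier;
    private long currentCheckpointId = -1;   // stand-in for SingleCheckpointBarrierHandler.currentCheckpointId
    private Kind activeController = Kind.ALIGNED;

    ControllerSwitchModel(boolean rechooseOnUnalignedBarrier) {
        this.rechooseOnUnalignedBarrier = rechooseOnUnalignedBarrier;
    }

    /** Step 1: an announcement of a new checkpoint sets the ID and picks the controller. */
    void announcementReceived(long checkpointId, Kind barrierKind) {
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId;
            activeController = barrierKind;
        }
    }

    /** Steps 2 and 3: returns the controller that ends up processing the barrier. */
    Kind barrierReceived(long checkpointId, Kind barrierKind) {
        if (checkpointId > currentCheckpointId) {
            // first barrier of a new checkpoint: the controller is chosen here
            currentCheckpointId = checkpointId;
            activeController = barrierKind;
        } else if (rechooseOnUnalignedBarrier && barrierKind == Kind.UNALIGNED) {
            // checkpoint already started by the announcement: switch explicitly
            activeController = Kind.UNALIGNED;
        }
        return activeController;
    }
}
```

Without the flag, an AC announcement followed by a UC barrier for checkpoint 1 leaves the aligned controller in charge, which is the situation `testSwitchToUnalignedByUpstream` exercises.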