[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-16 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r544876522



##
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
##
@@ -84,6 +84,24 @@ public void testChannelResetOnNewBarrier() throws Exception {
assertFalse(stateWriter.getAddedInput().isEmpty());
}
 
+   /**
+    * If a checkpoint announcement was processed and then UC-barrier arrives (from the upstream)
+    * then it should be processed by the UC controller.
+    */
+   @Test
+   public void testSwitchToUnalignedByUpstream() throws Exception {
+       SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
+       inputGate.setInputChannels(new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1));
+       ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+       SingleCheckpointBarrierHandler barrierHandler = barrierHandler(inputGate, target);
+       CheckpointedInputGate gate = buildGate(target, 2);
+
+       CheckpointBarrier aligned = new CheckpointBarrier(1, System.currentTimeMillis(), alignedWithTimeout(getDefault(), Integer.MAX_VALUE));
+
+       send(toBuffer(new EventAnnouncement(aligned, 0), true), 0, gate); // process announcement but not the barrier
+       send(toBuffer(aligned.asUnaligned(), true), 1, gate); // pretend it came from upstream before the first (AC) barrier was picked up
+   }

Review comment:
   The expectation is that it just won't fail (without the fix, it will).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-16 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r544868039



##
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
##
@@ -68,6 +68,22 @@
  */
 public class AlternatingControllerTest {
 
+   /**
+* Upon subsuming (or canceling) a checkpoint, channels should be 
notified regardless of whether UC controller is
+* currently being used or not. Otherwise, channels may capture 
in-flight buffers from an older checkpoint.

Review comment:
   Sorry, my bad, I didn't update the comment after updating the test. 
   It checks whether the buffer (on the other channel) was captured or not. 
I'll update the comment.









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-16 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r544865125



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -451,11 +451,20 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
                }
                else {
                    receivedBuffers.add(sequenceBuffer);
-                   channelStatePersister.maybePersist(buffer);
                    if (dataType.requiresAnnouncement()) {
                        firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                    }
                }
+               channelStatePersister
+                   .checkForBarrier(sequenceBuffer.buffer)
+                   .filter(id -> id > lastBarrierId)
+                   .ifPresent(id -> {
+                       // checkpoint was not yet started by task thread,
+                       // so remember the numbers of buffers to spill for the time when it will be started
+                       lastBarrierId = id;
+                       lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
+                   });
+               channelStatePersister.maybePersist(buffer);

Review comment:
   > Side question, shouldn't those two fixes be separate commits?
   
   They were in the beginning, but it seemed to me just way too many commits :) 
So I tried to group some related changes. I can extract it if you prefer.
   
   > And what about test coverage for those changes?
   
   As [noted](https://github.com/apache/flink/pull/14057#discussion_r543678627) above
   > I didn't add a unit test as after the other fixes in master (#14052) this change is not strictly necessary
   > (though I think it's still less error-prone to not update SQN unnecessarily).
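
The `filter(id -> id > lastBarrierId)` guard in the hunk above can be illustrated in isolation. The following is a minimal sketch, not the Flink class: `BarrierTracker` and its method names are hypothetical, and the barrier lookup is reduced to an `Optional<Long>` passed in by the caller. It only shows that the sequence number is updated when a strictly newer barrier id is seen.

```java
import java.util.Optional;

// Hypothetical stand-in for the channel-side bookkeeping; not the Flink class.
class BarrierTracker {
    private long lastBarrierId = -1;
    private int lastBarrierSequenceNumber = -1;

    // Records the barrier only if its id is strictly newer than the last one seen,
    // so an already-known barrier does not overwrite the stored sequence number.
    void onBuffer(Optional<Long> barrierId, int sequenceNumber) {
        barrierId
            .filter(id -> id > lastBarrierId)
            .ifPresent(id -> {
                lastBarrierId = id;
                lastBarrierSequenceNumber = sequenceNumber;
            });
    }

    long lastBarrierId() {
        return lastBarrierId;
    }

    int lastBarrierSequenceNumber() {
        return lastBarrierSequenceNumber;
    }
}
```

Without the filter, a second sighting of the same barrier id would move the stored SQN, which is the unnecessary update mentioned in the comment.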









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-15 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543678627



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -451,11 +451,17 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
                }
                else {
                    receivedBuffers.add(sequenceBuffer);
-                   channelStatePersister.maybePersist(buffer);
                    if (dataType.requiresAnnouncement()) {
                        firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                    }
                }
+               channelStatePersister.checkForBarrier(sequenceBuffer.buffer).ifPresent(id -> {
+                   // checkpoint was not yet started by task thread,
+                   // so remember the numbers of buffers to spill for the time when it will be started
+                   lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
+                   lastBarrierId = id;
+               });
+               channelStatePersister.maybePersist(buffer);

Review comment:
   I didn't add a unit test as after the other fixes in master (#14052) 
this change is not strictly necessary
   (though I think it's still less error-prone to not update SQN unnecessarily).









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-15 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543676863



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
##
@@ -229,11 +229,8 @@ public void run() {
 
numBytesIn.inc(buffer.getSize());
numBuffersIn.inc();
-   if (buffer.getDataType().hasPriority()) {
-   channelStatePersister.checkForBarrier(buffer);
-   } else {
-   channelStatePersister.maybePersist(buffer);
-   }
+   channelStatePersister.checkForBarrier(buffer);
+   channelStatePersister.maybePersist(buffer);

Review comment:
   I've added 
`LocalInputChannelTest.testNoDataPersistedAfterReceivingAlignedBarrier`.
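
As a rough illustration of what that test name implies, here is a toy model of calling `checkForBarrier` and then `maybePersist` unconditionally for every buffer. Everything in it is an assumption made for the sketch: `PersisterSketch`, the string-based buffers, and the single `barrierPending` flag are hypothetical and much simpler than the real `ChannelStatePersister`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: buffers are strings, and a buffer whose text starts with
// "barrier" plays the role of a checkpoint barrier.
class PersisterSketch {
    private boolean barrierPending;
    private final List<String> persisted = new ArrayList<>();

    // The task started a checkpoint and is still waiting for this channel's barrier.
    void startPersisting() {
        barrierPending = true;
    }

    // Seeing the barrier ends the window in which in-flight data is captured.
    void checkForBarrier(String buffer) {
        if (buffer.startsWith("barrier")) {
            barrierPending = false;
        }
    }

    // Persists data buffers only while the barrier is still pending.
    void maybePersist(String buffer) {
        if (barrierPending && !buffer.startsWith("barrier")) {
            persisted.add(buffer);
        }
    }

    // Mirrors the shape of the changed code: both calls run for every buffer.
    void onBuffer(String buffer) {
        checkForBarrier(buffer);
        maybePersist(buffer);
    }

    List<String> persisted() {
        return persisted;
    }
}
```

In this model, data that arrives after the barrier is never persisted, regardless of whether the barrier carried a priority flag.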









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-15 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543676375



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##
@@ -569,14 +569,23 @@ public void convertToPriorityEvent(int sequenceNumber) throws IOException {
                    "Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]",
                    toPrioritize,
                    numPriorityElementsBeforeRemoval);
+               // set the priority flag (checked on poll)
+               // don't convert the barrier itself (barrier controller might not have been switched yet)
+               AbstractEvent e = EventSerializer.fromBuffer(toPrioritize.buffer, this.getClass().getClassLoader());
+               toPrioritize.buffer.setReaderIndex(0);
+               toPrioritize = new SequenceBuffer(EventSerializer.toBuffer(e, true), toPrioritize.sequenceNumber);
                firstPriorityEvent = addPriorityBuffer(toPrioritize); // note that only position of the element is changed
                                                                      // converting the event itself would require switching the controller sooner
            }
            if (firstPriorityEvent) {
-               notifyPriorityEvent(sequenceNumber);
+               notifyPriorityEventForce(); // use force here because the barrier SQN might be seen by gate during the announcement

Review comment:
   Rephrased as:
   ```
   // forcibly notify about the priority event
   // instead of passing barrier SQN to be checked
   // because this SQN might have been seen by the input gate during the announcement
   ```
   









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-15 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543675851



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##
@@ -114,6 +114,7 @@ public void barrierAnnouncement(
lastSeenBarrier = barrier.getId();
firstBarrierArrivalTime = getArrivalTime(barrier);
}
+   activeController = chooseController(barrier);

Review comment:
   I've added `AlternatingControllerTest.testSwitchToUnalignedByUpstream`.









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-15 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543675153



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
##
@@ -120,6 +122,12 @@ public void obsoleteBarrierReceived(
resumeConsumption(channelInfo);
}
 
+   protected void resetPendingCheckpoint(long cancelledId) {
+   for (final CheckpointableInput input : inputs) {
+   input.checkpointStopped(cancelledId);
+   }
+   }
+

Review comment:
   I've added `AlternatingControllerTest.testChannelResetOnNewBarrier`.
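
The intent of `resetPendingCheckpoint` (tell every input that a checkpoint was stopped, whichever controller is active) can be sketched with stand-in types. The interface and classes below are hypothetical and only mimic the shape of the loop in the hunk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the input abstraction; not the Flink interface.
interface CheckpointableInputSketch {
    void checkpointStopped(long checkpointId);
}

// Records every notification so the effect of the reset can be inspected.
class RecordingInput implements CheckpointableInputSketch {
    final List<Long> stopped = new ArrayList<>();

    @Override
    public void checkpointStopped(long checkpointId) {
        stopped.add(checkpointId);
    }
}

class ResetSketch {
    // Notifies all inputs, not only the ones the active controller cares about.
    static void resetPendingCheckpoint(List<? extends CheckpointableInputSketch> inputs, long cancelledId) {
        for (CheckpointableInputSketch input : inputs) {
            input.checkpointStopped(cancelledId);
        }
    }
}
```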









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-15 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543283061



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##
@@ -146,28 +146,7 @@ private void switchToUnaligned(
 
@Override
    public Optional postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
-       Optional maybeTimeOut = asTimedOut(barrier);
-       if (maybeTimeOut.isPresent() && activeController == alignedController) {
-           switchToUnaligned(channelInfo, maybeTimeOut.get());
-           checkState(activeController == unalignedController);
-           checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-           return maybeTimeOut;
-       }
-
-       barrier = maybeTimeOut.orElse(barrier);
-       if (barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
-           checkState(activeController == unalignedController);
-           checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-           return Optional.empty();
-       }
-       else {
-           checkState(activeController == alignedController);
-           Optional triggerResult = activeController.postProcessLastBarrier(
-               channelInfo,
-               barrier);
-           checkState(triggerResult.isPresent());
-           return triggerResult;
-       }

Review comment:
   Isn't it enough to just convert the barrier upon triggering checkpoint 
in subtask?
   
   This is what I've done in `[FLINK-19681][checkpointing] Use converted 
barrier after disabling alignment`. It is needed anyways for correct handling 
in `SubtaskCheckpointCoordinatorImpl`.









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-15 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543280004



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##
@@ -114,6 +114,7 @@ public void barrierAnnouncement(
lastSeenBarrier = barrier.getId();
firstBarrierArrivalTime = getArrivalTime(barrier);
}
+   activeController = chooseController(barrier);

Review comment:
   But there is a `preProcessFirstBarrier` before the switch in 
`barrierReceived`.









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-15 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543278831



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##
@@ -193,6 +202,11 @@ private CheckpointBarrierBehaviourController chooseController(CheckpointBarrier
 
    private boolean canTimeout(CheckpointBarrier barrier) {
        return barrier.getCheckpointOptions().isTimeoutable() &&
-           barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
+           barrier.getId() <= lastSeenBarrier &&
+           barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000 < (System.nanoTime() - firstBarrierArrivalTime);
+   }
+
+   private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+       return announcedBarrier.getCheckpointOptions().isTimeoutable() ? System.nanoTime() : Long.MAX_VALUE;

Review comment:
   With a single input, this is not an alignment issue but a (related) 
back-pressure issue. And I think this is the right way to solve it: 
   > Active timeout would alleviate this problem though.
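
For readers following the "local" timeout discussed in this hunk, here is a minimal sketch of the comparison. It is not the Flink code: `AlignmentTimeoutSketch` is hypothetical, the clock is passed in as a parameter so the example is deterministic, and `TimeUnit.MILLISECONDS.toNanos` is swapped in for the `* 1_000_000` multiplication in the diff.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of a timeout measured from the local arrival of the first barrier.
class AlignmentTimeoutSketch {
    private final long alignmentTimeoutNanos;
    private final boolean timeoutable;
    private long firstBarrierArrivalNanos = Long.MAX_VALUE;

    AlignmentTimeoutSketch(long alignmentTimeoutMillis, boolean timeoutable) {
        this.alignmentTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(alignmentTimeoutMillis);
        this.timeoutable = timeoutable;
    }

    // Remembers when the first barrier (or announcement) was seen on this task.
    void onFirstBarrier(long nowNanos) {
        firstBarrierArrivalNanos = timeoutable ? nowNanos : Long.MAX_VALUE;
    }

    // True once more than the configured timeout has elapsed since local arrival.
    boolean canTimeout(long nowNanos) {
        return timeoutable && alignmentTimeoutNanos < nowNanos - firstBarrierArrivalNanos;
    }
}
```

In real use `nowNanos` would come from `System.nanoTime()`. Measuring from local arrival means the elapsed time comes from a single machine's monotonic clock rather than from the difference between a wall-clock barrier timestamp and the local `System.currentTimeMillis()`.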









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-10 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r540446583



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
##
@@ -119,6 +119,6 @@ public boolean isCheckpoint() {
}
 
public CheckpointBarrier asUnaligned() {
-       return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().asTimedOut());
+       return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().toUnaligned());

Review comment:
   No, I think it belongs to the right commit. With `CheckpointOptions` 
hotfix/refactoring, this method converts it to `unaligned`, which can't be 
timed out.
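
The conversion described here can be sketched with two small immutable classes. They are hypothetical stand-ins, not `CheckpointBarrier` or `CheckpointOptions`; in particular, modelling "can't be timed out" as a cleared timeout value is an assumption of this sketch.

```java
// Hypothetical options object: either unaligned, or aligned with an optional timeout.
final class OptionsSketch {
    final boolean unaligned;
    final long alignmentTimeoutMillis; // -1 means "no timeout" in this sketch

    OptionsSketch(boolean unaligned, long alignmentTimeoutMillis) {
        this.unaligned = unaligned;
        this.alignmentTimeoutMillis = alignmentTimeoutMillis;
    }

    boolean isTimeoutable() {
        return !unaligned && alignmentTimeoutMillis > 0;
    }

    // Unaligned options carry no timeout, so they can never be timed out again.
    OptionsSketch toUnaligned() {
        return new OptionsSketch(true, -1);
    }
}

// Hypothetical barrier: id and timestamp survive the conversion, only options change.
final class BarrierSketch {
    final long id;
    final long timestamp;
    final OptionsSketch options;

    BarrierSketch(long id, long timestamp, OptionsSketch options) {
        this.id = id;
        this.timestamp = timestamp;
        this.options = options;
    }

    // Returns this instance when already unaligned, otherwise a converted copy.
    BarrierSketch asUnaligned() {
        return options.unaligned ? this : new BarrierSketch(id, timestamp, options.toUnaligned());
    }
}
```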









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-10 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r54075



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##
@@ -146,28 +146,7 @@ private void switchToUnaligned(
 
@Override
    public Optional postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
-       Optional maybeTimeOut = asTimedOut(barrier);
-       if (maybeTimeOut.isPresent() && activeController == alignedController) {
-           switchToUnaligned(channelInfo, maybeTimeOut.get());
-           checkState(activeController == unalignedController);
-           checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-           return maybeTimeOut;
-       }
-
-       barrier = maybeTimeOut.orElse(barrier);
-       if (barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
-           checkState(activeController == unalignedController);
-           checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-           return Optional.empty();
-       }
-       else {
-           checkState(activeController == alignedController);
-           Optional triggerResult = activeController.postProcessLastBarrier(
-               channelInfo,
-               barrier);
-           checkState(triggerResult.isPresent());
-           return triggerResult;
-       }

Review comment:
   Yes, with `switchToUnaligned` in `barrierReceived` it seems unnecessary.
   With this code, `UnalignedCheckpointITCase` fails: some tests fail on `checkState` (line 164 above), some hang.
   
   Besides, why time out alignment if it's the last barrier? That essentially means alignment is done.









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-10 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r540438232



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##
@@ -193,6 +202,11 @@ private CheckpointBarrierBehaviourController chooseController(CheckpointBarrier
 
    private boolean canTimeout(CheckpointBarrier barrier) {
        return barrier.getCheckpointOptions().isTimeoutable() &&
-           barrier.getCheckpointOptions().getAlignmentTimeout() < (System.currentTimeMillis() - barrier.getTimestamp());
+           barrier.getId() <= lastSeenBarrier &&
+           barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000 < (System.nanoTime() - firstBarrierArrivalTime);
+   }
+
+   private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+       return announcedBarrier.getCheckpointOptions().isTimeoutable() ? System.nanoTime() : Long.MAX_VALUE;

Review comment:
   >  easier to understand
   
   I agree that it might be true for some users, but not for all. During the 
[previous 
discussion](https://github.com/apache/flink/pull/13827#discussion_r527794600), 
and also the one before, the consensus was that it's **not** easier to 
understand. However, we can discuss it again. 
   (also there are some more technical advantages of "local" timeouts)
   
   > Secondly your proposed change will not work with single input tasks 
without active timeouts?
   
   Why, could you explain?









[GitHub] [flink] rkhachatryan commented on a change in pull request #14057: [FLINK-19681][checkpointing] Timeout aligned checkpoints

2020-12-10 Thread GitBox


rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r540428875



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##
@@ -114,6 +114,7 @@ public void barrierAnnouncement(
lastSeenBarrier = barrier.getId();
firstBarrierArrivalTime = getArrivalTime(barrier);
}
+   activeController = chooseController(barrier);

Review comment:
   Consider a checkpoint with two barriers from two channels, one UC and one AC:
   1. Process the AC announcement. This sets `SingleCheckpointBarrierHandler.currentCheckpointId`. No timeout happens.
   2. Process the UC barrier (no announcement). Because `currentCheckpointId` was already set, `checkSubsumedCheckpoint` does nothing, i.e. `preProcessFirstBarrierOrAnnouncement` is not called, so `activeController` is still the AC controller.
   3. Continue processing the (UC) barrier: now the AC controller is processing a UC barrier.
   
   > (as in other places: a unit test would be helpful)
   
   I agree, will add it.
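
The three steps above can be reproduced in a toy model. This is only a sketch under assumptions: `ControllerSelectionSketch`, its `Mode` enum, and the `reselectOnBarrier` switch are hypothetical, and re-selecting the controller when a barrier is received is how this sketch models the fix, not a statement about where the added line sits in Flink.

```java
// Toy model of choosing between an aligned (AC) and an unaligned (UC) controller.
class ControllerSelectionSketch {
    enum Mode { ALIGNED, UNALIGNED }

    private final boolean reselectOnBarrier;
    private long currentCheckpointId = -1;
    private Mode active = Mode.ALIGNED;

    ControllerSelectionSketch(boolean reselectOnBarrier) {
        this.reselectOnBarrier = reselectOnBarrier;
    }

    // Only the first event of a new checkpoint id selects the controller.
    private void firstBarrierOrAnnouncement(long checkpointId, Mode mode) {
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId;
            active = mode;
        }
    }

    void announcement(long checkpointId, Mode barrierMode) {
        firstBarrierOrAnnouncement(checkpointId, barrierMode);
    }

    // Returns the mode of the controller that ends up processing this barrier.
    Mode barrier(long checkpointId, Mode barrierMode) {
        firstBarrierOrAnnouncement(checkpointId, barrierMode);
        if (reselectOnBarrier) {
            active = barrierMode;
        }
        return active;
    }
}
```

With `reselectOnBarrier` set to `false`, an AC announcement followed by a UC barrier for the same checkpoint leaves the aligned controller in charge, which is the mismatch described in step 3.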




