akalash commented on a change in pull request #15496:
URL: https://github.com/apache/flink/pull/15496#discussion_r607961267



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -946,41 +952,35 @@ private boolean triggerCheckpoint(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
             throws Exception {
         FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+        long checkpointId = checkpointMetaData.getCheckpointId();
         try {
             // No alignment if we inject a checkpoint
             CheckpointMetricsBuilder checkpointMetrics =
                     new CheckpointMetricsBuilder()
                             .setAlignmentDurationNanos(0L)
                             .setBytesProcessedDuringAlignment(0L);
 
-            subtaskCheckpointCoordinator.initInputsCheckpoint(
-                    checkpointMetaData.getCheckpointId(), checkpointOptions);
+            subtaskCheckpointCoordinator.initInputsCheckpoint(checkpointId, checkpointOptions);
 
             boolean success =
                     performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
             if (!success) {
-                declineCheckpoint(checkpointMetaData.getCheckpointId());
+                declineCheckpoint(checkpointId, CHECKPOINT_DECLINED_TASK_NOT_READY, null);
             }
             return success;
         } catch (Exception e) {
-            // propagate exceptions only if the task is still in "running" state
+            // Decline checkpoint globally only if the task is still in "running" state
             if (isRunning) {
-                throw new Exception(
-                        "Could not perform checkpoint "
-                                + checkpointMetaData.getCheckpointId()
-                                + " for operator "
-                                + getName()
-                                + '.',
-                        e);
+                declineCheckpoint(checkpointId, CHECKPOINT_DECLINED, e);

Review comment:
       Replacing exception propagation with declineCheckpoint is not the goal of the current task, but this task exposes an inconsistency between the two approaches. Why does an asynchronous checkpoint failure lead to declineCheckpoint, while a synchronous failure here takes a different path (exception propagation) that ends with the same result: the job will be canceled?
   Perhaps I have misunderstood something, so let's discuss it here. Should declining a checkpoint work the same way in all cases, or should it differ between the synchronous and asynchronous cases?
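To make the question concrete, here is a minimal, self-contained Java sketch of one possible option: both the synchronous failure inside `triggerCheckpoint` and a failure of the asynchronous part are routed through a single `declineCheckpoint(id, reason, cause)` call instead of rethrowing. Everything in it is hypothetical: `UnifiedDeclineSketch`, `DeclineReason`, `asyncPart`, and the recorded strings are illustrative names, not Flink APIs, and the real `StreamTask` and `SubtaskCheckpointCoordinator` code is considerably more involved. The `return false` after a synchronous decline is also an assumption, since the diff does not show that part.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch: none of these names are Flink APIs.
final class UnifiedDeclineSketch {

    // Hypothetical reasons, loosely mirroring the constants used in the diff.
    enum DeclineReason { TASK_NOT_READY, CHECKPOINT_DECLINED }

    // Records every decline; synchronized because the async path may call it from another thread.
    static final List<String> declined = Collections.synchronizedList(new ArrayList<>());

    // Stand-in for StreamTask's isRunning flag.
    static volatile boolean isRunning = true;

    // Single entry point used by both the synchronous and the asynchronous failure paths.
    static void declineCheckpoint(long checkpointId, DeclineReason reason, Throwable cause) {
        declined.add(checkpointId + ":" + reason + (cause == null ? "" : ":" + cause.getMessage()));
    }

    // Synchronous part: a failure is declined instead of being rethrown.
    static boolean triggerCheckpoint(long checkpointId, boolean taskReady, boolean failSync) {
        try {
            if (failSync) {
                throw new IllegalStateException("sync failure");
            }
            if (!taskReady) {
                declineCheckpoint(checkpointId, DeclineReason.TASK_NOT_READY, null);
                return false;
            }
            return true;
        } catch (Exception e) {
            if (isRunning) {
                declineCheckpoint(checkpointId, DeclineReason.CHECKPOINT_DECLINED, e);
            }
            // Assumption: the caller only learns "not triggered"; the diff does not show this part.
            return false;
        }
    }

    // Asynchronous part: a failed future ends in the same declineCheckpoint call.
    static CompletableFuture<Void> asyncPart(long checkpointId, boolean failAsync) {
        return CompletableFuture.runAsync(() -> {
            if (failAsync) {
                throw new IllegalStateException("async failure");
            }
        }).handle((ignored, t) -> {
            if (t != null) {
                // runAsync wraps the thrown exception in a CompletionException; unwrap it.
                Throwable cause =
                        (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
                declineCheckpoint(checkpointId, DeclineReason.CHECKPOINT_DECLINED, cause);
            }
            return null;
        });
    }

    public static void main(String[] args) {
        triggerCheckpoint(1L, false, false); // task not ready
        triggerCheckpoint(2L, true, true);   // synchronous failure
        asyncPart(3L, true).join();          // asynchronous failure
        declined.forEach(System.out::println);
    }
}
```

With this shape, both failure modes produce the same kind of decline record, which is the consistency the comment asks about; whether the synchronous path should instead keep propagating the exception is exactly the open question.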




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

