akalash commented on a change in pull request #15496: URL: https://github.com/apache/flink/pull/15496#discussion_r607961267
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -946,41 +952,35 @@ private boolean triggerCheckpoint(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
             throws Exception {
         FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+        long checkpointId = checkpointMetaData.getCheckpointId();
         try {
             // No alignment if we inject a checkpoint
             CheckpointMetricsBuilder checkpointMetrics =
                     new CheckpointMetricsBuilder()
                             .setAlignmentDurationNanos(0L)
                             .setBytesProcessedDuringAlignment(0L);
-            subtaskCheckpointCoordinator.initInputsCheckpoint(
-                    checkpointMetaData.getCheckpointId(), checkpointOptions);
+            subtaskCheckpointCoordinator.initInputsCheckpoint(checkpointId, checkpointOptions);
             boolean success =
                     performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
             if (!success) {
-                declineCheckpoint(checkpointMetaData.getCheckpointId());
+                declineCheckpoint(checkpointId, CHECKPOINT_DECLINED_TASK_NOT_READY, null);
             }
             return success;
         } catch (Exception e) {
-            // propagate exceptions only if the task is still in "running" state
+            // Decline checkpoint globally only if the task is still in "running" state
             if (isRunning) {
-                throw new Exception(
-                        "Could not perform checkpoint "
-                                + checkpointMetaData.getCheckpointId()
-                                + " for operator "
-                                + getName()
-                                + '.',
-                        e);
+                declineCheckpoint(checkpointId, CHECKPOINT_DECLINED, e);

Review comment:
   Changing exception propagation to declineCheckpoint is not the goal of this task, but the task does highlight an inconsistency between the two approaches. When a checkpoint fails asynchronously, the failure leads to declineCheckpoint. When it fails here (synchronously), it takes a different path (exception propagation), yet the result is the same: the job will be canceled. Perhaps I am missing something, so let's discuss it here. Should declining a checkpoint work the same way in all cases, or should it differ between the synchronous and asynchronous cases?
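
   To make the question concrete, here is a minimal, self-contained sketch of the two failure paths. It is not Flink code: `CheckpointPathsSketch`, `DeclineReason`, `triggerPropagating`, `triggerDeclining`, and the `declined` list are hypothetical names that only model the control flow of the diff above, and the behavior for a task that is no longer running (return false quietly) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the two failure paths; none of this is Flink API. */
final class CheckpointPathsSketch {

    /** Stand-in for the decline reasons named in the diff. */
    enum DeclineReason { CHECKPOINT_DECLINED, CHECKPOINT_DECLINED_TASK_NOT_READY }

    /** Records decline calls so the two paths can be compared. */
    static final List<String> declined = new ArrayList<>();

    static void declineCheckpoint(long checkpointId, DeclineReason reason, Throwable cause) {
        declined.add(checkpointId + ":" + reason);
    }

    /** Old synchronous behavior: wrap and rethrow while the task is running. */
    static boolean triggerPropagating(long checkpointId, boolean isRunning, Runnable checkpoint)
            throws Exception {
        try {
            checkpoint.run();
            return true;
        } catch (Exception e) {
            if (isRunning) {
                throw new Exception("Could not perform checkpoint " + checkpointId + '.', e);
            }
            return false; // assumption: a stopped task fails quietly
        }
    }

    /** Proposed behavior: report the failure through declineCheckpoint instead of throwing. */
    static boolean triggerDeclining(long checkpointId, boolean isRunning, Runnable checkpoint) {
        try {
            checkpoint.run();
            return true;
        } catch (Exception e) {
            if (isRunning) {
                declineCheckpoint(checkpointId, DeclineReason.CHECKPOINT_DECLINED, e);
            }
            return false;
        }
    }

    public static void main(String[] args) {
        Runnable failing = () -> { throw new IllegalStateException("simulated failure"); };

        try {
            triggerPropagating(1L, true, failing);
        } catch (Exception e) {
            System.out.println("propagated: " + e.getMessage());
        }

        boolean success = triggerDeclining(2L, true, failing);
        System.out.println("success=" + success + ", declined=" + declined);
    }
}
```

   In this model both paths signal the failure, and the only difference is who observes it: the caller (through the exception) or whoever receives the decline call. Whether that difference matters in the real task is exactly the open question.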