akalash commented on a change in pull request #16637:
URL: https://github.com/apache/flink/pull/16637#discussion_r685136777



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
##########
@@ -121,14 +123,17 @@ private static void acknowledgeAllCoordinators(
                 final Throwable error =
                         checkpoint.isDisposed() ? checkpoint.getFailureCause() : null;
 
+                CheckpointFailureReason reason = CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE;

Review comment:
       I do not really follow the idea of this method (the original one). As I 
understand it, `acknowledgeAllCoordinators` can be called only after all 
TaskManagers have finished the checkpoint, which means we can check 
`checkpoint.getFailureCause()` before the loop and throw the exception if 
needed. I also don't understand why, when we already have a 
`CheckpointException` (from `checkpoint.getFailureCause()`) with a specific 
`CheckpointFailureReason`, we always wrap it in one more `CheckpointException` 
with the constant reason `TRIGGER_CHECKPOINT_FAILURE`.
   Perhaps @zlzhang0122 or @pnowojski can explain it?
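
   To make the question concrete, here is a minimal sketch of the alternative I have in mind. It uses stand-in types rather than the real Flink classes: `CoordinatorAckSketch`, `FailureReason`, `SketchCheckpointException`, `reasonFor` and `acknowledgeAll` are all hypothetical names. It also assumes the failure cause can safely be read once before the loop, which may not hold if the checkpoint can be disposed concurrently.

```java
import java.util.List;

// Stand-in types for illustration only; these are NOT the real Flink classes.
final class CoordinatorAckSketch {

    enum FailureReason { TRIGGER_CHECKPOINT_FAILURE, CHECKPOINT_DECLINED, IO_EXCEPTION }

    static final class SketchCheckpointException extends Exception {
        private final FailureReason reason;

        SketchCheckpointException(String message, FailureReason reason, Throwable cause) {
            super(message, cause);
            this.reason = reason;
        }

        FailureReason getReason() {
            return reason;
        }
    }

    // Keep the specific reason when the cause already carries one;
    // fall back to the constant reason only for other throwables.
    static FailureReason reasonFor(Throwable cause) {
        if (cause instanceof SketchCheckpointException) {
            return ((SketchCheckpointException) cause).getReason();
        }
        return FailureReason.TRIGGER_CHECKPOINT_FAILURE;
    }

    // Check the failure cause once, before the loop, instead of on every iteration.
    // Returns the number of coordinators that were acknowledged.
    static int acknowledgeAll(Throwable failureCause, List<String> coordinators)
            throws SketchCheckpointException {
        if (failureCause != null) {
            throw new SketchCheckpointException(
                    "Checkpoint failed before coordinators were acknowledged",
                    reasonFor(failureCause),
                    failureCause);
        }
        int acknowledged = 0;
        for (String coordinator : coordinators) {
            // The real acknowledgement call would go here.
            acknowledged++;
        }
        return acknowledged;
    }
}
```

   With this shape, a cause that was created with `CHECKPOINT_DECLINED` is rethrown with the same reason, and only a plain throwable is mapped to `TRIGGER_CHECKPOINT_FAILURE`.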

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -136,6 +135,8 @@ public void checkFailureCounter(CheckpointException exception, long checkpointId
                 // ignore
                 break;
 
+            case EXCEPTION:

Review comment:
       Hm, yes, I see. Actually, I don't see the difference between EXCEPTION 
and IO_EXCEPTION right now. I personally don't think it makes sense to keep 
both of them; perhaps it is better to use IO_EXCEPTION in both places. But 
perhaps there was some specific scenario for which EXCEPTION was introduced. 
   @pnowojski what do you think? Do we really need to keep both IO_EXCEPTION 
(the new one) and EXCEPTION? Why is EXCEPTION ignored in 
`CheckpointFailureManager` right now?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
##########
@@ -121,14 +123,17 @@ private static void acknowledgeAllCoordinators(
                 final Throwable error =
                         checkpoint.isDisposed() ? checkpoint.getFailureCause() : null;
 
+                CheckpointFailureReason reason = CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE;
                 if (error != null) {
-                    throw new CheckpointException(
-                            errorMessage,
-                            CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
-                            error);
+                    Throwable rootThrowable = ExceptionUtils.stripCompletionException(error);

Review comment:
       Again, I am not sure that we should use `stripCompletionException` 
here; perhaps it should be `findThrowable`.
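
   The difference matters when the exception of interest is nested below a wrapper that is not a `CompletionException`. The sketch below is a simplified re-implementation written only for this comment, not Flink's `ExceptionUtils`: `UnwrapSketch`, `stripCompletion` and this `findThrowable` are stand-ins, on the assumption that the real methods behave roughly this way (strip only peels outer `CompletionException` layers, find searches the whole cause chain).

```java
import java.util.Optional;
import java.util.concurrent.CompletionException;

// Simplified stand-ins for illustration; not Flink's ExceptionUtils.
final class UnwrapSketch {

    // Peels only outer CompletionException layers and stops at the first other type.
    static Throwable stripCompletion(Throwable t) {
        Throwable current = t;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Walks the whole cause chain and returns the first throwable of the requested type.
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> type) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
        }
        return Optional.empty();
    }
}
```

   For a chain such as `CompletionException -> RuntimeException -> IOException`, stripping stops at the `RuntimeException`, while searching for `IOException` still finds the nested one.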




