akalash commented on a change in pull request #16637:
URL: https://github.com/apache/flink/pull/16637#discussion_r685136777
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
##########
@@ -121,14 +123,17 @@ private static void acknowledgeAllCoordinators(
final Throwable error =
checkpoint.isDisposed() ? checkpoint.getFailureCause() : null;
+ CheckpointFailureReason reason = CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE;
Review comment:
I don't really follow the idea of this method (the original one). As I
understand it, `acknowledgeAllCoordinators` can be called only after all
TaskManagers have finished the checkpoint, which means we could check
`checkpoint.getFailureCause()` before the loop and throw the exception if
needed. I also don't understand why, when we already have a
`CheckpointException` (from `checkpoint.getFailureCause()`) with a specific
`CheckpointFailureReason`, we always wrap it in one more
`CheckpointException` with the constant reason `TRIGGER_CHECKPOINT_FAILURE`.
Perhaps @zlzhang0122 or @pnowojski can explain it?
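To make the suggestion above concrete, here is a minimal sketch of
checking the failure cause up front and rethrowing an existing checkpoint
exception with its own reason instead of wrapping it. `Reason`,
`CheckpointFailure`, `failFastIfDisposed` and `reasonAfterCheck` are
hypothetical stand-ins written for this example, not the actual Flink
classes, so treat it as an illustration only.

```java
class FailureReasonSketch {
    // Hypothetical stand-in for CheckpointFailureReason; only two values are modeled.
    enum Reason { TRIGGER_CHECKPOINT_FAILURE, CHECKPOINT_EXPIRED }

    // Hypothetical stand-in for CheckpointException: an exception that carries a reason.
    static class CheckpointFailure extends Exception {
        final Reason reason;

        CheckpointFailure(String message, Reason reason, Throwable cause) {
            super(message, cause);
            this.reason = reason;
        }
    }

    // Intended to run once, before iterating over the coordinators.
    static void failFastIfDisposed(boolean disposed, Throwable failureCause, String errorMessage)
            throws CheckpointFailure {
        if (!disposed || failureCause == null) {
            return; // nothing failed, continue with the acknowledgements
        }
        if (failureCause instanceof CheckpointFailure) {
            // Keep the specific reason instead of hiding it behind a constant one.
            throw (CheckpointFailure) failureCause;
        }
        // Fall back to the generic reason only for unclassified failures.
        throw new CheckpointFailure(errorMessage, Reason.TRIGGER_CHECKPOINT_FAILURE, failureCause);
    }

    // Helper for the example: returns the reason that surfaced, or null if none did.
    static Reason reasonAfterCheck(boolean disposed, Throwable failureCause) {
        try {
            failFastIfDisposed(disposed, failureCause, "checkpoint was disposed");
            return null;
        } catch (CheckpointFailure e) {
            return e.reason;
        }
    }

    public static void main(String[] args) {
        Throwable expired = new CheckpointFailure("expired", Reason.CHECKPOINT_EXPIRED, null);
        System.out.println(reasonAfterCheck(true, expired));
        System.out.println(reasonAfterCheck(true, new RuntimeException("boom")));
    }
}
```

With this shape the caller sees the original reason whenever one exists,
and the constant `TRIGGER_CHECKPOINT_FAILURE` is used only as a fallback.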
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -136,6 +135,8 @@ public void checkFailureCounter(CheckpointException exception, long checkpointId
// ignore
break;
+ case EXCEPTION:
Review comment:
Hm, yes, I see. Actually, I don't see the difference between EXCEPTION
and IO_EXCEPTION right now. Personally, I don't think it makes sense to
keep both of them; perhaps it is better to use IO_EXCEPTION in both places.
But perhaps there was some specific scenario for which EXCEPTION was
introduced.
@pnowojski, what do you think? Do we really need to keep both
IO_EXCEPTION (the new one) and EXCEPTION? Why is EXCEPTION ignored in
`CheckpointFailureManager` right now?
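For reference, a rough sketch of what "treat both the same" would look
like in a failure-counter switch. The class, the `IGNORED_PLACEHOLDER`
value and the counter are assumptions made up for this example; only the
names EXCEPTION and IO_EXCEPTION come from the PR, and the real
`checkFailureCounter` may differ.

```java
import java.util.concurrent.atomic.AtomicInteger;

class FailureCounterSketch {
    // Hypothetical reduced enum; IGNORED_PLACEHOLDER stands for any reason that is not counted.
    enum Reason { IGNORED_PLACEHOLDER, EXCEPTION, IO_EXCEPTION }

    private final AtomicInteger continuousFailures = new AtomicInteger();

    void checkFailureCounter(Reason reason) {
        switch (reason) {
            case IGNORED_PLACEHOLDER:
                // ignore: does not count towards the tolerable-failure threshold
                break;
            case EXCEPTION:
            case IO_EXCEPTION:
                // Both reasons share one branch, so they are indistinguishable here.
                continuousFailures.incrementAndGet();
                break;
            default:
                throw new IllegalArgumentException("Unknown reason: " + reason);
        }
    }

    int failures() {
        return continuousFailures.get();
    }
}
```

If the two reasons always end up in the same branch like this, keeping
both mostly adds naming overhead, which is why I'm asking whether
EXCEPTION has a scenario of its own.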
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
##########
@@ -121,14 +123,17 @@ private static void acknowledgeAllCoordinators(
final Throwable error =
checkpoint.isDisposed() ? checkpoint.getFailureCause() : null;
+ CheckpointFailureReason reason = CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE;
if (error != null) {
- throw new CheckpointException(
- errorMessage,
- CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
- error);
+ Throwable rootThrowable = ExceptionUtils.stripCompletionException(error);
Review comment:
Again, I'm not sure that we should use `stripCompletionException` here;
perhaps it should be `findThrowable`.
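A small sketch of the difference I have in mind. The two helpers below
are simplified local stand-ins modeled on what I understand
`stripCompletionException` and `findThrowable` to do (unwrap only
`CompletionException` layers versus search the whole cause chain for a
type); they are not the Flink implementations, and `CheckpointFailure`
is a hypothetical placeholder for `CheckpointException`.

```java
import java.util.Optional;
import java.util.concurrent.CompletionException;

class StripVsFindSketch {
    // Hypothetical placeholder for CheckpointException.
    static class CheckpointFailure extends Exception {
        CheckpointFailure(String message) {
            super(message);
        }
    }

    // Stand-in: peel off CompletionException wrappers only, then stop.
    static Throwable stripCompletion(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Stand-in: walk the full cause chain and return the first throwable of the given type.
    static <T extends Throwable> Optional<T> find(Throwable t, Class<T> type) {
        while (t != null) {
            if (type.isInstance(t)) {
                return Optional.of(type.cast(t));
            }
            t = t.getCause();
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        CheckpointFailure inner = new CheckpointFailure("declined");
        // The checkpoint failure is buried under a non-completion wrapper.
        Throwable wrapped = new CompletionException(new RuntimeException("wrapper", inner));

        System.out.println(stripCompletion(wrapped).getClass().getSimpleName());
        System.out.println(find(wrapped, CheckpointFailure.class).isPresent());
    }
}
```

Stripping stops at the first non-`CompletionException` layer, so a
checkpoint exception nested deeper would be missed, while searching the
chain still finds it.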