akalash commented on a change in pull request #16637:
URL: https://github.com/apache/flink/pull/16637#discussion_r683558429
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -862,11 +862,19 @@ private void onTriggerSuccess() {
*/
private void onTriggerFailure(
CheckpointTriggerRequest onCompletionPromise, Throwable throwable) {
- final CheckpointException checkpointException =
- getCheckpointException(
- CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
- onCompletionPromise.completeExceptionally(checkpointException);
- onTriggerFailure((PendingCheckpoint) null, checkpointException);
+ Throwable rootThrowable = ExceptionUtils.stripCompletionException(throwable);
Review comment:
It is actually not a root throwable: `stripCompletionException` only
unwraps `CompletionException`. But what if the IOException is hidden deeper
than one level? According to the task description, I suppose we are interested
in an IOException at any cause level. So, if I understand everything correctly,
`ExceptionUtils.findThrowable(throwable, IOException.class).isPresent()` is the
more correct construction here.
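To make the difference concrete, here is a minimal, self-contained sketch. The two helpers are simplified stand-ins written for illustration; they mimic, but are not, Flink's `ExceptionUtils` methods. Stripping peels off only `CompletionException` layers and stops at the first other wrapper, while a cause-chain search finds an `IOException` at any depth.

```java
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletionException;

public class CauseChainDemo {

    // Stand-in: peel off CompletionException layers only, stop at anything else.
    static Throwable stripCompletionException(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Stand-in: walk the whole cause chain looking for an instance of searchType.
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> searchType) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (searchType.isInstance(current)) {
                return Optional.of(searchType.cast(current));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // IOException two levels deep: CompletionException -> RuntimeException -> IOException
        Throwable nested = new CompletionException(
                new RuntimeException("wrapper", new IOException("disk full")));

        boolean viaStrip = stripCompletionException(nested) instanceof IOException;
        boolean viaSearch = findThrowable(nested, IOException.class).isPresent();

        System.out.println("strip + instanceof: " + viaStrip);  // false
        System.out.println("findThrowable:      " + viaSearch); // true
    }
}
```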
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -136,6 +135,8 @@ public void checkFailureCounter(CheckpointException exception, long checkpointId
// ignore
break;
+ case EXCEPTION:
Review comment:
What is the reason for moving EXCEPTION out of the ignored group? I don't
see any hint of this in either the ticket or the tests.
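For context on why the placement of a `case` label matters: in a switch with grouped labels, a reason listed in the ignored group never touches the counter, while one in the counted group does. The sketch below is a hypothetical, heavily simplified model of `checkFailureCounter`; the grouping of reasons is an assumption loosely based on this diff and on the test comment listing CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED and CHECKPOINT_ASYNC_EXCEPTION, not a copy of Flink's actual switch.

```java
import java.util.HashSet;
import java.util.Set;

public class FailureCounterSketch {

    // Hypothetical subset of failure reasons, for illustration only.
    enum Reason {
        EXCEPTION,
        CHECKPOINT_DECLINED,
        CHECKPOINT_EXPIRED,
        CHECKPOINT_ASYNC_EXCEPTION,
        IO_EXCEPTION
    }

    private final Set<Long> countedCheckpointIds = new HashSet<>();
    private int continuousFailureCounter = 0;

    void checkFailureCounter(Reason reason, long checkpointId) {
        switch (reason) {
            case EXCEPTION:
                // ignored group: does not count toward the tolerable-failure limit
                break;
            case CHECKPOINT_DECLINED:
            case CHECKPOINT_EXPIRED:
            case CHECKPOINT_ASYNC_EXCEPTION:
            case IO_EXCEPTION:
                // counted group: each checkpoint is counted at most once
                if (countedCheckpointIds.add(checkpointId)) {
                    continuousFailureCounter++;
                }
                break;
            default:
                throw new IllegalArgumentException("Unknown reason: " + reason);
        }
    }

    int failures() {
        return continuousFailureCounter;
    }

    public static void main(String[] args) {
        FailureCounterSketch manager = new FailureCounterSketch();
        manager.checkFailureCounter(Reason.EXCEPTION, 1L);    // ignored
        manager.checkFailureCounter(Reason.IO_EXCEPTION, 2L); // counted
        manager.checkFailureCounter(Reason.IO_EXCEPTION, 2L); // same checkpoint: not counted again
        System.out.println(manager.failures());               // 1
    }
}
```

Moving `case EXCEPTION:` from the first group into the second would make every generic exception count toward the limit, which is why the change needs a justification.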
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -862,11 +862,19 @@ private void onTriggerSuccess() {
*/
private void onTriggerFailure(
CheckpointTriggerRequest onCompletionPromise, Throwable throwable) {
- final CheckpointException checkpointException =
- getCheckpointException(
- CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
- onCompletionPromise.completeExceptionally(checkpointException);
- onTriggerFailure((PendingCheckpoint) null, checkpointException);
+ Throwable rootThrowable = ExceptionUtils.stripCompletionException(throwable);
Review comment:
Also, why do you use the unwrapped throwable only when it is an IOException,
but the original one otherwise? The logic should be consistent here. If we know
that this throwable can be wrapped in a CompletionException, we should unwrap it
in both cases (perhaps it makes sense to move the unwrapping into
`getCheckpointException`).
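A minimal sketch of that suggestion, assuming the unwrapping moves into the exception factory so every caller gets the same treatment. `CheckpointException`, `CheckpointFailureReason` and `getCheckpointException` below are simplified stand-ins with hypothetical shapes, not the real Flink classes.

```java
import java.io.IOException;
import java.util.concurrent.CompletionException;

public class UnwrapSketch {

    // Hypothetical subset of reasons.
    enum CheckpointFailureReason { TRIGGER_CHECKPOINT_FAILURE, IO_EXCEPTION }

    // Simplified stand-in for Flink's CheckpointException.
    static class CheckpointException extends Exception {
        final CheckpointFailureReason reason;

        CheckpointException(CheckpointFailureReason reason, Throwable cause) {
            super(reason.name(), cause);
            this.reason = reason;
        }
    }

    // Unwrapping happens here, once, so no caller has to remember to do it.
    static CheckpointException getCheckpointException(
            CheckpointFailureReason reason, Throwable throwable) {
        Throwable unwrapped = throwable;
        while (unwrapped instanceof CompletionException && unwrapped.getCause() != null) {
            unwrapped = unwrapped.getCause();
        }
        return new CheckpointException(reason, unwrapped);
    }

    public static void main(String[] args) {
        CheckpointException e = getCheckpointException(
                CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
                new CompletionException(new IOException("disk full")));
        System.out.println(e.getCause().getClass().getSimpleName()); // IOException
    }
}
```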
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
##########
@@ -96,7 +96,7 @@ public void testTotalCountValue() {
}
// CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED and CHECKPOINT_ASYNC_EXCEPTION
- assertEquals(3, callback.getInvokeCounter());
+ assertEquals(5, callback.getInvokeCounter());
Review comment:
The number and the comment are now inconsistent: the comment still lists
three reasons, but the assertion expects 5.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -862,11 +862,19 @@ private void onTriggerSuccess() {
*/
private void onTriggerFailure(
CheckpointTriggerRequest onCompletionPromise, Throwable throwable) {
- final CheckpointException checkpointException =
- getCheckpointException(
- CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
- onCompletionPromise.completeExceptionally(checkpointException);
- onTriggerFailure((PendingCheckpoint) null, checkpointException);
+ Throwable rootThrowable = ExceptionUtils.stripCompletionException(throwable);
+ if (rootThrowable instanceof IOException) {
+ final CheckpointException checkpointException =
+ getCheckpointException(CheckpointFailureReason.IO_EXCEPTION, rootThrowable);
+ onCompletionPromise.completeExceptionally(checkpointException);
+ onTriggerFailure((PendingCheckpoint) null, throwable);
+ } else {
+ final CheckpointException checkpointException =
+ getCheckpointException(
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
+ onCompletionPromise.completeExceptionally(checkpointException);
+ onTriggerFailure((PendingCheckpoint) null, checkpointException);
Review comment:
I don't understand why the `if` and `else` blocks differ (one passes
`throwable` to `onTriggerFailure`, while the other passes `checkpointException`).
As I understand it, you should only choose the `CheckpointFailureReason`, and the
rest of the code should be the same. Or did I misunderstand something?
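One way to read this suggestion: pick the `CheckpointFailureReason` first, then run a single code path that builds one `CheckpointException` and hands that same object both to the completion promise and to the failure handler. The sketch models the promise as a `CompletableFuture` and the failure handler as a `Consumer`; every name here is a hypothetical simplification of the code under review, not Flink's API.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class SinglePathSketch {

    enum Reason { TRIGGER_CHECKPOINT_FAILURE, IO_EXCEPTION }

    // Simplified stand-in for Flink's CheckpointException.
    static class CheckpointException extends Exception {
        final Reason reason;

        CheckpointException(Reason reason, Throwable cause) {
            super(reason.name(), cause);
            this.reason = reason;
        }
    }

    // Search the full cause chain (any depth) for an IOException.
    static boolean hasIOException(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof IOException) {
                return true;
            }
        }
        return false;
    }

    static void onTriggerFailure(
            CompletableFuture<?> onCompletionPromise,
            Consumer<CheckpointException> failureHandler,
            Throwable throwable) {
        // Only the reason depends on the throwable...
        Reason reason = hasIOException(throwable)
                ? Reason.IO_EXCEPTION
                : Reason.TRIGGER_CHECKPOINT_FAILURE;
        // ...everything else is shared, and both consumers see the same exception.
        CheckpointException checkpointException = new CheckpointException(reason, throwable);
        onCompletionPromise.completeExceptionally(checkpointException);
        failureHandler.accept(checkpointException);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        onTriggerFailure(
                promise,
                e -> System.out.println("handler: " + e.reason),
                new RuntimeException(new IOException("disk full")));
        // prints "handler: IO_EXCEPTION"
    }
}
```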
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
##########
@@ -81,6 +81,8 @@
FINALIZE_CHECKPOINT_FAILURE(false, "Failure to finalize checkpoint."),
+ IO_EXCEPTION(false, "Trigger checkpoint failure."),
Review comment:
Please fix the description; right now it is just a copy-paste of
`TRIGGER_CHECKPOINT_FAILURE`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -624,6 +625,36 @@ public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
}
}
+ @Test
+ public void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {
Review comment:
This test checks that the task fails when it receives the new reason
type, but I don't see any tests verifying that your new code inside both
`onTriggerFailure` overloads works.
As far as I can see, at least `testPeriodicSchedulingWithInactiveTasks`,
`testTriggerCheckpointAfterCancel` and
`testCheckpointAbortsIfTriggerTasksAreFinished` (and, I'm sure, many more) trigger
the exception in some way. Perhaps you can look at how these tests work and
write something similar.
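As a starting point for such a test, note that a failure observed through a dependent `CompletableFuture` stage arrives wrapped in a `CompletionException`, which is standard JDK behavior. The self-contained sketch below reproduces that wrapping with plain JDK futures and checks that a cause-chain based classifier still picks the IO reason. `classify` and `failureSeenDownstream` are hypothetical helpers; a real test would drive `CheckpointCoordinator` the way the tests named above do.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class TriggerFailureWrappingSketch {

    enum Reason { TRIGGER_CHECKPOINT_FAILURE, IO_EXCEPTION }

    // Hypothetical classifier: search the whole cause chain, not just the top level.
    static Reason classify(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof IOException) {
                return Reason.IO_EXCEPTION;
            }
        }
        return Reason.TRIGGER_CHECKPOINT_FAILURE;
    }

    // Fail a source future, then observe the failure from a dependent stage.
    static Throwable failureSeenDownstream(Throwable cause) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Throwable> observed =
                source.thenApply(s -> s).handle((value, failure) -> failure);
        source.completeExceptionally(cause);
        return observed.join();
    }

    public static void main(String[] args) {
        Throwable seen = failureSeenDownstream(new IOException("disk full"));
        System.out.println(seen instanceof CompletionException); // true: wrapped by the dependent stage
        System.out.println(seen instanceof IOException);         // false: a top-level check misses it
        System.out.println(classify(seen));                      // IO_EXCEPTION
    }
}
```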
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -892,9 +900,12 @@ private void onTriggerFailure(@Nullable PendingCheckpoint checkpoint, Throwable
job,
numUnsuccessful,
throwable);
- final CheckpointException cause =
- getCheckpointException(
- CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
+ CheckpointFailureReason defaultReason =
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE;
+ if (throwable instanceof IOException) {
Review comment:
The same applies here. Why are you so sure that the IOException will not
be wrapped in other exceptions?
--
This is an automated message from the Apache Git Service.