This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 51a91049 [FLINK-33522] Be ware of SerializedThrowable when checking for StopWithSavepointStoppingException (#716) 51a91049 is described below commit 51a91049b5f17f8a0b21e11feceb4410a97c50c1 Author: Maximilian Michels <m...@apache.org> AuthorDate: Fri Dec 1 10:47:40 2023 +0100 [FLINK-33522] Be ware of SerializedThrowable when checking for StopWithSavepointStoppingException (#716) Turns out that the previous detection code in #706 may not always fire correctly due to an encapsulated serialized throwable. This minor change fixes that. --- .../flink/kubernetes/operator/service/AbstractFlinkService.java | 6 ++++-- .../flink/kubernetes/operator/service/AbstractFlinkServiceTest.java | 5 +++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 0821b439..3c4fe4aa 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -331,8 +331,10 @@ public abstract class AbstractFlinkService implements FlinkService { exception); } catch (Exception e) { var stopWithSavepointException = - ExceptionUtils.findThrowable( - e, StopWithSavepointStoppingException.class); + ExceptionUtils.findThrowableSerializedAware( + e, + StopWithSavepointStoppingException.class, + getClass().getClassLoader()); if (stopWithSavepointException.isPresent()) { // Handle edge case where the savepoint completes but the job fails // right afterward. diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java index c12caf21..a108b8bb 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java @@ -651,8 +651,9 @@ public class AbstractFlinkServiceTest { if (failAfterSavepointCompletes) { stopWithSavepointFuture.completeExceptionally( new CompletionException( - new StopWithSavepointStoppingException( - savepointPath, jobID))); + new SerializedThrowable( + new StopWithSavepointStoppingException( + savepointPath, jobID)))); } else { stopWithSavepointFuture.complete( new Tuple3<>(id, formatType, savepointDir));