This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 51a91049 [FLINK-33522] Be aware of SerializedThrowable when checking 
for StopWithSavepointStoppingException (#716)
51a91049 is described below

commit 51a91049b5f17f8a0b21e11feceb4410a97c50c1
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Fri Dec 1 10:47:40 2023 +0100

    [FLINK-33522] Be aware of SerializedThrowable when checking for 
StopWithSavepointStoppingException (#716)
    
    It turns out that the detection code added in #706 does not always fire,
    because the StopWithSavepointStoppingException may be wrapped in a
    SerializedThrowable. This minor change makes the lookup serialization-aware.
---
 .../flink/kubernetes/operator/service/AbstractFlinkService.java     | 6 ++++--
 .../flink/kubernetes/operator/service/AbstractFlinkServiceTest.java | 5 +++--
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 0821b439..3c4fe4aa 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -331,8 +331,10 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                                     exception);
                         } catch (Exception e) {
                             var stopWithSavepointException =
-                                    ExceptionUtils.findThrowable(
-                                            e, 
StopWithSavepointStoppingException.class);
+                                    
ExceptionUtils.findThrowableSerializedAware(
+                                            e,
+                                            
StopWithSavepointStoppingException.class,
+                                            getClass().getClassLoader());
                             if (stopWithSavepointException.isPresent()) {
                                 // Handle edge case where the savepoint 
completes but the job fails
                                 // right afterward.
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index c12caf21..a108b8bb 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -651,8 +651,9 @@ public class AbstractFlinkServiceTest {
                     if (failAfterSavepointCompletes) {
                         stopWithSavepointFuture.completeExceptionally(
                                 new CompletionException(
-                                        new StopWithSavepointStoppingException(
-                                                savepointPath, jobID)));
+                                        new SerializedThrowable(
+                                                new 
StopWithSavepointStoppingException(
+                                                        savepointPath, 
jobID))));
                     } else {
                         stopWithSavepointFuture.complete(
                                 new Tuple3<>(id, formatType, savepointDir));

Reply via email to