This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 921bfd04 [hotfix] Using Flink ExceptionUtils#findThrowable for 
exception checking (#818)
921bfd04 is described below

commit 921bfd0437881a34f3f65b5e10e85879e29c186e
Author: Xin Hao <haoxi...@gmail.com>
AuthorDate: Sun Apr 28 10:45:29 2024 +0800

    [hotfix] Using Flink ExceptionUtils#findThrowable for exception checking 
(#818)
    
    Update 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
    
    Co-authored-by: Rui Fan <1996fan...@gmail.com>
    
    * fix
    
    ---------
    
    Co-authored-by: Rui Fan <1996fan...@gmail.com>
---
 .../reconciler/sessionjob/SessionJobReconciler.java       |  9 +++++----
 .../flink/kubernetes/operator/TestingFlinkService.java    | 15 ++++++++++++++-
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index 60b6b26f..b71264d6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -43,6 +43,8 @@ import org.slf4j.LoggerFactory;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+
 /** The reconciler for the {@link FlinkSessionJob}. */
 public class SessionJobReconciler
         extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, 
FlinkSessionJobStatus> {
@@ -127,14 +129,13 @@ public class SessionJobReconciler
                                     : UpgradeMode.STATELESS;
                     cancelJob(ctx, upgradeMode);
                 } catch (ExecutionException e) {
-                    final var cause = e.getCause();
-
-                    if (cause instanceof FlinkJobNotFoundException) {
+                    if (findThrowable(e, 
FlinkJobNotFoundException.class).isPresent()) {
                         LOG.error("Job {} not found in the Flink cluster.", 
jobID, e);
                         return DeleteControl.defaultDelete();
                     }
 
-                    if (cause instanceof 
FlinkJobTerminatedWithoutCancellationException) {
+                    if (findThrowable(e, 
FlinkJobTerminatedWithoutCancellationException.class)
+                            .isPresent()) {
                         LOG.error("Job {} already terminated without 
cancellation.", jobID, e);
                         return DeleteControl.defaultDelete();
                     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 7936e4ed..6e461e4d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -67,9 +67,12 @@ import 
org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.concurrent.Executors;
 
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
 import io.fabric8.kubernetes.api.model.DeletionPropagation;
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -92,6 +95,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
@@ -447,7 +451,16 @@ public class TestingFlinkService extends 
AbstractFlinkService {
         }
 
         if (isFlinkJobNotFound) {
-            throw new FlinkJobNotFoundException(jobID);
+            // Throw different exceptions randomly, see
+            // https://github.com/apache/flink-kubernetes-operator/pull/818
+            if (new Random().nextBoolean()) {
+                throw new RestClientException(
+                        "Job could not be found.",
+                        new FlinkJobNotFoundException(jobID),
+                        HttpResponseStatus.NOT_FOUND);
+            } else {
+                throw new FlinkJobNotFoundException(jobID);
+            }
         }
 
         var jobOpt = jobs.stream().filter(js -> 
js.f1.getJobId().equals(jobID)).findAny();

Reply via email to