This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 921bfd04 [hotfix] Using Flink ExceptionUtils#findThrowable for exception checking (#818) 921bfd04 is described below commit 921bfd0437881a34f3f65b5e10e85879e29c186e Author: Xin Hao <haoxi...@gmail.com> AuthorDate: Sun Apr 28 10:45:29 2024 +0800 [hotfix] Using Flink ExceptionUtils#findThrowable for exception checking (#818) Update flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java Co-authored-by: Rui Fan <1996fan...@gmail.com> * fix --------- Co-authored-by: Rui Fan <1996fan...@gmail.com> --- .../reconciler/sessionjob/SessionJobReconciler.java | 9 +++++---- .../flink/kubernetes/operator/TestingFlinkService.java | 15 ++++++++++++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java index 60b6b26f..b71264d6 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java @@ -43,6 +43,8 @@ import org.slf4j.LoggerFactory; import java.util.Optional; import java.util.concurrent.ExecutionException; +import static org.apache.flink.util.ExceptionUtils.findThrowable; + /** The reconciler for the {@link FlinkSessionJob}. 
*/ public class SessionJobReconciler extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> { @@ -127,14 +129,13 @@ public class SessionJobReconciler : UpgradeMode.STATELESS; cancelJob(ctx, upgradeMode); } catch (ExecutionException e) { - final var cause = e.getCause(); - - if (cause instanceof FlinkJobNotFoundException) { + if (findThrowable(e, FlinkJobNotFoundException.class).isPresent()) { LOG.error("Job {} not found in the Flink cluster.", jobID, e); return DeleteControl.defaultDelete(); } - if (cause instanceof FlinkJobTerminatedWithoutCancellationException) { + if (findThrowable(e, FlinkJobTerminatedWithoutCancellationException.class) + .isPresent()) { LOG.error("Job {} already terminated without cancellation.", jobID, e); return DeleteControl.defaultDelete(); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 7936e4ed..6e461e4d 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -67,9 +67,12 @@ import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders; +import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.concurrent.Executors; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + import io.fabric8.kubernetes.api.model.DeletionPropagation; import io.fabric8.kubernetes.api.model.HasMetadata; import 
io.fabric8.kubernetes.api.model.ObjectMeta; @@ -92,6 +95,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; @@ -447,7 +451,16 @@ public class TestingFlinkService extends AbstractFlinkService { } if (isFlinkJobNotFound) { - throw new FlinkJobNotFoundException(jobID); + // Throw different exceptions randomly, see + // https://github.com/apache/flink-kubernetes-operator/pull/818 + if (new Random().nextBoolean()) { + throw new RestClientException( + "Job could not be found.", + new FlinkJobNotFoundException(jobID), + HttpResponseStatus.NOT_FOUND); + } else { + throw new FlinkJobNotFoundException(jobID); + } } var jobOpt = jobs.stream().filter(js -> js.f1.getJobId().equals(jobID)).findAny();