This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new d7cae5365d7 [FLINK-31169][k8s] Fix that pod termination may be mis-treated as fatal error.
d7cae5365d7 is described below

commit d7cae5365d730272a4089988c235c2038eafab53
Author: Xintong Song <tonysong...@gmail.com>
AuthorDate: Wed Feb 22 10:13:26 2023 +0800

    [FLINK-31169][k8s] Fix that pod termination may be mis-treated as fatal error.

    This closes #21986
---
 .../flink/kubernetes/KubernetesResourceManagerDriver.java | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
index 259cf1b63f6..508e791e79e 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
@@ -228,6 +228,8 @@ public class KubernetesResourceManagerDriver
                                 podName);
                         stopPod(taskManagerPod.getName());
                     }
+                } else if (t instanceof RetryableException) {
+                    // ignore
                 } else {
                     log.error("Error completing resource request.", t);
                     ExceptionUtils.rethrow(t);
@@ -376,7 +378,8 @@ public class KubernetesResourceManagerDriver
                     requestResourceFutures.remove(podName);
             if (requestResourceFuture != null) {
                 log.warn("Pod {} is terminated before being scheduled.", podName);
-                requestResourceFuture.completeExceptionally(new FlinkException("Pod is terminated."));
+                requestResourceFuture.completeExceptionally(
+                        new RetryableException("Pod is terminated."));
             }
 
             getResourceEventHandler()
@@ -452,4 +455,12 @@ public class KubernetesResourceManagerDriver
             }
         }
     }
+
+    private static class RetryableException extends FlinkException {
+        private static final long serialVersionUID = 1L;
+
+        RetryableException(String message) {
+            super(message);
+        }
+    }
 }