This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new cda493e5 [FLINK-37455] Create error Event when job goes into FAILED state
cda493e5 is described below

commit cda493e52e97c09886a466cb9ecad8b679bedcb0
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Fri Mar 28 08:46:14 2025 +0100

    [FLINK-37455] Create error Event when job goes into FAILED state
---
 .../controller/FlinkDeploymentController.java      | 45 ++++++----------------
 .../controller/FlinkSessionJobController.java      | 18 +++++----
 .../operator/observer/JobStatusObserver.java       | 18 ++++++---
 .../kubernetes/operator/utils/EventRecorder.java   |  3 +-
 .../kubernetes/operator/utils/ExceptionUtils.java  |  9 +++--
 .../kubernetes/operator/TestingFlinkService.java   | 22 +++++++++++
 .../controller/FlinkDeploymentControllerTest.java  |  6 +--
 .../controller/FlinkSessionJobControllerTest.java  |  4 +-
 .../operator/observer/JobStatusObserverTest.java   | 33 ++++++++++++++++
 .../operator/utils/ExceptionUtilsTest.java         | 21 ++++++++--
 10 files changed, 119 insertions(+), 60 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index bae03cba..51235b8e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -155,17 +155,15 @@ public class FlinkDeploymentController
             statusRecorder.patchAndCacheStatus(flinkApp, 
ctx.getKubernetesClient());
             reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);
         } catch (UpgradeFailureException ufe) {
-            handleUpgradeFailure(ctx, ufe);
+            ReconciliationUtils.updateForReconciliationError(ctx, ufe);
+            triggerErrorEvent(ctx, ufe, ufe.getReason());
         } catch (DeploymentFailedException dfe) {
-            handleDeploymentFailed(ctx, dfe);
+            
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
+            
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
+            ReconciliationUtils.updateForReconciliationError(ctx, dfe);
+            triggerErrorEvent(ctx, dfe, dfe.getReason());
         } catch (Exception e) {
-            eventRecorder.triggerEvent(
-                    flinkApp,
-                    EventRecorder.Type.Warning,
-                    "ClusterDeploymentException",
-                    ExceptionUtils.getExceptionMessage(e),
-                    EventRecorder.Component.JobManagerDeployment,
-                    josdkContext.getClient());
+            triggerErrorEvent(ctx, e, EventRecorder.Reason.Error.name());
             throw new ReconciliationException(e);
         }
 
@@ -175,32 +173,13 @@ public class FlinkDeploymentController
                 ctx.getOperatorConfig(), flinkApp, previousDeployment, true);
     }
 
-    private void handleDeploymentFailed(
-            FlinkResourceContext<FlinkDeployment> ctx, 
DeploymentFailedException dfe) {
-        var flinkApp = ctx.getResource();
-        LOG.error("Flink Deployment failed", dfe);
-        
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
-        flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
-        ReconciliationUtils.updateForReconciliationError(ctx, dfe);
+    private void triggerErrorEvent(
+            FlinkResourceContext<FlinkDeployment> ctx, Exception e, String 
reason) {
         eventRecorder.triggerEvent(
-                flinkApp,
-                EventRecorder.Type.Warning,
-                dfe.getReason(),
-                dfe.getMessage(),
-                EventRecorder.Component.JobManagerDeployment,
-                ctx.getKubernetesClient());
-    }
-
-    private void handleUpgradeFailure(
-            FlinkResourceContext<FlinkDeployment> ctx, UpgradeFailureException 
ufe) {
-        LOG.error("Error while upgrading Flink Deployment", ufe);
-        var flinkApp = ctx.getResource();
-        ReconciliationUtils.updateForReconciliationError(ctx, ufe);
-        eventRecorder.triggerEvent(
-                flinkApp,
+                ctx.getResource(),
                 EventRecorder.Type.Warning,
-                ufe.getReason(),
-                ufe.getMessage(),
+                reason,
+                ExceptionUtils.getExceptionMessage(e),
                 EventRecorder.Component.JobManagerDeployment,
                 ctx.getKubernetesClient());
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 4838dea8..7454864f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -121,13 +121,7 @@ public class FlinkSessionJobController
             statusRecorder.patchAndCacheStatus(flinkSessionJob, 
ctx.getKubernetesClient());
             reconciler.reconcile(ctx);
         } catch (Exception e) {
-            eventRecorder.triggerEvent(
-                    flinkSessionJob,
-                    EventRecorder.Type.Warning,
-                    "SessionJobException",
-                    ExceptionUtils.getExceptionMessage(e),
-                    EventRecorder.Component.Job,
-                    josdkContext.getClient());
+            triggerErrorEvent(ctx, e);
             throw new ReconciliationException(e);
         }
         statusRecorder.patchAndCacheStatus(flinkSessionJob, 
ctx.getKubernetesClient());
@@ -167,6 +161,16 @@ public class FlinkSessionJobController
         return deleteControl;
     }
 
+    private void triggerErrorEvent(FlinkResourceContext<?> ctx, Exception e) {
+        eventRecorder.triggerEvent(
+                ctx.getResource(),
+                EventRecorder.Type.Warning,
+                EventRecorder.Reason.Error.name(),
+                ExceptionUtils.getExceptionMessage(e),
+                EventRecorder.Component.Job,
+                ctx.getKubernetesClient());
+    }
+
     @Override
     public ErrorStatusUpdateControl<FlinkSessionJob> updateErrorStatus(
             FlinkSessionJob sessionJob, Context<FlinkSessionJob> context, 
Exception e) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index c94c1231..444d0a0a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import org.slf4j.Logger;
@@ -182,7 +183,7 @@ public class JobStatusObserver<R extends 
AbstractFlinkResource<?, ?>> {
                 markSuspended(resource);
             }
 
-            setErrorIfPresent(ctx, clusterJobStatus);
+            recordJobErrorIfPresent(ctx, clusterJobStatus);
             eventRecorder.triggerEvent(
                     resource,
                     EventRecorder.Type.Normal,
@@ -203,7 +204,8 @@ public class JobStatusObserver<R extends 
AbstractFlinkResource<?, ?>> {
                 });
     }
 
-    private void setErrorIfPresent(FlinkResourceContext<R> ctx, 
JobStatusMessage clusterJobStatus) {
+    private void recordJobErrorIfPresent(
+            FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
         if (clusterJobStatus.getJobState() == JobStatus.FAILED) {
             try {
                 var result =
@@ -215,10 +217,14 @@ public class JobStatusObserver<R extends 
AbstractFlinkResource<?, ?>> {
                                 t -> {
                                     updateFlinkResourceException(
                                             t, ctx.getResource(), 
ctx.getOperatorConfig());
-                                    LOG.error(
-                                            "Job {} failed with error: {}",
-                                            clusterJobStatus.getJobId(),
-                                            t.getFullStringifiedStackTrace());
+
+                                    eventRecorder.triggerEvent(
+                                            ctx.getResource(),
+                                            EventRecorder.Type.Warning,
+                                            EventRecorder.Reason.Error,
+                                            EventRecorder.Component.Job,
+                                            
ExceptionUtils.getExceptionMessage(t),
+                                            ctx.getKubernetesClient());
                                 });
             } catch (Exception e) {
                 LOG.warn("Failed to request the job result", e);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index 4af3f190..1989de0e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -314,6 +314,7 @@ public class EventRecorder {
         Scaling,
         UnsupportedFlinkVersion,
         SnapshotError,
-        SnapshotAbandoned
+        SnapshotAbandoned,
+        Error
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java
index ad7bd6be..002c43b7 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java
@@ -75,7 +75,7 @@ public class ExceptionUtils {
      *     &rarr; cause3"
      */
     public static String getExceptionMessage(Throwable throwable) {
-        return getExceptionMessage(throwable, 0);
+        return getExceptionMessage(throwable, 1);
     }
 
     /**
@@ -93,11 +93,12 @@ public class ExceptionUtils {
         }
 
         if (throwable instanceof SerializedThrowable) {
+            var serialized = ((SerializedThrowable) throwable);
             var deserialized =
-                    ((SerializedThrowable) throwable)
-                            
.deserializeError(Thread.currentThread().getContextClassLoader());
+                    
serialized.deserializeError(Thread.currentThread().getContextClassLoader());
             if (deserialized == throwable) {
-                return "Unknown Error (SerializedThrowable)";
+                var msg = serialized.getMessage();
+                return msg != null ? msg : 
serialized.getOriginalErrorClassName();
             } else {
                 return getExceptionMessage(deserialized, level);
             }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 3dfc34a9..fa893e7c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -52,6 +52,7 @@ import 
org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
 import org.apache.flink.kubernetes.operator.service.SuspendMode;
 import 
org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -141,6 +142,7 @@ public class TestingFlinkService extends 
AbstractFlinkService {
     @Getter private final Map<String, Boolean> checkpointTriggers = new 
HashMap<>();
     private final Map<Long, String> checkpointStats = new HashMap<>();
     @Setter private boolean throwCheckpointingDisabledError = false;
+    @Setter private Throwable jobFailedErr;
 
     @Getter private int desiredReplicas = 0;
     @Getter private int cancelJobCallCount = 0;
@@ -301,9 +303,29 @@ public class TestingFlinkService extends 
AbstractFlinkService {
         if (!isPortReady) {
             throw new TimeoutException("JM port is unavailable");
         }
+
+        if (jobFailedErr != null) {
+            return Optional.of(new JobStatusMessage(jobID, "n", 
JobStatus.FAILED, 0));
+        }
+
         return super.getJobStatus(conf, jobID);
     }
 
+    @Override
+    public JobResult requestJobResult(Configuration conf, JobID jobID) throws 
Exception {
+        if (jobFailedErr != null) {
+            return new JobResult.Builder()
+                    .jobId(jobID)
+                    .serializedThrowable(new SerializedThrowable(jobFailedErr))
+                    .netRuntime(1)
+                    .accumulatorResults(new HashMap<>())
+                    .applicationStatus(ApplicationStatus.FAILED)
+                    .build();
+        }
+
+        return super.requestJobResult(conf, jobID);
+    }
+
     public List<Tuple3<String, JobStatusMessage, Configuration>> listJobs() {
         return jobs;
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 18273c63..8c3bd33f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -981,7 +981,7 @@ public class FlinkDeploymentControllerTest {
         var event = testController.flinkResourceEvents().remove();
         assertEquals("Submit", event.getReason());
         event = testController.flinkResourceEvents().remove();
-        assertEquals("ClusterDeploymentException", event.getReason());
+        assertEquals("Error", event.getReason());
         assertEquals("Deployment failure", event.getMessage());
     }
 
@@ -1006,7 +1006,7 @@ public class FlinkDeploymentControllerTest {
         var event = testController.flinkResourceEvents().remove();
         assertEquals("Submit", event.getReason());
         event = testController.flinkResourceEvents().remove();
-        assertEquals("ClusterDeploymentException", event.getReason());
+        assertEquals("Error", event.getReason());
         assertEquals(
                 "Deployment Failure -> IllegalStateException -> actual failure 
reason",
                 event.getMessage());
@@ -1112,7 +1112,7 @@ public class FlinkDeploymentControllerTest {
         var event = testController.flinkResourceEvents().remove();
         assertEquals("Submit", event.getReason());
         event = testController.flinkResourceEvents().remove();
-        assertEquals("ClusterDeploymentException", event.getReason());
+        assertEquals("Error", event.getReason());
         assertEquals(
                 "Deployment Failure -> IllegalStateException -> actual failure 
reason",
                 event.getMessage());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index b7ad6f13..f0489cd2 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -103,7 +103,7 @@ class FlinkSessionJobControllerTest {
 
         var event = testController.events().remove();
         Assertions.assertEquals(EventRecorder.Type.Warning.toString(), 
event.getType());
-        Assertions.assertEquals("SessionJobException", event.getReason());
+        Assertions.assertEquals("Error", event.getReason());
 
         testController.cleanup(sessionJob, context);
     }
@@ -635,7 +635,7 @@ class FlinkSessionJobControllerTest {
         var event = testController.events().remove();
         assertEquals("Submit", event.getReason());
         event = testController.events().remove();
-        assertEquals("SessionJobException", event.getReason());
+        assertEquals("Error", event.getReason());
         assertEquals(
                 "Deployment Failure -> IllegalStateException -> actual failure 
reason",
                 event.getMessage());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index 512e9f7c..aee49e3c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -29,10 +29,12 @@ import 
org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.util.SerializedThrowable;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import lombok.Getter;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -42,6 +44,7 @@ import java.util.ArrayList;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for the {@link JobStatusObserver}. */
 @EnableKubernetesMockClient(crud = true)
@@ -114,6 +117,36 @@ public class JobStatusObserverTest extends 
OperatorTestBase {
                         .getState());
     }
 
+    @Test
+    void testFailed() throws Exception {
+        var observer = new JobStatusObserver<>(eventRecorder);
+        var deployment = initDeployment();
+        var status = deployment.getStatus();
+        var jobStatus = status.getJobStatus();
+        jobStatus.setState(JobStatus.RUNNING);
+        FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = 
getResourceContext(deployment);
+        flinkService.submitApplicationCluster(
+                deployment.getSpec().getJob(), 
ctx.getDeployConfig(deployment.getSpec()), false);
+
+        // Mark failed
+        flinkService.setJobFailedErr(
+                new Exception("job err", new SerializedThrowable(new 
Exception("root"))));
+        observer.observe(ctx);
+
+        // First event should be job error reported
+        var jobErrorEvent = flinkResourceEventCollector.events.poll();
+        assertEquals(EventRecorder.Reason.Error.name(), 
jobErrorEvent.getReason());
+        assertEquals("job err -> root", jobErrorEvent.getMessage());
+
+        // Make sure job status still reported
+        assertEquals(
+                EventRecorder.Reason.JobStatusChanged.name(),
+                flinkResourceEventCollector.events.poll().getReason());
+
+        observer.observe(ctx);
+        assertTrue(flinkResourceEventCollector.events.isEmpty());
+    }
+
     private static Stream<Arguments> cancellingArgs() {
         var args = new ArrayList<Arguments>();
         for (var status : JobStatus.values()) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java
index 1bd8873d..144ec318 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java
@@ -47,17 +47,30 @@ public class ExceptionUtilsTest {
         var ex2 = new RuntimeException("Cause 2", new 
SerializedThrowable(ex3));
         var ex = new RuntimeException("Cause 1", ex2);
         assertThat(ExceptionUtils.getExceptionMessage(ex))
-                .isEqualTo("Cause 1 -> Cause 2 -> Cause 3 -> Cause 4");
+                .isEqualTo("Cause 1 -> Cause 2 -> Cause 3");
     }
 
     @Test
     void testSerializedThrowableError() {
-        var serializedException = new SerializedThrowable(new 
NonSerializableException());
-        assertThat(ExceptionUtils.getExceptionMessage(serializedException))
-                .isEqualTo("Unknown Error (SerializedThrowable)");
+        assertThat(
+                        ExceptionUtils.getExceptionMessage(
+                                new SerializedThrowable(new 
NonSerializableException("Message"))))
+                .isEqualTo(String.format("%s: Message", 
NonSerializableException.class.getName()));
+
+        assertThat(
+                        ExceptionUtils.getExceptionMessage(
+                                new SerializedThrowable(new 
NonSerializableException())))
+                .isEqualTo(NonSerializableException.class.getName());
     }
 
     private static class NonSerializableException extends Exception {
+
+        public NonSerializableException(String message) {
+            super(message);
+        }
+
+        public NonSerializableException() {}
+
         private void writeObject(java.io.ObjectOutputStream stream) throws 
IOException {
             throw new IOException();
         }

Reply via email to