This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 84b08d00 [FLINK-29251] Send CREATED status and Cancel event via 
FlinkResourceListener
84b08d00 is described below

commit 84b08d00f67552dbd4bd97d49ab6d39f1cd87f7d
Author: Matyas Orhidi <matyas_orh...@apple.com>
AuthorDate: Mon Sep 12 11:03:50 2022 +0200

    [FLINK-29251] Send CREATED status and Cancel event via FlinkResourceListener
---
 .../controller/FlinkDeploymentController.java      |  9 +++++-
 .../controller/FlinkSessionJobController.java      |  9 +++++-
 .../operator/observer/JobStatusObserver.java       |  2 +-
 .../reconciler/deployment/SessionReconciler.java   |  2 +-
 .../kubernetes/operator/utils/EventRecorder.java   |  3 +-
 .../kubernetes/operator/utils/StatusRecorder.java  |  4 +++
 .../controller/FlinkDeploymentControllerTest.java  | 16 +++++------
 .../listener/FlinkResourceListenerTest.java        | 32 ++++++++++++----------
 8 files changed, 50 insertions(+), 27 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index a0b0a161..0026ac54 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -84,7 +84,14 @@ public class FlinkDeploymentController
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
-        LOG.info("Deleting FlinkDeployment");
+        String msg = "Cleaning up " + FlinkDeployment.class.getSimpleName();
+        LOG.info(msg);
+        eventRecorder.triggerEvent(
+                flinkApp,
+                EventRecorder.Type.Normal,
+                EventRecorder.Reason.Cleanup,
+                EventRecorder.Component.Operator,
+                msg);
         statusRecorder.updateStatusFromCache(flinkApp);
         try {
             observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index d037571e..61dc9512 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -107,7 +107,14 @@ public class FlinkSessionJobController
 
     @Override
     public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
-        LOG.info("Deleting FlinkSessionJob");
+        String msg = "Cleaning up " + FlinkSessionJob.class.getSimpleName();
+        LOG.info(msg);
+        eventRecorder.triggerEvent(
+                sessionJob,
+                EventRecorder.Type.Normal,
+                EventRecorder.Reason.Cleanup,
+                EventRecorder.Component.Operator,
+                msg);
         statusRecorder.removeCachedStatus(sessionJob);
         return reconciler.cleanup(sessionJob, context);
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 2c6b4bc6..51cf72ad 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -147,7 +147,7 @@ public abstract class JobStatusObserver<CTX> {
             eventRecorder.triggerEvent(
                     resource,
                     EventRecorder.Type.Normal,
-                    EventRecorder.Reason.StatusChanged,
+                    EventRecorder.Reason.JobStatusChanged,
                     EventRecorder.Component.Job,
                     message);
         }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 1131a5e9..43a62e38 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -187,7 +187,7 @@ public class SessionReconciler
             if (eventRecorder.triggerEvent(
                     deployment,
                     EventRecorder.Type.Warning,
-                    EventRecorder.Reason.Cleanup,
+                    EventRecorder.Reason.CleanupFailed,
                     EventRecorder.Component.Operator,
                     error)) {
                 LOG.warn(error);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index a3baeb9a..c29879f8 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -121,9 +121,10 @@ public class EventRecorder {
         SpecChanged,
         Rollback,
         Submit,
-        StatusChanged,
+        JobStatusChanged,
         SavepointError,
         Cleanup,
+        CleanupFailed,
         Missing,
         ValidationError
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index bd196dd5..c21fe0c8 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.listener.AuditUtils;
 import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import 
org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -140,6 +141,9 @@ public class StatusRecorder<
         } else {
             // Initialize cache with current status copy
             statusCache.put(key, 
objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
+            if 
(ResourceLifecycleState.CREATED.equals(resource.getStatus().getLifecycleState()))
 {
+                statusUpdateListener.accept(resource, resource.getStatus());
+            }
         }
         metricManager.onUpdate(resource);
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 298da8fd..e0fafc47 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -117,7 +117,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(2, testController.getInternalStatusUpdateCount());
+        assertEquals(3, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -140,7 +140,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(3, testController.getInternalStatusUpdateCount());
+        assertEquals(4, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -154,7 +154,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(4, testController.getInternalStatusUpdateCount());
+        assertEquals(5, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -169,7 +169,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(4, testController.getInternalStatusUpdateCount());
+        assertEquals(5, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -195,7 +195,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(5, testController.getInternalStatusUpdateCount());
+        assertEquals(6, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
 
         reconciliationStatus = 
appCluster.getStatus().getReconciliationStatus();
@@ -482,7 +482,7 @@ public class FlinkDeploymentControllerTest {
 
         assertEquals(1, testController.events().size());
         assertEquals(
-                EventRecorder.Reason.StatusChanged,
+                EventRecorder.Reason.JobStatusChanged,
                 
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
 
         // Upgrade job
@@ -530,7 +530,7 @@ public class FlinkDeploymentControllerTest {
         testController.reconcile(appCluster, context);
         assertEquals(1, testController.events().size());
         assertEquals(
-                EventRecorder.Reason.StatusChanged,
+                EventRecorder.Reason.JobStatusChanged,
                 
EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
 
         // Suspend job
@@ -592,7 +592,7 @@ public class FlinkDeploymentControllerTest {
                         .collect(Collectors.toList());
         assertEquals(1, statusEvents.size());
         assertEquals(
-                EventRecorder.Reason.StatusChanged,
+                EventRecorder.Reason.JobStatusChanged,
                 EventRecorder.Reason.valueOf(statusEvents.get(0).getReason()));
 
         assertEquals(
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
index 9d18aa6f..ebfc865f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -58,36 +58,40 @@ public class FlinkResourceListenerTest {
         var eventRecorder = EventRecorder.create(kubernetesClient, listeners);
 
         var deployment = TestUtils.buildApplicationCluster();
-        statusRecorder.updateStatusFromCache(deployment);
 
-        statusRecorder.patchAndCacheStatus(deployment);
         assertTrue(listener1.updates.isEmpty());
         assertTrue(listener2.updates.isEmpty());
         assertTrue(listener1.events.isEmpty());
         assertTrue(listener2.events.isEmpty());
 
-        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
-
-        statusRecorder.patchAndCacheStatus(deployment);
+        statusRecorder.updateStatusFromCache(deployment);
+        assertEquals(1, listener1.updates.size());
+        statusRecorder.updateStatusFromCache(deployment);
         assertEquals(1, listener1.updates.size());
         assertEquals(deployment, listener1.updates.get(0).getFlinkResource());
 
-        assertEquals(1, listener2.updates.size());
-        assertEquals(deployment, listener2.updates.get(0).getFlinkResource());
-        assertEquals(
-                listener1.updates.get(0).getTimestamp(), 
listener2.updates.get(0).getTimestamp());
+        
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
+        statusRecorder.patchAndCacheStatus(deployment);
+        assertEquals(2, listener1.updates.size());
+        assertEquals(deployment, listener1.updates.get(1).getFlinkResource());
 
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
         statusRecorder.patchAndCacheStatus(deployment);
+        assertEquals(3, listener1.updates.size());
+        assertEquals(deployment, listener1.updates.get(2).getFlinkResource());
 
-        assertEquals(2, listener1.updates.size());
-        assertEquals(deployment, listener1.updates.get(0).getFlinkResource());
-        assertEquals(2, listener2.updates.size());
-        assertEquals(deployment, listener2.updates.get(0).getFlinkResource());
+        for (int i = 0; i < listener1.updates.size(); i++) {
+            assertEquals(
+                    listener1.updates.get(i).getTimestamp(),
+                    listener2.updates.get(i).getTimestamp());
+            assertEquals(
+                    listener1.updates.get(i).getFlinkResource(),
+                    listener2.updates.get(i).getFlinkResource());
+        }
 
         var updateContext =
                 (FlinkResourceListener.StatusUpdateContext<FlinkDeployment, 
FlinkDeploymentStatus>)
-                        listener1.updates.get(1);
+                        listener1.updates.get(2);
         assertEquals(
                 JobManagerDeploymentStatus.ERROR,
                 
updateContext.getPreviousStatus().getJobManagerDeploymentStatus());

Reply via email to