This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 84b08d00 [FLINK-29251] Send CREATED status and Cancel event via FlinkResourceListener 84b08d00 is described below commit 84b08d00f67552dbd4bd97d49ab6d39f1cd87f7d Author: Matyas Orhidi <matyas_orh...@apple.com> AuthorDate: Mon Sep 12 11:03:50 2022 +0200 [FLINK-29251] Send CREATED status and Cancel event via FlinkResourceListener --- .../controller/FlinkDeploymentController.java | 9 +++++- .../controller/FlinkSessionJobController.java | 9 +++++- .../operator/observer/JobStatusObserver.java | 2 +- .../reconciler/deployment/SessionReconciler.java | 2 +- .../kubernetes/operator/utils/EventRecorder.java | 3 +- .../kubernetes/operator/utils/StatusRecorder.java | 4 +++ .../controller/FlinkDeploymentControllerTest.java | 16 +++++------ .../listener/FlinkResourceListenerTest.java | 32 ++++++++++++---------- 8 files changed, 50 insertions(+), 27 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index a0b0a161..0026ac54 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -84,7 +84,14 @@ public class FlinkDeploymentController @Override public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) { - LOG.info("Deleting FlinkDeployment"); + String msg = "Cleaning up " + FlinkDeployment.class.getSimpleName(); + LOG.info(msg); + eventRecorder.triggerEvent( + flinkApp, + EventRecorder.Type.Normal, + EventRecorder.Reason.Cleanup, + EventRecorder.Component.Operator, + msg); statusRecorder.updateStatusFromCache(flinkApp); try { observerFactory.getOrCreate(flinkApp).observe(flinkApp, context); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index d037571e..61dc9512 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -107,7 +107,14 @@ public class FlinkSessionJobController @Override public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) { - LOG.info("Deleting FlinkSessionJob"); + String msg = "Cleaning up " + FlinkSessionJob.class.getSimpleName(); + LOG.info(msg); + eventRecorder.triggerEvent( + sessionJob, + EventRecorder.Type.Normal, + EventRecorder.Reason.Cleanup, + EventRecorder.Component.Operator, + msg); statusRecorder.removeCachedStatus(sessionJob); return reconciler.cleanup(sessionJob, context); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java index 2c6b4bc6..51cf72ad 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java @@ -147,7 +147,7 @@ public abstract class JobStatusObserver<CTX> { eventRecorder.triggerEvent( resource, EventRecorder.Type.Normal, - EventRecorder.Reason.StatusChanged, + EventRecorder.Reason.JobStatusChanged, EventRecorder.Component.Job, message); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index 1131a5e9..43a62e38 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -187,7 +187,7 @@ public class SessionReconciler if (eventRecorder.triggerEvent( deployment, EventRecorder.Type.Warning, - EventRecorder.Reason.Cleanup, + EventRecorder.Reason.CleanupFailed, EventRecorder.Component.Operator, error)) { LOG.warn(error); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java index a3baeb9a..c29879f8 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java @@ -121,9 +121,10 @@ public class EventRecorder { SpecChanged, Rollback, Submit, - StatusChanged, + JobStatusChanged, SavepointError, Cleanup, + CleanupFailed, Missing, ValidationError } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java index bd196dd5..c21fe0c8 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java @@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus; import org.apache.flink.kubernetes.operator.listener.AuditUtils; import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener; import org.apache.flink.kubernetes.operator.metrics.MetricManager; +import org.apache.flink.kubernetes.operator.metrics.lifecycle.ResourceLifecycleState; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -140,6 +141,9 @@ public class StatusRecorder< } else { // Initialize cache with current status copy statusCache.put(key, objectMapper.convertValue(resource.getStatus(), ObjectNode.class)); + if (ResourceLifecycleState.CREATED.equals(resource.getStatus().getLifecycleState())) { + statusUpdateListener.accept(resource, resource.getStatus()); + } } metricManager.onUpdate(resource); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 298da8fd..e0fafc47 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -117,7 +117,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RECONCILING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals(2, testController.getInternalStatusUpdateCount()); + assertEquals(3, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -140,7 +140,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RECONCILING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals(3, testController.getInternalStatusUpdateCount()); + assertEquals(4, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -154,7 +154,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals(4, testController.getInternalStatusUpdateCount()); + assertEquals(5, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -169,7 +169,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals(4, testController.getInternalStatusUpdateCount()); + assertEquals(5, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -195,7 +195,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals(5, testController.getInternalStatusUpdateCount()); + assertEquals(6, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); reconciliationStatus = appCluster.getStatus().getReconciliationStatus(); @@ -482,7 +482,7 @@ public class FlinkDeploymentControllerTest { assertEquals(1, testController.events().size()); assertEquals( - EventRecorder.Reason.StatusChanged, + EventRecorder.Reason.JobStatusChanged, EventRecorder.Reason.valueOf(testController.events().poll().getReason())); // Upgrade job @@ -530,7 +530,7 @@ public class FlinkDeploymentControllerTest { testController.reconcile(appCluster, context); assertEquals(1, testController.events().size()); assertEquals( - EventRecorder.Reason.StatusChanged, + EventRecorder.Reason.JobStatusChanged, EventRecorder.Reason.valueOf(testController.events().poll().getReason())); // Suspend job @@ -592,7 +592,7 @@ public class FlinkDeploymentControllerTest { .collect(Collectors.toList()); assertEquals(1, statusEvents.size()); assertEquals( - EventRecorder.Reason.StatusChanged, + EventRecorder.Reason.JobStatusChanged, EventRecorder.Reason.valueOf(statusEvents.get(0).getReason())); assertEquals( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java index 9d18aa6f..ebfc865f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java @@ -58,36 +58,40 @@ public class FlinkResourceListenerTest { var eventRecorder = EventRecorder.create(kubernetesClient, listeners); var deployment = TestUtils.buildApplicationCluster(); - statusRecorder.updateStatusFromCache(deployment); - statusRecorder.patchAndCacheStatus(deployment); assertTrue(listener1.updates.isEmpty()); assertTrue(listener2.updates.isEmpty()); assertTrue(listener1.events.isEmpty()); assertTrue(listener2.events.isEmpty()); - deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR); - - statusRecorder.patchAndCacheStatus(deployment); + statusRecorder.updateStatusFromCache(deployment); + assertEquals(1, listener1.updates.size()); + statusRecorder.updateStatusFromCache(deployment); assertEquals(1, listener1.updates.size()); assertEquals(deployment, listener1.updates.get(0).getFlinkResource()); - assertEquals(1, listener2.updates.size()); - assertEquals(deployment, listener2.updates.get(0).getFlinkResource()); - assertEquals( - listener1.updates.get(0).getTimestamp(), listener2.updates.get(0).getTimestamp()); + deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR); + statusRecorder.patchAndCacheStatus(deployment); + assertEquals(2, listener1.updates.size()); + assertEquals(deployment, listener1.updates.get(1).getFlinkResource()); deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); statusRecorder.patchAndCacheStatus(deployment); + assertEquals(3, listener1.updates.size()); + assertEquals(deployment, listener1.updates.get(2).getFlinkResource()); - assertEquals(2, listener1.updates.size()); - assertEquals(deployment, listener1.updates.get(0).getFlinkResource()); - assertEquals(2, listener2.updates.size()); - assertEquals(deployment, listener2.updates.get(0).getFlinkResource()); + for (int i = 0; i < listener1.updates.size(); i++) { + assertEquals( + listener1.updates.get(i).getTimestamp(), + listener2.updates.get(i).getTimestamp()); + assertEquals( + listener1.updates.get(i).getFlinkResource(), + listener2.updates.get(i).getFlinkResource()); + } var updateContext = (FlinkResourceListener.StatusUpdateContext<FlinkDeployment, FlinkDeploymentStatus>) - listener1.updates.get(1); + listener1.updates.get(2); assertEquals( JobManagerDeploymentStatus.ERROR, updateContext.getPreviousStatus().getJobManagerDeploymentStatus());