This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new b7fc9b71 [FLINK-29194] Add logging for ResourceListener
b7fc9b71 is described below

commit b7fc9b71458ca72364431635007945816a4e107f
Author: Matyas Orhidi <53612764+morh...@users.noreply.github.com>
AuthorDate: Wed Sep 7 21:01:56 2022 +0200

    [FLINK-29194] Add logging for ResourceListener
---
 .../kubernetes/operator/listener/AuditUtils.java   | 60 ++++++++++++++++++++++
 .../operator/listener/ListenerUtils.java           |  3 +-
 .../metrics/lifecycle/ResourceLifecycleState.java  | 24 +++++----
 .../kubernetes/operator/utils/EventRecorder.java   | 58 +++++++++++----------
 .../kubernetes/operator/utils/StatusRecorder.java  |  2 +
 .../src/main/resources/log4j2.properties           |  3 ++
 6 files changed, 111 insertions(+), 39 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
new file mode 100644
index 00000000..0692dec9
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+
+import io.fabric8.kubernetes.api.model.Event;
+import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Responsible for logging resource event/status updates. */
+public class AuditUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AuditUtils.class);
+
+    public static <R extends AbstractFlinkResource<?, S>, S extends 
CommonStatus<?>>
+            void logContext(FlinkResourceListener.StatusUpdateContext<R, S> 
ctx) {
+        LOG.info(format(ctx.getNewStatus()));
+    }
+
+    public static <R extends AbstractFlinkResource<?, ?>> void logContext(
+            FlinkResourceListener.ResourceEventContext<R> ctx) {
+        LOG.info(format(ctx.getEvent()));
+    }
+
+    private static String format(@NonNull CommonStatus<?> status) {
+        return String.format(
+                ">>> Status | %-7s | %-15s | %s ",
+                StringUtils.isEmpty(status.getError()) ? "Info" : "Error",
+                status.getLifecycleState(),
+                StringUtils.isEmpty(status.getError())
+                        ? status.getLifecycleState().getDescription()
+                        : status.getError());
+    }
+
+    private static String format(@NonNull Event event) {
+        return String.format(
+                ">>> Event  | %-7s | %-15s | %s",
+                event.getType().equals("Normal") ? "Info" : event.getType(),
+                event.getReason().toUpperCase(),
+                event.getMessage());
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
index f5ce71d4..843cdf26 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +44,7 @@ import java.util.regex.Pattern;
 /** Flink resource listener utilities. */
 public class ListenerUtils {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkUtils.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(ListenerUtils.class);
 
     private static final String PREFIX = 
"kubernetes.operator.plugins.listeners.";
     private static final String SUFFIX = ".class";
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
index 19c0da36..324f5346 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
@@ -17,25 +17,31 @@
 
 package org.apache.flink.kubernetes.operator.metrics.lifecycle;
 
+import lombok.Getter;
+
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Set;
 
 /** Enum encapsulating the lifecycle state of a Flink resource. */
 public enum ResourceLifecycleState {
-    CREATED(false),
-    SUSPENDED(true),
-    UPGRADING(false),
-    DEPLOYED(false),
-    STABLE(true),
-    ROLLING_BACK(false),
-    ROLLED_BACK(true),
-    FAILED(true);
+    CREATED(false, "The resource was created in Kubernetes but not yet handled 
by the operator"),
+    SUSPENDED(true, "The resource (job) has been suspended"),
+    UPGRADING(false, "The resource is being upgraded"),
+    DEPLOYED(
+            false,
+            "The resource is deployed/submitted to Kubernetes, but it’s not 
yet considered to be stable and might be rolled back in the future"),
+    STABLE(true, "The resource deployment is considered to be stable and won’t 
be rolled back"),
+    ROLLING_BACK(false, "The resource is being rolled back to the last stable 
spec"),
+    ROLLED_BACK(true, "The resource is deployed with the last stable spec"),
+    FAILED(true, "The job terminally failed");
 
     private final boolean terminal;
+    @Getter private final String description;
 
-    ResourceLifecycleState(boolean terminal) {
+    ResourceLifecycleState(boolean terminal, String description) {
         this.terminal = terminal;
+        this.description = description;
     }
 
     public Set<ResourceLifecycleState> getClearedStatesAfterTransition(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index 9bc55762..a3baeb9a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.AuditUtils;
 import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
 
 import io.fabric8.kubernetes.api.model.Event;
@@ -69,34 +70,35 @@ public class EventRecorder {
             KubernetesClient client, Collection<FlinkResourceListener> 
listeners) {
 
         BiConsumer<AbstractFlinkResource<?, ?>, Event> biConsumer =
-                (resource, event) ->
-                        listeners.forEach(
-                                listener -> {
-                                    var ctx =
-                                            new 
FlinkResourceListener.ResourceEventContext() {
-                                                @Override
-                                                public Event getEvent() {
-                                                    return event;
-                                                }
-
-                                                @Override
-                                                public 
AbstractFlinkResource<?, ?>
-                                                        getFlinkResource() {
-                                                    return resource;
-                                                }
-
-                                                @Override
-                                                public KubernetesClient 
getKubernetesClient() {
-                                                    return client;
-                                                }
-                                            };
-
-                                    if (resource instanceof FlinkDeployment) {
-                                        listener.onDeploymentEvent(ctx);
-                                    } else {
-                                        listener.onSessionJobEvent(ctx);
-                                    }
-                                });
+                (resource, event) -> {
+                    var ctx =
+                            new FlinkResourceListener.ResourceEventContext() {
+                                @Override
+                                public Event getEvent() {
+                                    return event;
+                                }
+
+                                @Override
+                                public AbstractFlinkResource<?, ?> 
getFlinkResource() {
+                                    return resource;
+                                }
+
+                                @Override
+                                public KubernetesClient getKubernetesClient() {
+                                    return client;
+                                }
+                            };
+                    listeners.forEach(
+                            listener -> {
+                                if (resource instanceof FlinkDeployment) {
+                                    listener.onDeploymentEvent(ctx);
+                                } else {
+                                    listener.onSessionJobEvent(ctx);
+                                }
+                            });
+                    AuditUtils.logContext(ctx);
+                };
+
         return new EventRecorder(client, biConsumer);
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index c042e0de..bd196dd5 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.listener.AuditUtils;
 import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 
@@ -196,6 +197,7 @@ public class StatusRecorder<
                                     listener.onSessionJobStatusUpdate(ctx);
                                 }
                             });
+                    AuditUtils.logContext(ctx);
                 };
 
         return new StatusRecorder<>(kubernetesClient, metricManager, consumer);
diff --git a/flink-kubernetes-operator/src/main/resources/log4j2.properties 
b/flink-kubernetes-operator/src/main/resources/log4j2.properties
index f7f24f44..15b1eef7 100644
--- a/flink-kubernetes-operator/src/main/resources/log4j2.properties
+++ b/flink-kubernetes-operator/src/main/resources/log4j2.properties
@@ -27,3 +27,6 @@ appender.console.layout.pattern = %style{%d}{yellow} 
%style{%-30c{1.}}{cyan} %hi
 
 logger.conf.name = org.apache.flink.configuration.GlobalConfiguration
 logger.conf.level = WARN
+
+logger.event.name = org.apache.flink.kubernetes.operator.listener.AuditUtils
+logger.event.level = INFO

Reply via email to