This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new b7fc9b71 [FLINK-29194] Add logging for ResourceListener b7fc9b71 is described below commit b7fc9b71458ca72364431635007945816a4e107f Author: Matyas Orhidi <53612764+morh...@users.noreply.github.com> AuthorDate: Wed Sep 7 21:01:56 2022 +0200 [FLINK-29194] Add logging for ResourceListener --- .../kubernetes/operator/listener/AuditUtils.java | 60 ++++++++++++++++++++++ .../operator/listener/ListenerUtils.java | 3 +- .../metrics/lifecycle/ResourceLifecycleState.java | 24 +++++---- .../kubernetes/operator/utils/EventRecorder.java | 58 +++++++++++---------- .../kubernetes/operator/utils/StatusRecorder.java | 2 + .../src/main/resources/log4j2.properties | 3 ++ 6 files changed, 111 insertions(+), 39 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java new file mode 100644 index 00000000..0692dec9 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.listener; + +import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.crd.status.CommonStatus; + +import io.fabric8.kubernetes.api.model.Event; +import lombok.NonNull; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Responsible for logging resource event/status updates. */ +public class AuditUtils { + private static final Logger LOG = LoggerFactory.getLogger(AuditUtils.class); + + public static <R extends AbstractFlinkResource<?, S>, S extends CommonStatus<?>> + void logContext(FlinkResourceListener.StatusUpdateContext<R, S> ctx) { + LOG.info(format(ctx.getNewStatus())); + } + + public static <R extends AbstractFlinkResource<?, ?>> void logContext( + FlinkResourceListener.ResourceEventContext<R> ctx) { + LOG.info(format(ctx.getEvent())); + } + + private static String format(@NonNull CommonStatus<?> status) { + return String.format( + ">>> Status | %-7s | %-15s | %s ", + StringUtils.isEmpty(status.getError()) ? "Info" : "Error", + status.getLifecycleState(), + StringUtils.isEmpty(status.getError()) + ? status.getLifecycleState().getDescription() + : status.getError()); + } + + private static String format(@NonNull Event event) { + return String.format( + ">>> Event | %-7s | %-15s | %s", + event.getType().equals("Normal") ? 
"Info" : event.getType(), + event.getReason().toUpperCase(), + event.getMessage()); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java index f5ce71d4..843cdf26 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java @@ -26,7 +26,6 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; -import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +44,7 @@ import java.util.regex.Pattern; /** Flink resource listener utilities. 
*/ public class ListenerUtils { - private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class); + private static final Logger LOG = LoggerFactory.getLogger(ListenerUtils.class); private static final String PREFIX = "kubernetes.operator.plugins.listeners."; private static final String SUFFIX = ".class"; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java index 19c0da36..324f5346 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java @@ -17,25 +17,31 @@ package org.apache.flink.kubernetes.operator.metrics.lifecycle; +import lombok.Getter; + import java.util.Collections; import java.util.EnumSet; import java.util.Set; /** Enum encapsulating the lifecycle state of a Flink resource. 
*/ public enum ResourceLifecycleState { - CREATED(false), - SUSPENDED(true), - UPGRADING(false), - DEPLOYED(false), - STABLE(true), - ROLLING_BACK(false), - ROLLED_BACK(true), - FAILED(true); + CREATED(false, "The resource was created in Kubernetes but not yet handled by the operator"), + SUSPENDED(true, "The resource (job) has been suspended"), + UPGRADING(false, "The resource is being upgraded"), + DEPLOYED( + false, + "The resource is deployed/submitted to Kubernetes, but it’s not yet considered to be stable and might be rolled back in the future"), + STABLE(true, "The resource deployment is considered to be stable and won’t be rolled back"), + ROLLING_BACK(false, "The resource is being rolled back to the last stable spec"), + ROLLED_BACK(true, "The resource is deployed with the last stable spec"), + FAILED(true, "The job terminally failed"); private final boolean terminal; + @Getter private final String description; - ResourceLifecycleState(boolean terminal) { + ResourceLifecycleState(boolean terminal, String description) { this.terminal = terminal; + this.description = description; } public Set<ResourceLifecycleState> getClearedStatesAfterTransition( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java index 9bc55762..a3baeb9a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java @@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.utils; import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.listener.AuditUtils; import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener; import 
io.fabric8.kubernetes.api.model.Event; @@ -69,34 +70,35 @@ public class EventRecorder { KubernetesClient client, Collection<FlinkResourceListener> listeners) { BiConsumer<AbstractFlinkResource<?, ?>, Event> biConsumer = - (resource, event) -> - listeners.forEach( - listener -> { - var ctx = - new FlinkResourceListener.ResourceEventContext() { - @Override - public Event getEvent() { - return event; - } - - @Override - public AbstractFlinkResource<?, ?> - getFlinkResource() { - return resource; - } - - @Override - public KubernetesClient getKubernetesClient() { - return client; - } - }; - - if (resource instanceof FlinkDeployment) { - listener.onDeploymentEvent(ctx); - } else { - listener.onSessionJobEvent(ctx); - } - }); + (resource, event) -> { + var ctx = + new FlinkResourceListener.ResourceEventContext() { + @Override + public Event getEvent() { + return event; + } + + @Override + public AbstractFlinkResource<?, ?> getFlinkResource() { + return resource; + } + + @Override + public KubernetesClient getKubernetesClient() { + return client; + } + }; + listeners.forEach( + listener -> { + if (resource instanceof FlinkDeployment) { + listener.onDeploymentEvent(ctx); + } else { + listener.onSessionJobEvent(ctx); + } + }); + AuditUtils.logContext(ctx); + }; + return new EventRecorder(client, biConsumer); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java index c042e0de..bd196dd5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java @@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.status.CommonStatus; import 
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus; +import org.apache.flink.kubernetes.operator.listener.AuditUtils; import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener; import org.apache.flink.kubernetes.operator.metrics.MetricManager; @@ -196,6 +197,7 @@ public class StatusRecorder< listener.onSessionJobStatusUpdate(ctx); } }); + AuditUtils.logContext(ctx); }; return new StatusRecorder<>(kubernetesClient, metricManager, consumer); diff --git a/flink-kubernetes-operator/src/main/resources/log4j2.properties b/flink-kubernetes-operator/src/main/resources/log4j2.properties index f7f24f44..15b1eef7 100644 --- a/flink-kubernetes-operator/src/main/resources/log4j2.properties +++ b/flink-kubernetes-operator/src/main/resources/log4j2.properties @@ -27,3 +27,6 @@ appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %hi logger.conf.name = org.apache.flink.configuration.GlobalConfiguration logger.conf.level = WARN + +logger.event.name = org.apache.flink.kubernetes.operator.listener.AuditUtils +logger.event.level = INFO