(flink) branch master updated: [FLINK-34546] Emit span with failure labels on failure in AdaptiveScheduler. (#24498)

2024-03-15 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 10bff3dbad1 [FLINK-34546] Emit span with failure labels on failure in AdaptiveScheduler. (#24498)
10bff3dbad1 is described below

commit 10bff3dbad103b60915be817a3408820ed09b6cf
Author: Stefan Richter 
AuthorDate: Fri Mar 15 09:36:43 2024 +0100

[FLINK-34546] Emit span with failure labels on failure in AdaptiveScheduler. (#24498)
---
 .../failover/ExecutionFailureHandler.java  | 32 ++---
 .../scheduler/adaptive/AdaptiveScheduler.java  | 22 +-
 .../runtime/scheduler/adaptive/Canceling.java  |  4 +-
 .../runtime/scheduler/adaptive/Executing.java  | 10 ++-
 .../flink/runtime/scheduler/adaptive/Failing.java  |  4 +-
 .../adaptive/JobFailureMetricReporter.java | 84 ++
 .../runtime/scheduler/adaptive/Restarting.java |  4 +-
 .../adaptive/StateWithExecutionGraph.java  |  7 +-
 .../scheduler/adaptive/StopWithSavepoint.java  |  9 ++-
 .../failover/ExecutionFailureHandlerTest.java  |  4 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 78 +++-
 .../runtime/scheduler/adaptive/ExecutingTest.java  |  3 +-
 .../adaptive/StateWithExecutionGraphTest.java  |  2 +-
 .../scheduler/adaptive/StopWithSavepointTest.java  | 13 +++-
 14 files changed, 228 insertions(+), 48 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
index 3d36a9e6bff..94130bc2f5f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
@@ -26,13 +26,12 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.failure.FailureEnricherUtils;
+import org.apache.flink.runtime.scheduler.adaptive.JobFailureMetricReporter;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.throwable.ThrowableClassifier;
 import org.apache.flink.runtime.throwable.ThrowableType;
-import org.apache.flink.traces.Span;
-import org.apache.flink.traces.SpanBuilder;
 import org.apache.flink.util.IterableUtils;
 
 import javax.annotation.Nullable;
@@ -70,8 +69,8 @@ public class ExecutionFailureHandler {
 private final Collection failureEnrichers;
 private final ComponentMainThreadExecutor mainThreadExecutor;
 private final MetricGroup metricGroup;
-
 private final boolean reportEventsAsSpans;
+private final JobFailureMetricReporter jobFailureMetricReporter;
 
 /**
  * Creates the handler to deal with task failures.
@@ -105,6 +104,7 @@ public class ExecutionFailureHandler {
 this.globalFailureCtx = globalFailureCtx;
 this.metricGroup = metricGroup;
 this.reportEventsAsSpans = 
jobMasterConfig.get(TraceOptions.REPORT_EVENTS_AS_SPANS);
+this.jobFailureMetricReporter = new 
JobFailureMetricReporter(metricGroup);
 }
 
 /**
@@ -171,35 +171,15 @@ public class ExecutionFailureHandler {
 failureHandlingResult
 .getFailureLabels()
 .thenAcceptAsync(
-labels -> 
reportFailureHandling(failureHandlingResult, labels),
+labels ->
+jobFailureMetricReporter.reportJobFailure(
+failureHandlingResult, labels),
 mainThreadExecutor);
 }
 
 return failureHandlingResult;
 }
 
-private void reportFailureHandling(
-FailureHandlingResult failureHandlingResult, Map 
failureLabels) {
-
-// Add base attributes
-SpanBuilder spanBuilder =
-Span.builder(ExecutionFailureHandler.class, "JobFailure")
-.setStartTsMillis(failureHandlingResult.getTimestamp())
-.setEndTsMillis(failureHandlingResult.getTimestamp())
-.setAttribute(
-"canRestart", 
String.valueOf(failureHandlingResult.canRestart()))
-.setAttribute(
-"isGlobalFailure",
-
String.valueOf(failureHandlingResult.isGlobalFailure()));
-
-// Add all failure labels

(flink) branch master updated: [FLINK-34546] Emit span with failure labels on failure.

2024-03-01 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 7c8e3f5a0c3 [FLINK-34546] Emit span with failure labels on failure.
7c8e3f5a0c3 is described below

commit 7c8e3f5a0c39f9a82c5549925035344c5d27cb98
Author: Stefan Richter 
AuthorDate: Thu Feb 29 10:26:02 2024 +0100

[FLINK-34546] Emit span with failure labels on failure.
---
 .../apache/flink/configuration/TraceOptions.java   | 14 +
 .../failover/ExecutionFailureHandler.java  | 65 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  |  4 +-
 .../failover/ExecutionFailureHandlerTest.java  | 39 -
 4 files changed, 117 insertions(+), 5 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
index 1aee746e210..a7e84192dea 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
@@ -56,6 +56,20 @@ public class TraceOptions {
 + " any of the names in the list will be 
started. Otherwise, all reporters that could be found in"
 + " the configuration will be started.");
 
+/**
+ * Temporary option to report events as span. This option will be removed 
once we support
+ * reporting events.
+ */
+@Deprecated
+public static final ConfigOption REPORT_EVENTS_AS_SPANS =
+key("traces.report-events-as-spans")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Whether to report events as spans. This is a 
temporary parameter that "
++ "is in place until we have support for 
reporting events. "
++ "In the meantime, this can be activated 
to report them as spans instead.");
+
 /**
  * Returns a view over the given configuration via which options can be 
set/retrieved for the
  * given reporter.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
index aed330de522..3d36a9e6bff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
@@ -17,8 +17,11 @@
 
 package org.apache.flink.runtime.executiongraph.failover;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TraceOptions;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.failure.FailureEnricher.Context;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -28,6 +31,8 @@ import 
org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.throwable.ThrowableClassifier;
 import org.apache.flink.runtime.throwable.ThrowableType;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.SpanBuilder;
 import org.apache.flink.util.IterableUtils;
 
 import javax.annotation.Nullable;
@@ -47,6 +52,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ExecutionFailureHandler {
 
+public static final String FAILURE_LABEL_ATTRIBUTE_PREFIX = 
"failureLabel.";
+
 private final SchedulingTopology schedulingTopology;
 
 /** Strategy to judge which tasks should be restarted. */
@@ -62,6 +69,9 @@ public class ExecutionFailureHandler {
 private final Context globalFailureCtx;
 private final Collection failureEnrichers;
 private final ComponentMainThreadExecutor mainThreadExecutor;
+private final MetricGroup metricGroup;
+
+private final boolean reportEventsAsSpans;
 
 /**
  * Creates the handler to deal with task failures.
@@ -76,13 +86,15 @@ public class ExecutionFailureHandler {
  * @param globalFailureCtx Global failure Context used by FailureEnrichers
  */
 public ExecutionFailureHandler(
+final Configuration jobMasterConfig,
 final SchedulingTopology schedulingTopology,
 final FailoverStrategy failoverStrategy,
 final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
 final ComponentMainThreadExecutor