This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7c8e3f5a0c3 [FLINK-34546] Emit span with failure labels on failure.
7c8e3f5a0c3 is described below
commit 7c8e3f5a0c39f9a82c5549925035344c5d27cb98
Author: Stefan Richter
AuthorDate: Thu Feb 29 10:26:02 2024 +0100
[FLINK-34546] Emit span with failure labels on failure.
---
.../apache/flink/configuration/TraceOptions.java | 14 +
.../failover/ExecutionFailureHandler.java | 65 +-
.../flink/runtime/scheduler/DefaultScheduler.java | 4 +-
.../failover/ExecutionFailureHandlerTest.java | 39 -
4 files changed, 117 insertions(+), 5 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
index 1aee746e210..a7e84192dea 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
@@ -56,6 +56,20 @@ public class TraceOptions {
+ " any of the names in the list will be started. Otherwise, all reporters that could be found in"
+ " the configuration will be started.");
+/**
+ * Temporary option to report events as span. This option will be removed once we support
+ * reporting events.
+ */
+@Deprecated
+public static final ConfigOption<Boolean> REPORT_EVENTS_AS_SPANS =
+key("traces.report-events-as-spans")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"Whether to report events as spans. This is a temporary parameter that "
++ "is in place until we have support for reporting events. "
++ "In the meantime, this can be activated to report them as spans instead.");
+
/**
* Returns a view over the given configuration via which options can be set/retrieved for the
* given reporter.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
index aed330de522..3d36a9e6bff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
@@ -17,8 +17,11 @@
package org.apache.flink.runtime.executiongraph.failover;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.FailureEnricher.Context;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -28,6 +31,8 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.throwable.ThrowableClassifier;
import org.apache.flink.runtime.throwable.ThrowableType;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.IterableUtils;
import javax.annotation.Nullable;
@@ -47,6 +52,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class ExecutionFailureHandler {
+public static final String FAILURE_LABEL_ATTRIBUTE_PREFIX = "failureLabel.";
+
private final SchedulingTopology schedulingTopology;
/** Strategy to judge which tasks should be restarted. */
@@ -62,6 +69,9 @@ public class ExecutionFailureHandler {
private final Context globalFailureCtx;
private final Collection<FailureEnricher> failureEnrichers;
private final ComponentMainThreadExecutor mainThreadExecutor;
+private final MetricGroup metricGroup;
+
+private final boolean reportEventsAsSpans;
/**
* Creates the handler to deal with task failures.
@@ -76,13 +86,15 @@ public class ExecutionFailureHandler {
* @param globalFailureCtx Global failure Context used by FailureEnrichers
*/
public ExecutionFailureHandler(
+final Configuration jobMasterConfig,
final SchedulingTopology schedulingTopology,
final FailoverStrategy failoverStrategy,
final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
final ComponentMainThreadExecutor