This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c8e3f5a0c3 [FLINK-34546] Emit span with failure labels on failure.
7c8e3f5a0c3 is described below

commit 7c8e3f5a0c39f9a82c5549925035344c5d27cb98
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Thu Feb 29 10:26:02 2024 +0100

    [FLINK-34546] Emit span with failure labels on failure.
---
 .../apache/flink/configuration/TraceOptions.java   | 14 +++++
 .../failover/ExecutionFailureHandler.java          | 65 +++++++++++++++++++++-
 .../flink/runtime/scheduler/DefaultScheduler.java  |  4 +-
 .../failover/ExecutionFailureHandlerTest.java      | 39 ++++++++++++-
 4 files changed, 117 insertions(+), 5 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
index 1aee746e210..a7e84192dea 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java
@@ -56,6 +56,20 @@ public class TraceOptions {
                                     + " any of the names in the list will be started. Otherwise, all reporters that could be found in"
                                     + " the configuration will be started.");
 
+    /**
+     * Temporary option to report events as span. This option will be removed once we support
+     * reporting events.
+     */
+    @Deprecated
+    public static final ConfigOption<Boolean> REPORT_EVENTS_AS_SPANS =
+            key("traces.report-events-as-spans")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to report events as spans. This is a temporary parameter that "
+                                    + "is in place until we have support for reporting events. "
+                                    + "In the meantime, this can be activated to report them as spans instead.");
+
     /**
      * Returns a view over the given configuration via which options can be set/retrieved for the
      * given reporter.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
index aed330de522..3d36a9e6bff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
@@ -17,8 +17,11 @@
 
 package org.apache.flink.runtime.executiongraph.failover;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TraceOptions;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.failure.FailureEnricher.Context;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -28,6 +31,8 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.throwable.ThrowableClassifier;
 import org.apache.flink.runtime.throwable.ThrowableType;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.SpanBuilder;
 import org.apache.flink.util.IterableUtils;
 
 import javax.annotation.Nullable;
@@ -47,6 +52,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ExecutionFailureHandler {
 
+    public static final String FAILURE_LABEL_ATTRIBUTE_PREFIX = "failureLabel.";
+
     private final SchedulingTopology schedulingTopology;
 
     /** Strategy to judge which tasks should be restarted. */
@@ -62,6 +69,9 @@ public class ExecutionFailureHandler {
     private final Context globalFailureCtx;
     private final Collection<FailureEnricher> failureEnrichers;
     private final ComponentMainThreadExecutor mainThreadExecutor;
+    private final MetricGroup metricGroup;
+
+    private final boolean reportEventsAsSpans;
 
     /**
      * Creates the handler to deal with task failures.
@@ -76,13 +86,15 @@ public class ExecutionFailureHandler {
      * @param globalFailureCtx Global failure Context used by FailureEnrichers
      */
     public ExecutionFailureHandler(
+            final Configuration jobMasterConfig,
             final SchedulingTopology schedulingTopology,
             final FailoverStrategy failoverStrategy,
             final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
             final ComponentMainThreadExecutor mainThreadExecutor,
             final Collection<FailureEnricher> failureEnrichers,
             final Context taskFailureCtx,
-            final Context globalFailureCtx) {
+            final Context globalFailureCtx,
+            final MetricGroup metricGroup) {
 
         this.schedulingTopology = checkNotNull(schedulingTopology);
         this.failoverStrategy = checkNotNull(failoverStrategy);
@@ -91,6 +103,8 @@ public class ExecutionFailureHandler {
         this.failureEnrichers = checkNotNull(failureEnrichers);
         this.taskFailureCtx = taskFailureCtx;
         this.globalFailureCtx = globalFailureCtx;
+        this.metricGroup = metricGroup;
+        this.reportEventsAsSpans = jobMasterConfig.get(TraceOptions.REPORT_EVENTS_AS_SPANS);
     }
 
     /**
@@ -104,7 +118,7 @@ public class ExecutionFailureHandler {
      */
     public FailureHandlingResult getFailureHandlingResult(
             Execution failedExecution, Throwable cause, long timestamp) {
-        return handleFailure(
+        return handleFailureAndReport(
                 failedExecution,
                 cause,
                 timestamp,
@@ -123,7 +137,7 @@ public class ExecutionFailureHandler {
      */
     public FailureHandlingResult getGlobalFailureHandlingResult(
             final Throwable cause, long timestamp) {
-        return handleFailure(
+        return handleFailureAndReport(
                 null,
                 cause,
                 timestamp,
@@ -141,6 +155,51 @@ public class ExecutionFailureHandler {
         return FailureEnricherUtils.labelFailure(cause, ctx, mainThreadExecutor, failureEnrichers);
     }
 
+    private FailureHandlingResult handleFailureAndReport(
+            @Nullable final Execution failedExecution,
+            final Throwable cause,
+            long timestamp,
+            final Set<ExecutionVertexID> verticesToRestart,
+            final boolean globalFailure) {
+
+        FailureHandlingResult failureHandlingResult =
+                handleFailure(failedExecution, cause, timestamp, verticesToRestart, globalFailure);
+
+        if (reportEventsAsSpans) {
+            // TODO: replace with reporting as event once events are supported.
+            // Add reporting as callback for when the failure labeling is completed.
+            failureHandlingResult
+                    .getFailureLabels()
+                    .thenAcceptAsync(
+                            labels -> reportFailureHandling(failureHandlingResult, labels),
+                            mainThreadExecutor);
+        }
+
+        return failureHandlingResult;
+    }
+
+    private void reportFailureHandling(
+            FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) {
+
+        // Add base attributes
+        SpanBuilder spanBuilder =
+                Span.builder(ExecutionFailureHandler.class, "JobFailure")
+                        .setStartTsMillis(failureHandlingResult.getTimestamp())
+                        .setEndTsMillis(failureHandlingResult.getTimestamp())
+                        .setAttribute(
+                                "canRestart", String.valueOf(failureHandlingResult.canRestart()))
+                        .setAttribute(
+                                "isGlobalFailure",
+                                String.valueOf(failureHandlingResult.isGlobalFailure()));
+
+        // Add all failure labels
+        for (Map.Entry<String, String> entry : failureLabels.entrySet()) {
+            spanBuilder.setAttribute(
+                    FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue());
+        }
+        metricGroup.addSpan(spanBuilder);
+    }
+
     private FailureHandlingResult handleFailure(
             @Nullable final Execution failedExecution,
             final Throwable cause,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index e3b046301b4..2fa43818ce2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -181,13 +181,15 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
         this.executionFailureHandler =
                 new ExecutionFailureHandler(
+                        jobMasterConfiguration,
                         getSchedulingTopology(),
                         failoverStrategy,
                         restartBackoffTimeStrategy,
                         mainThreadExecutor,
                         failureEnrichers,
                         taskFailureCtx,
-                        globalFailureCtx);
+                        globalFailureCtx,
+                        jobManagerJobMetricGroup);
 
         this.schedulingStrategy =
                 schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
index a9d3beada8a..3726742ae46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.executiongraph.failover;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TraceOptions;
 import org.apache.flink.core.failure.TestingFailureEnricher;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -29,6 +32,8 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.SpanBuilder;
 import org.apache.flink.util.IterableUtils;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -36,7 +41,10 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,6 +74,8 @@ class ExecutionFailureHandlerTest {
 
     private TestingFailureEnricher testingFailureEnricher;
 
+    private List<Span> spanCollector;
+
     @BeforeEach
     void setUp() {
         TestingSchedulingTopology topology = new TestingSchedulingTopology();
@@ -77,15 +87,25 @@ class ExecutionFailureHandlerTest {
         isNewAttempt = new AtomicBoolean(true);
         backoffTimeStrategy =
                 new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS, isNewAttempt::get);
+        spanCollector = new CopyOnWriteArrayList<>();
+        Configuration configuration = new Configuration();
+        configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE);
         executionFailureHandler =
                 new ExecutionFailureHandler(
+                        configuration,
                         schedulingTopology,
                         failoverStrategy,
                         backoffTimeStrategy,
                         ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                         Collections.singleton(testingFailureEnricher),
                         null,
-                        null);
+                        null,
+                        new UnregisteredMetricsGroup() {
+                            @Override
+                            public void addSpan(SpanBuilder spanBuilder) {
+                                spanCollector.add(spanBuilder.build());
+                            }
+                        });
     }
 
     /** Tests the case that task restarting is accepted. */
@@ -115,6 +135,7 @@ class ExecutionFailureHandlerTest {
         assertThat(result.getFailureLabels().get())
                 .isEqualTo(testingFailureEnricher.getFailureLabels());
         assertThat(executionFailureHandler.getNumberOfRestarts()).isOne();
+        checkMetrics(spanCollector, false, true);
     }
 
     /** Tests the case that task restarting is suppressed. */
@@ -151,6 +172,7 @@ class ExecutionFailureHandlerTest {
                 .isInstanceOf(IllegalStateException.class);
 
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
+        checkMetrics(spanCollector, false, false);
     }
 
     /** Tests the case that the failure is non-recoverable type. */
@@ -192,6 +214,7 @@ class ExecutionFailureHandlerTest {
                 .isInstanceOf(IllegalStateException.class);
 
         assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
+        checkMetrics(spanCollector, false, false);
     }
 
     @Test
@@ -217,6 +240,7 @@ class ExecutionFailureHandlerTest {
         isNewAttempt.set(false);
         testHandlingConcurrentException(execution, error);
         testHandlingConcurrentException(execution, error);
+        checkMetrics(spanCollector, false, true);
     }
 
     private void testHandlingRootException(Execution execution, Throwable error) {
@@ -283,6 +307,7 @@ class ExecutionFailureHandlerTest {
         assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
         assertThat(result.getFailureLabels().get())
                 .isEqualTo(testingFailureEnricher.getFailureLabels());
+        checkMetrics(spanCollector, true, true);
     }
 
     // ------------------------------------------------------------------------
@@ -310,4 +335,16 @@ class ExecutionFailureHandlerTest {
             return tasksToRestart;
         }
     }
+
+    private void checkMetrics(List<Span> results, boolean global, boolean canRestart) {
+        assertThat(results).isNotEmpty();
+        for (Span span : results) {
+            assertThat(span.getScope()).isEqualTo(ExecutionFailureHandler.class.getCanonicalName());
+            assertThat(span.getName()).isEqualTo("JobFailure");
+            Map<String, Object> attributes = span.getAttributes();
+            assertThat(attributes).containsEntry("failureLabel.failKey", "failValue");
+            assertThat(attributes).containsEntry("canRestart", String.valueOf(canRestart));
+            assertThat(attributes).containsEntry("isGlobalFailure", String.valueOf(global));
+        }
+    }
 }

Reply via email to