This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 10bff3dbad1 [FLINK-34546] Emit span with failure labels on failure in AdaptiveScheduler. (#24498)
10bff3dbad1 is described below

commit 10bff3dbad103b60915be817a3408820ed09b6cf
Author: Stefan Richter <srich...@apache.org>
AuthorDate: Fri Mar 15 09:36:43 2024 +0100

    [FLINK-34546] Emit span with failure labels on failure in AdaptiveScheduler. (#24498)
---
 .../failover/ExecutionFailureHandler.java          | 32 ++-------
 .../scheduler/adaptive/AdaptiveScheduler.java      | 22 +++++-
 .../runtime/scheduler/adaptive/Canceling.java      |  4 +-
 .../runtime/scheduler/adaptive/Executing.java      | 10 ++-
 .../flink/runtime/scheduler/adaptive/Failing.java  |  4 +-
 .../adaptive/JobFailureMetricReporter.java         | 84 ++++++++++++++++++++++
 .../runtime/scheduler/adaptive/Restarting.java     |  4 +-
 .../adaptive/StateWithExecutionGraph.java          |  7 +-
 .../scheduler/adaptive/StopWithSavepoint.java      |  9 ++-
 .../failover/ExecutionFailureHandlerTest.java      |  4 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 78 +++++++++++++++++++-
 .../runtime/scheduler/adaptive/ExecutingTest.java  |  3 +-
 .../adaptive/StateWithExecutionGraphTest.java      |  2 +-
 .../scheduler/adaptive/StopWithSavepointTest.java  | 13 +++-
 14 files changed, 228 insertions(+), 48 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
index 3d36a9e6bff..94130bc2f5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java
@@ -26,13 +26,12 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.failure.FailureEnricherUtils;
+import org.apache.flink.runtime.scheduler.adaptive.JobFailureMetricReporter;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.throwable.ThrowableClassifier;
 import org.apache.flink.runtime.throwable.ThrowableType;
-import org.apache.flink.traces.Span;
-import org.apache.flink.traces.SpanBuilder;
 import org.apache.flink.util.IterableUtils;
 
 import javax.annotation.Nullable;
@@ -70,8 +69,8 @@ public class ExecutionFailureHandler {
     private final Collection<FailureEnricher> failureEnrichers;
     private final ComponentMainThreadExecutor mainThreadExecutor;
     private final MetricGroup metricGroup;
-
     private final boolean reportEventsAsSpans;
+    private final JobFailureMetricReporter jobFailureMetricReporter;
 
     /**
      * Creates the handler to deal with task failures.
@@ -105,6 +104,7 @@ public class ExecutionFailureHandler {
         this.globalFailureCtx = globalFailureCtx;
         this.metricGroup = metricGroup;
         this.reportEventsAsSpans = jobMasterConfig.get(TraceOptions.REPORT_EVENTS_AS_SPANS);
+        this.jobFailureMetricReporter = new JobFailureMetricReporter(metricGroup);
     }
 
     /**
@@ -171,35 +171,15 @@ public class ExecutionFailureHandler {
             failureHandlingResult
                     .getFailureLabels()
                     .thenAcceptAsync(
-                            labels -> reportFailureHandling(failureHandlingResult, labels),
+                            labels ->
+                                    jobFailureMetricReporter.reportJobFailure(
+                                            failureHandlingResult, labels),
                             mainThreadExecutor);
         }
 
         return failureHandlingResult;
     }
 
-    private void reportFailureHandling(
-            FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) {
-
-        // Add base attributes
-        SpanBuilder spanBuilder =
-                Span.builder(ExecutionFailureHandler.class, "JobFailure")
-                        .setStartTsMillis(failureHandlingResult.getTimestamp())
-                        .setEndTsMillis(failureHandlingResult.getTimestamp())
-                        .setAttribute(
-                                "canRestart", String.valueOf(failureHandlingResult.canRestart()))
-                        .setAttribute(
-                                "isGlobalFailure",
-                                String.valueOf(failureHandlingResult.isGlobalFailure()));
-
-        // Add all failure labels
-        for (Map.Entry<String, String> entry : failureLabels.entrySet()) {
-            spanBuilder.setAttribute(
-                    FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue());
-        }
-        metricGroup.addSpan(spanBuilder);
-    }
-
     private FailureHandlingResult handleFailure(
             @Nullable final Execution failedExecution,
             final Throwable cause,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 9fb9e07de0f..56ff9bed45b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.configuration.TraceOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
@@ -332,6 +333,9 @@ public class AdaptiveScheduler
 
     private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
 
+    private final JobFailureMetricReporter jobFailureMetricReporter;
+    private final boolean reportEventsAsSpans;
+
     public AdaptiveScheduler(
             Settings settings,
             JobGraph jobGraph,
@@ -422,6 +426,9 @@ public class AdaptiveScheduler
         this.exceptionHistory =
                 new BoundedFIFOQueue<>(configuration.get(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
         this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+
+        this.jobFailureMetricReporter = new JobFailureMetricReporter(jobManagerJobMetricGroup);
+        this.reportEventsAsSpans = configuration.get(TraceOptions.REPORT_EVENTS_AS_SPANS);
     }
 
     private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException {
@@ -1316,7 +1323,20 @@ public class AdaptiveScheduler
     }
 
     @Override
-    public FailureResult howToHandleFailure(Throwable failure) {
+    public FailureResult howToHandleFailure(
+            Throwable failure, CompletableFuture<Map<String, String>> failureLabels) {
+        FailureResult failureResult = howToHandleFailure(failure);
+        if (reportEventsAsSpans) {
+            // TODO: replace with reporting as event once events are supported.
+            // Add reporting as callback for when the failure labeling is completed.
+            failureLabels.thenAcceptAsync(
+                    (labels) -> jobFailureMetricReporter.reportJobFailure(failureResult, labels),
+                    componentMainThreadExecutor);
+        }
+        return failureResult;
+    }
+
+    private FailureResult howToHandleFailure(Throwable failure) {
         if (ExecutionFailureHandler.isUnrecoverableError(failure)) {
             return FailureResult.canNotRestart(
                     new JobException("The failure is not recoverable", failure));
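
The hunk above boils down to a small CompletableFuture pattern: the restart decision is made synchronously, while the "JobFailure" span is only reported once the failure enrichers have completed the labels future, hopping back onto the scheduler's main-thread executor. Below is a minimal, self-contained sketch of that pattern; the class and method names (FailureLabelCallbackSketch, decideRestart) are illustrative stand-ins, not Flink APIs.

    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /** Illustrative sketch of the "decide now, report when labels arrive" pattern. */
    public class FailureLabelCallbackSketch {

        public static void main(String[] args) {
            // Stands in for the scheduler's componentMainThreadExecutor.
            ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();

            // Stands in for the future produced by the failure enrichers.
            CompletableFuture<Map<String, String>> failureLabels =
                    CompletableFuture.completedFuture(
                            Collections.singletonMap("failKey", "failValue"));

            // The decision is taken immediately; reporting is deferred until labels resolve.
            boolean canRestart = decideRestart(new Exception("test"));
            failureLabels
                    .thenAcceptAsync(
                            labels ->
                                    System.out.println(
                                            "JobFailure span: canRestart="
                                                    + canRestart
                                                    + ", labels="
                                                    + labels),
                            mainThreadExecutor)
                    .join();

            mainThreadExecutor.shutdown();
        }

        private static boolean decideRestart(Throwable failure) {
            // Placeholder for the restart strategy consulted by howToHandleFailure.
            return !(failure instanceof Error);
        }
    }
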
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java
index de8824b5335..95adb0cf5d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry
 import org.slf4j.Logger;
 
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 /** State which describes a job which is currently being canceled. */
 class Canceling extends StateWithExecutionGraph {
@@ -66,7 +68,7 @@ class Canceling extends StateWithExecutionGraph {
     }
 
     @Override
-    void onFailure(Throwable failure) {
+    void onFailure(Throwable failure, CompletableFuture<Map<String, String>> failureLabels) {
         // Execution graph is already cancelling, so there is nothing more we can do.
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index 42f38695362..96d835cdaa2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -43,6 +43,7 @@ import javax.annotation.Nullable;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
@@ -106,8 +107,9 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
     }
 
     @Override
-    void onFailure(Throwable cause) {
-        FailureResultUtil.restartOrFail(context.howToHandleFailure(cause), context, this);
+    void onFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
+        FailureResultUtil.restartOrFail(
+                context.howToHandleFailure(cause, failureLabels), context, this);
     }
 
     @Override
@@ -255,9 +257,11 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
          * Asks how to handle the failure.
          *
          * @param failure failure describing the failure cause
+         * @param failureLabels future of labels from error classification.
          * @return {@link FailureResult} which describes how to handle the failure
          */
-        FailureResult howToHandleFailure(Throwable failure);
+        FailureResult howToHandleFailure(
+                Throwable failure, CompletableFuture<Map<String, String>> failureLabels);
 
         /**
          * Asks if we should rescale the currently executing job.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java
index ccd6bae9367..4744fb8feb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java
@@ -29,6 +29,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 /** State which describes a failing job which is currently being canceled. */
 class Failing extends StateWithExecutionGraph {
@@ -71,7 +73,7 @@ class Failing extends StateWithExecutionGraph {
     }
 
     @Override
-    void onFailure(Throwable failure) {
+    void onFailure(Throwable failure, CompletableFuture<Map<String, String>> failureLabels) {
         // We've already failed the execution graph, so there is nothing else we can do.
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobFailureMetricReporter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobFailureMetricReporter.java
new file mode 100644
index 00000000000..fdbd497ecab
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobFailureMetricReporter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.SpanBuilder;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+/** Helper class to simplify job failure reporting through a metric group. */
+public class JobFailureMetricReporter {
+
+    public static final String FAILURE_LABEL_ATTRIBUTE_PREFIX = "failureLabel.";
+
+    private final MetricGroup metricGroup;
+
+    public JobFailureMetricReporter(MetricGroup metricGroup) {
+        this.metricGroup = Preconditions.checkNotNull(metricGroup);
+    }
+
+    public void reportJobFailure(
+            FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) {
+        reportJobFailure(
+                failureHandlingResult.getTimestamp(),
+                failureHandlingResult.canRestart(),
+                failureHandlingResult.isGlobalFailure(),
+                failureLabels);
+    }
+
+    public void reportJobFailure(
+            FailureResult failureHandlingResult, Map<String, String> failureLabels) {
+        reportJobFailure(
+                System.currentTimeMillis(),
+                failureHandlingResult.canRestart(),
+                null,
+                failureLabels);
+    }
+
+    private void reportJobFailure(
+            long timestamp,
+            Boolean canRestart,
+            Boolean isGlobal,
+            Map<String, String> failureLabels) {
+        // Add base attributes
+        SpanBuilder spanBuilder =
+                Span.builder(JobFailureMetricReporter.class, "JobFailure")
+                        .setStartTsMillis(timestamp)
+                        .setEndTsMillis(timestamp);
+
+        if (canRestart != null) {
+            spanBuilder.setAttribute("canRestart", String.valueOf(canRestart));
+        }
+
+        if (isGlobal != null) {
+            spanBuilder.setAttribute("isGlobalFailure", String.valueOf(isGlobal));
+        }
+
+        // Add all failure labels
+        for (Map.Entry<String, String> entry : failureLabels.entrySet()) {
+            spanBuilder.setAttribute(
+                    FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue());
+        }
+        metricGroup.addSpan(spanBuilder);
+    }
+}
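
To make the new reporter's output concrete, here is a hedged, stand-alone sketch (not part of the commit) that captures the emitted span with the same anonymous UnregisteredJobManagerJobMetricGroup override used by the test changes further below. It assumes the flink-runtime test utilities (UnregisteredMetricGroups) are on the classpath; only the sketch class and its main method are invented, the calls mirror APIs visible in this diff.

    package org.apache.flink.runtime.scheduler.adaptive;

    import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
    import org.apache.flink.traces.Span;
    import org.apache.flink.traces.SpanBuilder;

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    /** Illustrative sketch; lives in the adaptive package so it can see FailureResult. */
    public class JobFailureMetricReporterSketch {

        public static void main(String[] args) {
            // Collect emitted spans instead of shipping them to a trace reporter.
            final List<Span> spans = new ArrayList<>();
            final UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup metricGroup =
                    new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() {
                        @Override
                        public void addSpan(SpanBuilder spanBuilder) {
                            spans.add(spanBuilder.build());
                        }
                    };

            final JobFailureMetricReporter reporter = new JobFailureMetricReporter(metricGroup);

            // A restartable failure with one classification label becomes a "JobFailure"
            // span carrying "canRestart" and "failureLabel.failKey" attributes.
            reporter.reportJobFailure(
                    FailureResult.canRestart(new Exception("test"), Duration.ZERO),
                    Collections.singletonMap("failKey", "failValue"));

            System.out.println(spans.get(0).getName()); // prints "JobFailure"
        }
    }
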
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
index 86e28fe8741..f647967edb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
@@ -31,6 +31,8 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 
 /** State which describes a job which is currently being restarted. */
@@ -94,7 +96,7 @@ class Restarting extends StateWithExecutionGraph {
     }
 
     @Override
-    void onFailure(Throwable failure) {
+    void onFailure(Throwable failure, CompletableFuture<Map<String, String>> failureLabels) {
         // We've already cancelled the execution graph, so there is nothing else we can do.
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index 4a831669a36..6208de5000a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -377,7 +377,7 @@ abstract class StateWithExecutionGraph implements State {
     }
 
     /** Transition to different state when failure occurs. Stays in the same state by default. */
-    abstract void onFailure(Throwable cause);
+    abstract void onFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels);
 
     /**
      * Transition to different state when the execution graph reaches a globally terminal state.
@@ -390,7 +390,7 @@ abstract class StateWithExecutionGraph implements State {
     public void handleGlobalFailure(
             Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
         failureCollection.add(ExceptionHistoryEntry.createGlobal(cause, failureLabels));
-        onFailure(cause);
+        onFailure(cause, failureLabels);
     }
 
     /**
@@ -422,7 +422,8 @@ abstract class StateWithExecutionGraph implements State {
                         ExceptionHistoryEntry.create(execution, taskName, failureLabels));
                 onFailure(
                         ErrorInfo.handleMissingThrowable(
-                                taskExecutionStateTransition.getError(userCodeClassLoader)));
+                                taskExecutionStateTransition.getError(userCodeClassLoader)),
+                        failureLabels);
             }
         }
         return successfulUpdate;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
index b81b485960a..35d3f76787f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
@@ -38,6 +38,7 @@ import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 
@@ -209,7 +210,7 @@ class StopWithSavepoint extends StateWithExecutionGraph {
     }
 
     @Override
-    void onFailure(Throwable cause) {
+    void onFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
         if (hasPendingStateTransition) {
             // the error handling remains the same independent of how many tasks have failed
             // we don't want to initiate the same state transition multiple times, so we exit early
@@ -231,7 +232,7 @@ class StopWithSavepoint extends StateWithExecutionGraph {
                                                     savepoint, getJobId(), cause);
                             operationFailureCause = ex;
                             FailureResultUtil.restartOrFail(
-                                    context.howToHandleFailure(ex), context, this);
+                                    context.howToHandleFailure(ex, failureLabels), context, this);
                             return null;
                         }));
     }
@@ -279,9 +280,11 @@ class StopWithSavepoint extends StateWithExecutionGraph {
          * Asks how to handle the failure.
          *
          * @param failure failure describing the failure cause
+         * @param failureLabels future of labels from error classification.
          * @return {@link FailureResult} which describes how to handle the failure
          */
-        FailureResult howToHandleFailure(Throwable failure);
+        FailureResult howToHandleFailure(
+                Throwable failure, CompletableFuture<Map<String, String>> failureLabels);
 
         /**
          * Runs the given action after the specified delay if the state is the expected state at
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
index 3726742ae46..7602944b5f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.adaptive.JobFailureMetricReporter;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
@@ -339,7 +340,8 @@ class ExecutionFailureHandlerTest {
     private void checkMetrics(List<Span> results, boolean global, boolean canRestart) {
         assertThat(results).isNotEmpty();
         for (Span span : results) {
-            assertThat(span.getScope()).isEqualTo(ExecutionFailureHandler.class.getCanonicalName());
+            assertThat(span.getScope())
+                    .isEqualTo(JobFailureMetricReporter.class.getCanonicalName());
             assertThat(span.getName()).isEqualTo("JobFailure");
             Map<String, Object> attributes = span.getAttributes();
             assertThat(attributes).containsEntry("failureLabel.failKey", "failValue");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index e2260d1c19c..273495b3975 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.configuration.TraceOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.failure.TestingFailureEnricher;
@@ -83,6 +84,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
@@ -106,6 +108,8 @@ import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.traces.Span;
+import org.apache.flink.traces.SpanBuilder;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.Preconditions;
@@ -1372,19 +1376,41 @@ public class AdaptiveSchedulerTest {
 
     @Test
     void testHowToHandleFailureRejectedByStrategy() throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE);
+        final List<Span> spanCollector = new ArrayList<>(1);
+        final UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup testMetricGroup =
+                createTestMetricGroup(spanCollector);
+
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
                                 mainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE)
+                        .setJobMasterConfiguration(configuration)
+                        .setJobManagerJobMetricGroup(testMetricGroup)
                         .build();
 
-        assertThat(scheduler.howToHandleFailure(new Exception("test")).canRestart()).isFalse();
+        assertThat(
+                        scheduler
+                                .howToHandleFailure(
+                                        new Exception("test"), createFailureLabelsFuture())
+                                .canRestart())
+                .isFalse();
+
+        assertThat(spanCollector).isEmpty();
+        mainThreadExecutor.trigger();
+        checkMetrics(spanCollector, false);
     }
 
     @Test
     void testHowToHandleFailureAllowedByStrategy() throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE);
+        final List<Span> spanCollector = new ArrayList<>(1);
+        final UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup testMetricGroup =
+                createTestMetricGroup(spanCollector);
         final TestRestartBackoffTimeStrategy restartBackoffTimeStrategy =
                 new TestRestartBackoffTimeStrategy(true, 1234);
 
@@ -1394,30 +1420,50 @@ public class AdaptiveSchedulerTest {
                                 mainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
                         .setRestartBackoffTimeStrategy(restartBackoffTimeStrategy)
+                        .setJobMasterConfiguration(configuration)
+                        .setJobManagerJobMetricGroup(testMetricGroup)
                         .build();
 
-        final FailureResult failureResult = scheduler.howToHandleFailure(new Exception("test"));
+        final FailureResult failureResult =
+                scheduler.howToHandleFailure(new Exception("test"), createFailureLabelsFuture());
 
         assertThat(failureResult.canRestart()).isTrue();
         assertThat(failureResult.getBackoffTime().toMillis())
                 .isEqualTo(restartBackoffTimeStrategy.getBackoffTime());
+
+        assertThat(spanCollector).isEmpty();
+        mainThreadExecutor.trigger();
+        checkMetrics(spanCollector, true);
     }
 
     @Test
     void testHowToHandleFailureUnrecoverableFailure() throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE);
+        final List<Span> spanCollector = new ArrayList<>(1);
+        final UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup testMetricGroup =
+                createTestMetricGroup(spanCollector);
+
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(
                                 createJobGraph(),
                                 mainThreadExecutor,
                                 EXECUTOR_RESOURCE.getExecutor())
+                        .setJobMasterConfiguration(configuration)
+                        .setJobManagerJobMetricGroup(testMetricGroup)
                         .build();
 
         assertThat(
                         scheduler
                                 .howToHandleFailure(
-                                        new SuppressRestartsException(new Exception("test")))
+                                        new SuppressRestartsException(new Exception("test")),
+                                        createFailureLabelsFuture())
                                 .canRestart())
                 .isFalse();
+
+        assertThat(spanCollector).isEmpty();
+        mainThreadExecutor.trigger();
+        checkMetrics(spanCollector, false);
     }
 
     @Test
@@ -2495,4 +2541,30 @@ public class AdaptiveSchedulerTest {
             return scheduler.requestJob().getExceptionHistory();
         }
     }
+
+    private static CompletableFuture<Map<String, String>> createFailureLabelsFuture() {
+        return CompletableFuture.completedFuture(Collections.singletonMap("failKey", "failValue"));
+    }
+
+    private static UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup
+            createTestMetricGroup(List<Span> output) {
+        return new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() {
+            @Override
+            public void addSpan(SpanBuilder spanBuilder) {
+                output.add(spanBuilder.build());
+            }
+        };
+    }
+
+    private static void checkMetrics(List<Span> results, boolean canRestart) {
+        assertThat(results).isNotEmpty();
+        for (Span span : results) {
+            assertThat(span.getScope())
+                    .isEqualTo(JobFailureMetricReporter.class.getCanonicalName());
+            assertThat(span.getName()).isEqualTo("JobFailure");
+            Map<String, Object> attributes = span.getAttributes();
+            assertThat(attributes).containsEntry("failureLabel.failKey", "failValue");
+            assertThat(attributes).containsEntry("canRestart", String.valueOf(canRestart));
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index b892569ef1a..a9ee04f12ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -703,7 +703,8 @@ public class ExecutingTest extends TestLogger {
         }
 
         @Override
-        public FailureResult howToHandleFailure(Throwable failure) {
+        public FailureResult howToHandleFailure(
+                Throwable failure, CompletableFuture<Map<String, String>> failureLabels) {
             return howToHandleFailure.apply(failure);
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java
index cd1fd57036e..0a3d598ca36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java
@@ -218,7 +218,7 @@ public class StateWithExecutionGraphTest extends TestLogger {
         }
 
         @Override
-        void onFailure(Throwable cause) {}
+        void onFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {}
 
         @Override
         void onGloballyTerminalState(JobStatus globallyTerminalState) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
index f20cdd7e656..3bec0538fd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
@@ -41,7 +41,9 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -381,7 +383,9 @@ class StopWithSavepointTest {
 
             ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
 
-            sws.onFailure(new Exception("task failure"));
+            sws.onFailure(
+                    new Exception("task failure"),
+                    CompletableFuture.completedFuture(Collections.emptyMap()));
             // this is a sanity check that we haven't scheduled a state transition
             ctx.triggerExecutors();
 
@@ -404,7 +408,9 @@ class StopWithSavepointTest {
 
             ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
 
-            sws.onFailure(new Exception("task failure"));
+            sws.onFailure(
+                    new Exception("task failure"),
+                    CompletableFuture.completedFuture(Collections.emptyMap()));
             // this is a sanity check that we haven't scheduled a state transition
             ctx.triggerExecutors();
 
@@ -538,7 +544,8 @@ class StopWithSavepointTest {
         }
 
         @Override
-        public FailureResult howToHandleFailure(Throwable failure) {
+        public FailureResult howToHandleFailure(
+                Throwable failure, CompletableFuture<Map<String, String>> failureLabels) {
             return howToHandleFailure.apply(failure);
         }
 
