This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 7c8e3f5a0c3 [FLINK-34546] Emit span with failure labels on failure. 7c8e3f5a0c3 is described below commit 7c8e3f5a0c39f9a82c5549925035344c5d27cb98 Author: Stefan Richter <srich...@confluent.io> AuthorDate: Thu Feb 29 10:26:02 2024 +0100 [FLINK-34546] Emit span with failure labels on failure. --- .../apache/flink/configuration/TraceOptions.java | 14 +++++ .../failover/ExecutionFailureHandler.java | 65 +++++++++++++++++++++- .../flink/runtime/scheduler/DefaultScheduler.java | 4 +- .../failover/ExecutionFailureHandlerTest.java | 39 ++++++++++++- 4 files changed, 117 insertions(+), 5 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java index 1aee746e210..a7e84192dea 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TraceOptions.java @@ -56,6 +56,20 @@ public class TraceOptions { + " any of the names in the list will be started. Otherwise, all reporters that could be found in" + " the configuration will be started."); + /** + * Temporary option to report events as spans. This option will be removed once we support + * reporting events. + */ + @Deprecated + public static final ConfigOption<Boolean> REPORT_EVENTS_AS_SPANS = + key("traces.report-events-as-spans") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to report events as spans. This is a temporary parameter that " + + "is in place until we have support for reporting events. " + + "In the meantime, this can be activated to report them as spans instead."); + /** * Returns a view over the given configuration via which options can be set/retrieved for the * given reporter. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java index aed330de522..3d36a9e6bff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.java @@ -17,8 +17,11 @@ package org.apache.flink.runtime.executiongraph.failover; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TraceOptions; import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.core.failure.FailureEnricher.Context; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.Execution; @@ -28,6 +31,8 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.throwable.ThrowableClassifier; import org.apache.flink.runtime.throwable.ThrowableType; +import org.apache.flink.traces.Span; +import org.apache.flink.traces.SpanBuilder; import org.apache.flink.util.IterableUtils; import javax.annotation.Nullable; @@ -47,6 +52,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class ExecutionFailureHandler { + public static final String FAILURE_LABEL_ATTRIBUTE_PREFIX = "failureLabel."; + private final SchedulingTopology schedulingTopology; /** Strategy to judge which tasks should be restarted. 
*/ @@ -62,6 +69,9 @@ public class ExecutionFailureHandler { private final Context globalFailureCtx; private final Collection<FailureEnricher> failureEnrichers; private final ComponentMainThreadExecutor mainThreadExecutor; + private final MetricGroup metricGroup; + + private final boolean reportEventsAsSpans; /** * Creates the handler to deal with task failures. @@ -76,13 +86,15 @@ public class ExecutionFailureHandler { * @param globalFailureCtx Global failure Context used by FailureEnrichers */ public ExecutionFailureHandler( + final Configuration jobMasterConfig, final SchedulingTopology schedulingTopology, final FailoverStrategy failoverStrategy, final RestartBackoffTimeStrategy restartBackoffTimeStrategy, final ComponentMainThreadExecutor mainThreadExecutor, final Collection<FailureEnricher> failureEnrichers, final Context taskFailureCtx, - final Context globalFailureCtx) { + final Context globalFailureCtx, + final MetricGroup metricGroup) { this.schedulingTopology = checkNotNull(schedulingTopology); this.failoverStrategy = checkNotNull(failoverStrategy); @@ -91,6 +103,8 @@ public class ExecutionFailureHandler { this.failureEnrichers = checkNotNull(failureEnrichers); this.taskFailureCtx = taskFailureCtx; this.globalFailureCtx = globalFailureCtx; + this.metricGroup = metricGroup; + this.reportEventsAsSpans = jobMasterConfig.get(TraceOptions.REPORT_EVENTS_AS_SPANS); } /** @@ -104,7 +118,7 @@ public class ExecutionFailureHandler { */ public FailureHandlingResult getFailureHandlingResult( Execution failedExecution, Throwable cause, long timestamp) { - return handleFailure( + return handleFailureAndReport( failedExecution, cause, timestamp, @@ -123,7 +137,7 @@ public class ExecutionFailureHandler { */ public FailureHandlingResult getGlobalFailureHandlingResult( final Throwable cause, long timestamp) { - return handleFailure( + return handleFailureAndReport( null, cause, timestamp, @@ -141,6 +155,51 @@ public class ExecutionFailureHandler { return 
FailureEnricherUtils.labelFailure(cause, ctx, mainThreadExecutor, failureEnrichers); } + private FailureHandlingResult handleFailureAndReport( + @Nullable final Execution failedExecution, + final Throwable cause, + long timestamp, + final Set<ExecutionVertexID> verticesToRestart, + final boolean globalFailure) { + + FailureHandlingResult failureHandlingResult = + handleFailure(failedExecution, cause, timestamp, verticesToRestart, globalFailure); + + if (reportEventsAsSpans) { + // TODO: replace with reporting as event once events are supported. + // Add reporting as callback for when the failure labeling is completed. + failureHandlingResult + .getFailureLabels() + .thenAcceptAsync( + labels -> reportFailureHandling(failureHandlingResult, labels), + mainThreadExecutor); + } + + return failureHandlingResult; + } + + private void reportFailureHandling( + FailureHandlingResult failureHandlingResult, Map<String, String> failureLabels) { + + // Add base attributes + SpanBuilder spanBuilder = + Span.builder(ExecutionFailureHandler.class, "JobFailure") + .setStartTsMillis(failureHandlingResult.getTimestamp()) + .setEndTsMillis(failureHandlingResult.getTimestamp()) + .setAttribute( + "canRestart", String.valueOf(failureHandlingResult.canRestart())) + .setAttribute( + "isGlobalFailure", + String.valueOf(failureHandlingResult.isGlobalFailure())); + + // Add all failure labels + for (Map.Entry<String, String> entry : failureLabels.entrySet()) { + spanBuilder.setAttribute( + FAILURE_LABEL_ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue()); + } + metricGroup.addSpan(spanBuilder); + } + private FailureHandlingResult handleFailure( @Nullable final Execution failedExecution, final Throwable cause, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index e3b046301b4..2fa43818ce2 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -181,13 +181,15 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio this.executionFailureHandler = new ExecutionFailureHandler( + jobMasterConfiguration, getSchedulingTopology(), failoverStrategy, restartBackoffTimeStrategy, mainThreadExecutor, failureEnrichers, taskFailureCtx, - globalFailureCtx); + globalFailureCtx, + jobManagerJobMetricGroup); this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java index a9d3beada8a..3726742ae46 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.java @@ -18,7 +18,10 @@ package org.apache.flink.runtime.executiongraph.failover; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TraceOptions; import org.apache.flink.core.failure.TestingFailureEnricher; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.Execution; @@ -29,6 +32,8 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; +import 
org.apache.flink.traces.Span; +import org.apache.flink.traces.SpanBuilder; import org.apache.flink.util.IterableUtils; import org.junit.jupiter.api.BeforeEach; @@ -36,7 +41,10 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -66,6 +74,8 @@ class ExecutionFailureHandlerTest { private TestingFailureEnricher testingFailureEnricher; + private List<Span> spanCollector; + @BeforeEach void setUp() { TestingSchedulingTopology topology = new TestingSchedulingTopology(); @@ -77,15 +87,25 @@ class ExecutionFailureHandlerTest { isNewAttempt = new AtomicBoolean(true); backoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS, isNewAttempt::get); + spanCollector = new CopyOnWriteArrayList<>(); + Configuration configuration = new Configuration(); + configuration.set(TraceOptions.REPORT_EVENTS_AS_SPANS, Boolean.TRUE); executionFailureHandler = new ExecutionFailureHandler( + configuration, schedulingTopology, failoverStrategy, backoffTimeStrategy, ComponentMainThreadExecutorServiceAdapter.forMainThread(), Collections.singleton(testingFailureEnricher), null, - null); + null, + new UnregisteredMetricsGroup() { + @Override + public void addSpan(SpanBuilder spanBuilder) { + spanCollector.add(spanBuilder.build()); + } + }); } /** Tests the case that task restarting is accepted. */ @@ -115,6 +135,7 @@ class ExecutionFailureHandlerTest { assertThat(result.getFailureLabels().get()) .isEqualTo(testingFailureEnricher.getFailureLabels()); assertThat(executionFailureHandler.getNumberOfRestarts()).isOne(); + checkMetrics(spanCollector, false, true); } /** Tests the case that task restarting is suppressed. 
*/ @@ -151,6 +172,7 @@ class ExecutionFailureHandlerTest { .isInstanceOf(IllegalStateException.class); assertThat(executionFailureHandler.getNumberOfRestarts()).isZero(); + checkMetrics(spanCollector, false, false); } /** Tests the case that the failure is non-recoverable type. */ @@ -192,6 +214,7 @@ class ExecutionFailureHandlerTest { .isInstanceOf(IllegalStateException.class); assertThat(executionFailureHandler.getNumberOfRestarts()).isZero(); + checkMetrics(spanCollector, false, false); } @Test @@ -217,6 +240,7 @@ class ExecutionFailureHandlerTest { isNewAttempt.set(false); testHandlingConcurrentException(execution, error); testHandlingConcurrentException(execution, error); + checkMetrics(spanCollector, false, true); } private void testHandlingRootException(Execution execution, Throwable error) { @@ -283,6 +307,7 @@ class ExecutionFailureHandlerTest { assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error); assertThat(result.getFailureLabels().get()) .isEqualTo(testingFailureEnricher.getFailureLabels()); + checkMetrics(spanCollector, true, true); } // ------------------------------------------------------------------------ @@ -310,4 +335,16 @@ class ExecutionFailureHandlerTest { return tasksToRestart; } } + + private void checkMetrics(List<Span> results, boolean global, boolean canRestart) { + assertThat(results).isNotEmpty(); + for (Span span : results) { + assertThat(span.getScope()).isEqualTo(ExecutionFailureHandler.class.getCanonicalName()); + assertThat(span.getName()).isEqualTo("JobFailure"); + Map<String, Object> attributes = span.getAttributes(); + assertThat(attributes).containsEntry("failureLabel.failKey", "failValue"); + assertThat(attributes).containsEntry("canRestart", String.valueOf(canRestart)); + assertThat(attributes).containsEntry("isGlobalFailure", String.valueOf(global)); + } + } }