This is an automated email from the ASF dual-hosted git repository. scwhittle pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 61ca405d372 Implementing lull reporting at bundle level processing (#30693) 61ca405d372 is described below commit 61ca405d372b02c985f641e96d10e79f61400f4c Author: Arvind Ram <arvindra...@gmail.com> AuthorDate: Mon Apr 29 06:15:45 2024 -0700 Implementing lull reporting at bundle level processing (#30693) This reverts commit 50c59912bc002947c335170b42827b278b78aae1. * reset lull state cleanup during deactivate --- .../core/metrics/ExecutionStateTracker.java | 41 +++++++- .../dataflow/worker/DataflowExecutionContext.java | 96 +++++++++++++++++ .../dataflow/worker/DataflowOperationContext.java | 31 +----- .../runners/dataflow/worker/StackTraceUtil.java | 66 ++++++++++++ .../worker/DataflowExecutionStateTrackerTest.java | 115 ++++++++++++++++++++- .../util/common/worker/MapTaskExecutorTest.java | 28 +++-- 6 files changed, 336 insertions(+), 41 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java index dc6fd2f8248..f70e9ac16f9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java @@ -46,6 +46,7 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> new ConcurrentHashMap<>(); private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5); + private static final long BUNDLE_LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(10); private static final AtomicIntegerFieldUpdater<ExecutionStateTracker> SAMPLING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, "sampling"); @@ -139,8 +140,17 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> */ private volatile long millisSinceLastTransition = 0; + /** 
+ * The number of milliseconds since the start of the current bundle. Reset to 0 when the tracker is deactivated. + * + * <p>This variable is updated by the Sampling thread, and read by the Progress Reporting thread, + * which is why it is marked volatile. + */ + private volatile long millisSinceBundleStart = 0; + private long transitionsAtLastSample = 0; private long nextLullReportMs = LULL_REPORT_MS; + private long nextBundleLullDurationReportMs = BUNDLE_LULL_REPORT_MS; public ExecutionStateTracker(ExecutionStateSampler sampler) { this.sampler = sampler; @@ -239,13 +249,16 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> return trackedThread; } - private synchronized void deactivate() { + @VisibleForTesting + public synchronized void deactivate() { sampler.removeTracker(this); Thread thread = this.trackedThread; if (thread != null) { CURRENT_TRACKERS.remove(thread.getId()); } this.trackedThread = null; + millisSinceBundleStart = 0; + nextBundleLullDurationReportMs = BUNDLE_LULL_REPORT_MS; } public ExecutionState getCurrentState() { @@ -294,6 +307,11 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> return millisSinceLastTransition; } + /** Return the time since the start of the current bundle. */ + public long getMillisSinceBundleStart() { + return millisSinceBundleStart; + } + /** Return the number of transitions since the last sample. */ public long getTransitionsAtLastSample() { return transitionsAtLastSample; @@ -304,6 +322,12 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> return nextLullReportMs; } + /** Return the bundle duration that must be exceeded before the next bundle lull report is emitted. */ + @VisibleForTesting + public long getNextBundleLullDurationReportMs() { + return nextBundleLullDurationReportMs; + } + /** * Called periodically by the {@link ExecutionStateSampler} to report time recorded by the * tracker.
@@ -335,6 +359,21 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> transitionsAtLastSample = transitionsAtThisSample; } updateMillisSinceLastTransition(millisSinceLastSample, state); + updateMillisSinceBundleStart(millisSinceLastSample); + } + + // Override this to implement bundle level lull reporting. + protected void reportBundleLull(Thread trackedThread, long millisSinceBundleStart) {} + + // This suppression doesn't cause any race condition because it is updated by only one thread + // which is currently tracked. + @SuppressWarnings("NonAtomicVolatileUpdate") + private void updateMillisSinceBundleStart(long millisSinceLastSample) { + millisSinceBundleStart += millisSinceLastSample; + if (millisSinceBundleStart > nextBundleLullDurationReportMs) { + reportBundleLull(trackedThread, millisSinceBundleStart); + nextBundleLullDurationReportMs += BUNDLE_LULL_REPORT_MS; + } } @SuppressWarnings("NonAtomicVolatileUpdate") diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index 71c158b5da2..2b4c7df6ace 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; +import com.google.api.client.util.Clock; import com.google.api.services.dataflow.model.SideInputInfo; import java.io.Closeable; import java.io.IOException; @@ -29,6 +30,8 @@ import java.util.IntSummaryStatistics; import java.util.LinkedHashMap; import java.util.Map; import 
java.util.Optional; +import java.util.logging.Level; +import java.util.logging.LogRecord; import java.util.stream.Collectors; import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.core.NullSideInputReader; @@ -43,6 +46,8 @@ import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.Dataflow import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.counters.NameContext; +import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler; +import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.metrics.MetricsContainer; @@ -50,11 +55,17 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; import org.joda.time.Instant; +import org.joda.time.format.PeriodFormatter; +import org.joda.time.format.PeriodFormatterBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Execution context for the Dataflow worker. 
*/ @SuppressWarnings({ @@ -264,21 +275,53 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> { @Nullable private ActiveMessageMetadata activeMessageMetadata = null; + /** Clock used to either provide real system time or mocked to virtualize time for testing. */ + private final Clock clock; + @GuardedBy("this") private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>(); + private static final Logger LOG = LoggerFactory.getLogger(DataflowExecutionStateTracker.class); + + private static final PeriodFormatter DURATION_FORMATTER = + new PeriodFormatterBuilder() + .appendDays() + .appendSuffix("d") + .minimumPrintedDigits(2) + .appendHours() + .appendSuffix("h") + .printZeroAlways() + .appendMinutes() + .appendSuffix("m") + .appendSeconds() + .appendSuffix("s") + .toFormatter(); + public DataflowExecutionStateTracker( ExecutionStateSampler sampler, DataflowOperationContext.DataflowExecutionState otherState, CounterFactory counterFactory, PipelineOptions options, String workItemId) { + this(sampler, otherState, counterFactory, options, workItemId, Clock.SYSTEM); + } + + @VisibleForTesting + public DataflowExecutionStateTracker( + ExecutionStateSampler sampler, + DataflowOperationContext.DataflowExecutionState otherState, + CounterFactory counterFactory, + PipelineOptions options, + String workItemId, + Clock clock) { super(sampler); this.elementExecutionTracker = DataflowElementExecutionTracker.create(counterFactory, options); this.otherState = otherState; this.workItemId = workItemId; this.contextActivationObserverRegistry = ContextActivationObserverRegistry.createDefault(); + this.clock = clock; + DataflowWorkerLoggingInitializer.initialize(); if (options instanceof DataflowPipelineOptions) { this.isStreaming = ((DataflowPipelineOptions) options).isStreaming(); } else { @@ -308,12 +351,65 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> { } } + private String 
getBundleLullMessage(Thread trackedThread, Duration lullDuration) { + StringBuilder message = new StringBuilder(); + message + .append("Operation ongoing in bundle for at least ") + .append(DURATION_FORMATTER.print(lullDuration.toPeriod())) + .append(" without completing") + .append("\n"); + synchronized (this) { + if (this.activeMessageMetadata != null) { + message.append( + "Current user step name: " + getActiveMessageMetadata().get().userStepName() + "\n"); + message.append( + "Time spent in this step(millis): " + + (clock.currentTimeMillis() + - getActiveMessageMetadata().get().stopwatch().elapsed().toMillis()) + + "\n"); + } + message.append("Processing times in each step(millis)\n"); + for (Map.Entry<String, IntSummaryStatistics> entry : + this.processingTimesByStep.entrySet()) { + message.append("Step name: " + entry.getKey() + "\n"); + message.append("Time spent in this step: " + entry.getValue().toString() + "\n"); + } + } + + if (trackedThread != null) { + message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace())); + } + return message.toString(); + } + @Override protected void takeSampleOnce(long millisSinceLastSample) { elementExecutionTracker.takeSample(millisSinceLastSample); super.takeSampleOnce(millisSinceLastSample); } + @Override + protected void reportBundleLull(Thread trackedThread, long millisElapsedSinceBundleStart) { + // If we're not logging warnings, nothing to report. + if (!LOG.isWarnEnabled()) { + return; + } + + Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart); + + // Since the lull reporting executes in the sampler thread, it won't automatically inherit the + // context of the current step. To ensure things are logged correctly, we get the currently + // registered DataflowWorkerLoggingHandler and log directly in the desired context. 
+ LogRecord logRecord = + new LogRecord(Level.WARNING, getBundleLullMessage(trackedThread, lullDuration)); + logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName()); + + // Publish directly in the context of this specific ExecutionState. + DataflowWorkerLoggingHandler dataflowLoggingHandler = + DataflowWorkerLoggingInitializer.getLoggingHandler(); + dataflowLoggingHandler.publish(logRecord); + } + /** * Enter a new state on the tracker. If the new state is a Dataflow processing state, tracks the * activeMessageMetadata with the start time of the new state. diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java index b2ab928bc99..fc145aaf449 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java @@ -25,10 +25,8 @@ import com.google.api.services.dataflow.model.CounterStructuredName; import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata; import com.google.api.services.dataflow.model.CounterUpdate; import java.io.Closeable; -import java.util.Map; import java.util.logging.Level; import java.util.logging.LogRecord; -import org.apache.beam.runners.core.SimpleDoFnRunner; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind; @@ -42,7 +40,6 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationConte import org.apache.beam.sdk.metrics.MetricsContainer; import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.format.PeriodFormatter; @@ -251,9 +248,6 @@ public class DataflowOperationContext implements OperationContext { return description.toString(); } - private static final ImmutableSet<String> FRAMEWORK_CLASSES = - ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName()); - protected String getLullMessage(Thread trackedThread, Duration lullDuration) { StringBuilder message = new StringBuilder(); message.append("Operation ongoing"); @@ -272,7 +266,7 @@ public class DataflowOperationContext implements OperationContext { message.append("\n"); - message.append(getStackTraceForLullMessage(trackedThread.getStackTrace())); + message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace())); return message.toString(); } @@ -298,17 +292,7 @@ public class DataflowOperationContext implements OperationContext { dataflowLoggingHandler.publish(this, logRecord); if (shouldLogFullThreadDump(lullDuration)) { - Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces(); - for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) { - Thread thread = entry.getKey(); - StackTraceElement[] stackTrace = entry.getValue(); - StringBuilder message = new StringBuilder(); - message.append(thread.toString()).append(":\n"); - message.append(getStackTraceForLullMessage(stackTrace)); - logRecord = new LogRecord(Level.INFO, message.toString()); - logRecord.setLoggerName(DataflowOperationContext.LOG.getName()); - dataflowLoggingHandler.publish(this, logRecord); - } + StackTraceUtil.logAllStackTraces(); } } @@ -336,17 +320,6 @@ public class 
DataflowOperationContext implements OperationContext { return false; } - private String getStackTraceForLullMessage(StackTraceElement[] stackTrace) { - StringBuilder message = new StringBuilder(); - for (StackTraceElement e : stackTrace) { - if (FRAMEWORK_CLASSES.contains(e.getClassName())) { - break; - } - message.append(" at ").append(e).append("\n"); - } - return message.toString(); - } - public @Nullable MetricsContainer getMetricsContainer() { return metricsContainer; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java new file mode 100644 index 00000000000..041944f09cf --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker; + +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import org.apache.beam.runners.core.SimpleDoFnRunner; +import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler; +import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility methods to print the stack traces of all the threads. */ +@Internal +public final class StackTraceUtil { + private static final ImmutableSet<String> FRAMEWORK_CLASSES = + ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(StackTraceUtil.class); + + public static String getStackTraceForLullMessage(StackTraceElement[] stackTrace) { + StringBuilder message = new StringBuilder(); + for (StackTraceElement e : stackTrace) { + if (FRAMEWORK_CLASSES.contains(e.getClassName())) { + break; + } + message.append(" at ").append(e).append("\n"); + } + return message.toString(); + } + + public static void logAllStackTraces() { + DataflowWorkerLoggingHandler dataflowLoggingHandler = + DataflowWorkerLoggingInitializer.getLoggingHandler(); + Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces(); + for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) { + Thread thread = entry.getKey(); + StackTraceElement[] stackTrace = entry.getValue(); + StringBuilder message = new StringBuilder(); + message.append(thread.toString()).append(":\n"); + message.append(getStackTraceForLullMessage(stackTrace)); + LogRecord logRecord = new LogRecord(Level.INFO, message.toString()); + logRecord.setLoggerName(StackTraceUtil.LOG.getName()); + dataflowLoggingHandler.publish(logRecord); + } + 
} + + private StackTraceUtil() {} +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java index 551937c3559..7b9f1b1ab5d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java @@ -21,9 +21,17 @@ import static junit.framework.TestCase.assertNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import com.google.api.client.testing.http.FixedClock; +import com.google.api.client.util.Clock; import java.io.Closeable; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.nio.file.Files; +import java.util.List; +import org.apache.beam.runners.core.SimpleDoFnRunner; import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; @@ -35,28 +43,55 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDi import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.NameContext; +import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker; import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.RestoreSystemProperties; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.hamcrest.Matchers; import org.joda.time.DateTimeUtils.MillisProvider; +import org.joda.time.Duration; +import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; /** Tests for {@link DataflowExecutionStateTrackerTest}. */ public class DataflowExecutionStateTrackerTest { + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Rule public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + + private File logFolder; private PipelineOptions options; private MillisProvider clock; private ExecutionStateSampler sampler; private CounterSet counterSet; @Before - public void setUp() { + public void setUp() throws IOException { options = PipelineOptionsFactory.create(); clock = mock(MillisProvider.class); sampler = ExecutionStateSampler.newForTest(clock); counterSet = new CounterSet(); + logFolder = tempFolder.newFolder(); + System.setProperty( + DataflowWorkerLoggingInitializer.RUNNER_FILEPATH_PROPERTY, + new File(logFolder, "dataflow-json.log").getAbsolutePath()); + // We need to reset *first* because some other test may have already initialized the + // logging initializer. 
+ DataflowWorkerLoggingInitializer.reset(); + DataflowWorkerLoggingInitializer.initialize(); + } + + @After + public void tearDown() { + DataflowWorkerLoggingInitializer.reset(); } private final NameContext step1 = @@ -67,7 +102,7 @@ public class DataflowExecutionStateTrackerTest { @Test public void testReportsElementExecutionTime() throws IOException { enableTimePerElementExperiment(); - ExecutionStateTracker tracker = createTracker(); + DataflowExecutionStateTracker tracker = createTracker(); try (Closeable c1 = tracker.activate(new Thread())) { try (Closeable c2 = tracker.enterState(step1Process)) {} @@ -84,7 +119,7 @@ public class DataflowExecutionStateTrackerTest { @Test public void testTakesSampleOnDeactivate() throws IOException { enableTimePerElementExperiment(); - ExecutionStateTracker tracker = createTracker(); + DataflowExecutionStateTracker tracker = createTracker(); try (Closeable c1 = tracker.activate(new Thread())) { try (Closeable c2 = tracker.enterState(step1Process)) { @@ -123,7 +158,7 @@ public class DataflowExecutionStateTrackerTest { .build())); } - private ExecutionStateTracker createTracker() { + private DataflowExecutionStateTracker createTracker() { return new DataflowExecutionStateTracker( sampler, new TestDataflowExecutionState(NameContext.forStage("test-stage"), "other"), @@ -131,4 +166,76 @@ public class DataflowExecutionStateTrackerTest { options, "test-work-item-id"); } + + @Test + public void testLullReportsRightTrace() throws Exception { + Thread mockThread = mock(Thread.class); + StackTraceElement[] doFnStackTrace = + new StackTraceElement[] { + new StackTraceElement( + "userpackage.SomeUserDoFn", "helperMethod", "SomeUserDoFn.java", 250), + new StackTraceElement("userpackage.SomeUserDoFn", "process", "SomeUserDoFn.java", 450), + new StackTraceElement( + SimpleDoFnRunner.class.getName(), "processElement", "SimpleDoFnRunner.java", 500), + }; + when(mockThread.getStackTrace()).thenReturn(doFnStackTrace); + FixedClock clock = new 
FixedClock(Clock.SYSTEM.currentTimeMillis()); + DataflowExecutionStateTracker tracker = createTracker(clock); + tracker.reportBundleLull(mockThread, 30 * 60 * 1000); + verifyLullLog(); + + clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(5L).getMillis()); + tracker.reportBundleLull(mockThread, 30 * 60 * 1000); + verifyLullLog(); + + clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis()); + tracker.reportBundleLull(mockThread, 6 * 60 * 1000); + verifyLullLog(); + + clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis()); + tracker.reportBundleLull(mockThread, 30 * 60 * 1000); + verifyLullLog(); + } + + private void verifyLullLog() throws IOException { + File[] files = logFolder.listFiles(); + assertThat(files, Matchers.arrayWithSize(1)); + File logFile = files[0]; + List<String> lines = Files.readAllLines(logFile.toPath()); + + String warnLines = + Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"WARN\""))); + assertThat( + warnLines, + Matchers.allOf( + Matchers.containsString("Operation ongoing in bundle for at least"), + Matchers.containsString(" without completing"), + Matchers.containsString("Processing times in each step"), + Matchers.containsString( + "org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker"), + Matchers.containsString("userpackage.SomeUserDoFn.helperMethod"), + Matchers.not(Matchers.containsString(SimpleDoFnRunner.class.getName())))); + + String infoLines = + Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"INFO\""))); + assertThat( + infoLines, + Matchers.not( + Matchers.anyOf( + Matchers.containsString("Thread[backgroundThread,"), + Matchers.containsString( + "org.apache.beam.runners.dataflow.worker.StackTraceUtil")))); + // Truncate the file when done to prepare for the next test. 
+ new FileOutputStream(logFile, false).getChannel().truncate(0).close(); + } + + private DataflowExecutionStateTracker createTracker(Clock clock) { + return new DataflowExecutionStateTracker( + sampler, + new TestDataflowExecutionState(NameContext.forStage("test-stage"), "other"), + counterSet, + options, + "test-work-item-id", + clock); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java index 5c34441766c..1e6ec5f1a10 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java @@ -31,6 +31,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; @@ -43,6 +44,7 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; @@ -139,19 +141,21 @@ public class MapTaskExecutorTest { Operation o3 = Mockito.mock(Operation.class); List<Operation> operations = Arrays.asList(new Operation[] {o1, o2, o3}); - ExecutionStateTracker stateTracker = Mockito.mock(ExecutionStateTracker.class); + ExecutionStateTracker 
stateTracker = Mockito.spy(ExecutionStateTracker.newForTest()); try (MapTaskExecutor executor = new MapTaskExecutor(operations, counterSet, stateTracker)) { executor.execute(); } InOrder inOrder = Mockito.inOrder(stateTracker, o1, o2, o3); + inOrder.verify(stateTracker).activate(); inOrder.verify(o3).start(); inOrder.verify(o2).start(); inOrder.verify(o1).start(); inOrder.verify(o1).finish(); inOrder.verify(o2).finish(); inOrder.verify(o3).finish(); + inOrder.verify(stateTracker).deactivate(); } private TestOperation createOperation(String stepName, long count) { @@ -338,6 +342,8 @@ public class MapTaskExecutorTest { assertThat( context3.metricsContainer().getUpdates().counterUpdates(), contains(metricUpdate("TestMetric", "MetricCounter", o3, 3L))); + assertEquals(0, stateTracker.getMillisSinceBundleStart()); + assertEquals(TimeUnit.MINUTES.toMillis(10), stateTracker.getNextBundleLullDurationReportMs()); } } @@ -401,13 +407,14 @@ public class MapTaskExecutorTest { Operation o3 = Mockito.mock(Operation.class); Mockito.doThrow(new Exception("in start")).when(o2).start(); - ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest(); + ExecutionStateTracker stateTracker = Mockito.spy(ExecutionStateTracker.newForTest()); try (MapTaskExecutor executor = new MapTaskExecutor(Arrays.<Operation>asList(o1, o2, o3), counterSet, stateTracker)) { executor.execute(); fail("Should have thrown"); } catch (Exception e) { - InOrder inOrder = Mockito.inOrder(o1, o2, o3); + InOrder inOrder = Mockito.inOrder(o1, o2, o3, stateTracker); + inOrder.verify(stateTracker).activate(); inOrder.verify(o3).start(); inOrder.verify(o2).start(); @@ -415,6 +422,7 @@ public class MapTaskExecutorTest { Mockito.verify(o1).abort(); Mockito.verify(o2).abort(); Mockito.verify(o3).abort(); + Mockito.verify(stateTracker).deactivate(); Mockito.verifyNoMoreInteractions(o1, o2, o3); } } @@ -426,12 +434,13 @@ public class MapTaskExecutorTest { Operation o3 = Mockito.mock(Operation.class); 
Mockito.doThrow(new Exception("in finish")).when(o2).finish(); - ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest(); + ExecutionStateTracker stateTracker = Mockito.spy(ExecutionStateTracker.newForTest()); try (MapTaskExecutor executor = new MapTaskExecutor(Arrays.<Operation>asList(o1, o2, o3), counterSet, stateTracker)) { executor.execute(); fail("Should have thrown"); } catch (Exception e) { + Mockito.verify(stateTracker).activate(); InOrder inOrder = Mockito.inOrder(o1, o2, o3); inOrder.verify(o3).start(); inOrder.verify(o2).start(); @@ -443,6 +452,7 @@ public class MapTaskExecutorTest { Mockito.verify(o1).abort(); Mockito.verify(o2).abort(); Mockito.verify(o3).abort(); + Mockito.verify(stateTracker).deactivate(); Mockito.verifyNoMoreInteractions(o1, o2, o3); } } @@ -456,13 +466,14 @@ public class MapTaskExecutorTest { Mockito.doThrow(new Exception("in finish")).when(o2).finish(); Mockito.doThrow(new Exception("suppressed in abort")).when(o3).abort(); - ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest(); + ExecutionStateTracker stateTracker = Mockito.spy(ExecutionStateTracker.newForTest()); try (MapTaskExecutor executor = new MapTaskExecutor(Arrays.<Operation>asList(o1, o2, o3, o4), counterSet, stateTracker)) { executor.execute(); fail("Should have thrown"); } catch (Exception e) { - InOrder inOrder = Mockito.inOrder(o1, o2, o3, o4); + Mockito.verify(stateTracker).activate(); + InOrder inOrder = Mockito.inOrder(o1, o2, o3, o4, stateTracker); inOrder.verify(o4).start(); inOrder.verify(o3).start(); inOrder.verify(o2).start(); @@ -475,6 +486,7 @@ public class MapTaskExecutorTest { Mockito.verify(o2).abort(); Mockito.verify(o3).abort(); // will throw an exception, but we shouldn't fail Mockito.verify(o4).abort(); + Mockito.verify(stateTracker).deactivate(); Mockito.verifyNoMoreInteractions(o1, o2, o3, o4); // Make sure the failure while aborting shows up as a suppressed error @@ -491,7 +503,7 @@ public class 
MapTaskExecutorTest { ReadOperation o1 = Mockito.mock(ReadOperation.class); ReadOperation o2 = Mockito.mock(ReadOperation.class); - ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest(); + ExecutionStateTracker stateTracker = Mockito.spy(ExecutionStateTracker.newForTest()); MapTaskExecutor executor = new MapTaskExecutor(Arrays.<Operation>asList(o1, o2), counterSet, stateTracker); Mockito.doAnswer( @@ -502,7 +514,9 @@ public class MapTaskExecutorTest { .when(o1) .finish(); executor.execute(); + Mockito.verify(stateTracker).activate(); Mockito.verify(o1, atLeastOnce()).abortReadLoop(); Mockito.verify(o2, atLeastOnce()).abortReadLoop(); + Mockito.verify(stateTracker).deactivate(); } }