This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch release-2.55.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.55.0 by this push: new 0dc330e6d53 Revert "Implementing lull reporting at bundle level processing (#29882)" (#30648) (#30664) 0dc330e6d53 is described below commit 0dc330e6d53cce51b13bcb652e80859c1b3a5975 Author: Yi Hu <ya...@google.com> AuthorDate: Mon Mar 18 17:29:24 2024 -0400 Revert "Implementing lull reporting at bundle level processing (#29882)" (#30648) (#30664) This reverts commit ffe2dba532028cdbbb5bca9c374f0a2d756ee8bf. Co-authored-by: Arvind Ram <arvindra...@gmail.com> --- .../core/metrics/ExecutionStateTracker.java | 25 ---- .../dataflow/worker/DataflowExecutionContext.java | 117 +---------------- .../dataflow/worker/DataflowOperationContext.java | 80 +++++++++++- .../runners/dataflow/worker/StackTraceUtil.java | 66 ---------- .../worker/DataflowExecutionStateTrackerTest.java | 140 +-------------------- .../worker/DataflowOperationContextTest.java | 80 ++++++++++-- 6 files changed, 155 insertions(+), 353 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java index b0b8f0107f3..dc6fd2f8248 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java @@ -46,7 +46,6 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> new ConcurrentHashMap<>(); private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5); - private static final long BUNDLE_LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(10); private static final AtomicIntegerFieldUpdater<ExecutionStateTracker> SAMPLING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, "sampling"); @@ -140,17 +139,8 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> */ private volatile long millisSinceLastTransition = 0; - /** - * The number of milliseconds since the {@link ExecutionStateTracker} initial state. - * - * <p>This variable is updated by the Sampling thread, and read by the Progress Reporting thread, - * thus it being marked volatile. - */ - private volatile long millisSinceBundleStart = 0; - private long transitionsAtLastSample = 0; private long nextLullReportMs = LULL_REPORT_MS; - private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS; public ExecutionStateTracker(ExecutionStateSampler sampler) { this.sampler = sampler; @@ -165,10 +155,8 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> currentState = null; numTransitions = 0; millisSinceLastTransition = 0; - millisSinceBundleStart = 0; transitionsAtLastSample = 0; nextLullReportMs = LULL_REPORT_MS; - nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS; } @VisibleForTesting @@ -347,19 +335,6 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> transitionsAtLastSample = transitionsAtThisSample; } updateMillisSinceLastTransition(millisSinceLastSample, state); - updateMillisSinceBundleStart(millisSinceLastSample); - } - - // Override this to implement bundle level lull reporting. - protected void reportBundleLull(long millisSinceBundleStart) {} - - @SuppressWarnings("NonAtomicVolatileUpdate") - private void updateMillisSinceBundleStart(long millisSinceLastSample) { - millisSinceBundleStart += millisSinceLastSample; - if (millisSinceBundleStart > nextBundleLullReportMs) { - reportBundleLull(millisSinceBundleStart); - nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS; - } } @SuppressWarnings("NonAtomicVolatileUpdate") diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index 16ff2975b02..080fa7c9dac 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow.worker; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; -import com.google.api.client.util.Clock; import com.google.api.services.dataflow.model.SideInputInfo; import java.io.Closeable; import java.io.IOException; @@ -30,8 +29,6 @@ import java.util.IntSummaryStatistics; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; -import java.util.logging.Level; -import java.util.logging.LogRecord; import java.util.stream.Collectors; import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.core.NullSideInputReader; @@ -40,13 +37,10 @@ import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; -import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext; import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.counters.NameContext; -import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler; -import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.metrics.MetricsContainer; @@ -54,16 +48,11 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer; import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.Duration; +import org.joda.time.DateTimeUtils.MillisProvider; import org.joda.time.Instant; -import org.joda.time.format.PeriodFormatter; -import org.joda.time.format.PeriodFormatterBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Execution context for the Dataflow worker. */ @SuppressWarnings({ @@ -271,59 +260,23 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> { @Nullable private ActiveMessageMetadata activeMessageMetadata = null; - /** Clock used to either provide real system time or mocked to virtualize time for testing. */ - private final Clock clock; + private final MillisProvider clock = System::currentTimeMillis; @GuardedBy("this") private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>(); - /** Last milliseconds since epoch when a full thread dump was performed. */ - private long lastFullThreadDumpMillis = 0; - - /** The minimum lull duration in milliseconds to perform a full thread dump. */ - private static final long LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 1000; - - private static final Logger LOG = LoggerFactory.getLogger(DataflowExecutionStateTracker.class); - - private static final PeriodFormatter DURATION_FORMATTER = - new PeriodFormatterBuilder() - .appendDays() - .appendSuffix("d") - .minimumPrintedDigits(2) - .appendHours() - .appendSuffix("h") - .printZeroAlways() - .appendMinutes() - .appendSuffix("m") - .appendSeconds() - .appendSuffix("s") - .toFormatter(); - public DataflowExecutionStateTracker( ExecutionStateSampler sampler, DataflowOperationContext.DataflowExecutionState otherState, CounterFactory counterFactory, PipelineOptions options, String workItemId) { - this(sampler, otherState, counterFactory, options, workItemId, Clock.SYSTEM); - } - - @VisibleForTesting - public DataflowExecutionStateTracker( - ExecutionStateSampler sampler, - DataflowOperationContext.DataflowExecutionState otherState, - CounterFactory counterFactory, - PipelineOptions options, - String workItemId, - Clock clock) { super(sampler); this.elementExecutionTracker = DataflowElementExecutionTracker.create(counterFactory, options); this.otherState = otherState; this.workItemId = workItemId; this.contextActivationObserverRegistry = ContextActivationObserverRegistry.createDefault(); - this.clock = clock; - DataflowWorkerLoggingInitializer.initialize(); } @Override @@ -348,76 +301,12 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> { } } - private boolean shouldLogFullThreadDumpForBundle(Duration lullDuration) { - if (lullDuration.getMillis() < LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS) { - return false; - } - long now = clock.currentTimeMillis(); - if (lastFullThreadDumpMillis + LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS < now) { - lastFullThreadDumpMillis = now; - return true; - } - return false; - } - - private String getBundleLullMessage(Duration lullDuration) { - StringBuilder message = new StringBuilder(); - message - .append("Operation ongoing in bundle for at least ") - .append(DURATION_FORMATTER.print(lullDuration.toPeriod())) - .append(" without completing") - .append("\n"); - synchronized (this) { - if (this.activeMessageMetadata != null) { - message.append( - "Current user step name: " + getActiveMessageMetadata().get().userStepName() + "\n"); - message.append( - "Time spent in this step(millis): " - + (clock.currentTimeMillis() - getActiveMessageMetadata().get().startTime()) - + "\n"); - } - message.append("Processing times in each step(millis)\n"); - for (Map.Entry<String, IntSummaryStatistics> entry : - this.processingTimesByStep.entrySet()) { - message.append("Step name: " + entry.getKey() + "\n"); - message.append("Time spent in this step: " + entry.getValue().toString() + "\n"); - } - } - - return message.toString(); - } - @Override protected void takeSampleOnce(long millisSinceLastSample) { elementExecutionTracker.takeSample(millisSinceLastSample); super.takeSampleOnce(millisSinceLastSample); } - @Override - protected void reportBundleLull(long millisElapsedSinceBundleStart) { - // If we're not logging warnings, nothing to report. - if (!LOG.isWarnEnabled()) { - return; - } - - Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart); - - // Since the lull reporting executes in the sampler thread, it won't automatically inherit the - // context of the current step. To ensure things are logged correctly, we get the currently - // registered DataflowWorkerLoggingHandler and log directly in the desired context. - LogRecord logRecord = new LogRecord(Level.WARNING, getBundleLullMessage(lullDuration)); - logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName()); - - // Publish directly in the context of this specific ExecutionState. - DataflowWorkerLoggingHandler dataflowLoggingHandler = - DataflowWorkerLoggingInitializer.getLoggingHandler(); - dataflowLoggingHandler.publish(logRecord); - - if (shouldLogFullThreadDumpForBundle(lullDuration)) { - StackTraceUtil.logAllStackTraces(); - } - } - /** * Enter a new state on the tracker. If the new state is a Dataflow processing state, tracks the * activeMessageMetadata with the start time of the new state. @@ -434,7 +323,7 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> { synchronized (this) { this.activeMessageMetadata = ActiveMessageMetadata.create( - newDFState.getStepName().userName(), clock.currentTimeMillis()); + newDFState.getStepName().userName(), clock.getMillis()); } } elementExecutionTracker.enter(newDFState.getStepName()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java index 7d90a512519..b2ab928bc99 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java @@ -19,13 +19,16 @@ package org.apache.beam.runners.dataflow.worker; import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt; +import com.google.api.client.util.Clock; import com.google.api.services.dataflow.model.CounterMetadata; import com.google.api.services.dataflow.model.CounterStructuredName; import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata; import com.google.api.services.dataflow.model.CounterUpdate; import java.io.Closeable; +import java.util.Map; import java.util.logging.Level; import java.util.logging.LogRecord; +import org.apache.beam.runners.core.SimpleDoFnRunner; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind; @@ -39,6 +42,7 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationConte import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.format.PeriodFormatter; @@ -181,6 +185,9 @@ public class DataflowOperationContext implements OperationContext { private final ProfileScope profileScope; private final @Nullable MetricsContainer metricsContainer; + /** Clock used to either provide real system time or mocked to virtualize time for testing. */ + private final Clock clock; + public DataflowExecutionState( NameContext nameContext, String stateName, @@ -188,12 +195,31 @@ public class DataflowOperationContext implements OperationContext { @Nullable Integer inputIndex, @Nullable MetricsContainer metricsContainer, ProfileScope profileScope) { + this( + nameContext, + stateName, + requestingStepName, + inputIndex, + metricsContainer, + profileScope, + Clock.SYSTEM); + } + + public DataflowExecutionState( + NameContext nameContext, + String stateName, + @Nullable String requestingStepName, + @Nullable Integer inputIndex, + @Nullable MetricsContainer metricsContainer, + ProfileScope profileScope, + Clock clock) { super(stateName); this.stepName = nameContext; this.requestingStepName = requestingStepName; this.inputIndex = inputIndex; this.profileScope = Preconditions.checkNotNull(profileScope); this.metricsContainer = metricsContainer; + this.clock = clock; } /** @@ -225,6 +251,9 @@ public class DataflowOperationContext implements OperationContext { return description.toString(); } + private static final ImmutableSet<String> FRAMEWORK_CLASSES = + ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName()); + protected String getLullMessage(Thread trackedThread, Duration lullDuration) { StringBuilder message = new StringBuilder(); message.append("Operation ongoing"); @@ -243,7 +272,7 @@ public class DataflowOperationContext implements OperationContext { message.append("\n"); - message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace())); + message.append(getStackTraceForLullMessage(trackedThread.getStackTrace())); return message.toString(); } @@ -267,6 +296,55 @@ public class DataflowOperationContext implements OperationContext { DataflowWorkerLoggingHandler dataflowLoggingHandler = DataflowWorkerLoggingInitializer.getLoggingHandler(); dataflowLoggingHandler.publish(this, logRecord); + + if (shouldLogFullThreadDump(lullDuration)) { + Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces(); + for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) { + Thread thread = entry.getKey(); + StackTraceElement[] stackTrace = entry.getValue(); + StringBuilder message = new StringBuilder(); + message.append(thread.toString()).append(":\n"); + message.append(getStackTraceForLullMessage(stackTrace)); + logRecord = new LogRecord(Level.INFO, message.toString()); + logRecord.setLoggerName(DataflowOperationContext.LOG.getName()); + dataflowLoggingHandler.publish(this, logRecord); + } + } + } + + /** + * The time interval between two full thread dump. (A full thread dump is performed at most once + * every 20 minutes.) + */ + private static final long LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS = 20 * 60 * 1000; + + /** The minimum lull duration to perform a full thread dump. */ + private static final long LOG_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 1000; + + /** Last time when a full thread dump was performed. */ + private long lastFullThreadDumpMillis = 0; + + private boolean shouldLogFullThreadDump(Duration lullDuration) { + if (lullDuration.getMillis() < LOG_LULL_FULL_THREAD_DUMP_LULL_MS) { + return false; + } + long now = clock.currentTimeMillis(); + if (lastFullThreadDumpMillis + LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS < now) { + lastFullThreadDumpMillis = now; + return true; + } + return false; + } + + private String getStackTraceForLullMessage(StackTraceElement[] stackTrace) { + StringBuilder message = new StringBuilder(); + for (StackTraceElement e : stackTrace) { + if (FRAMEWORK_CLASSES.contains(e.getClassName())) { + break; + } + message.append(" at ").append(e).append("\n"); + } + return message.toString(); } public @Nullable MetricsContainer getMetricsContainer() { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java deleted file mode 100644 index 041944f09cf..00000000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker; - -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.LogRecord; -import org.apache.beam.runners.core.SimpleDoFnRunner; -import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler; -import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer; -import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Utility methods to print the stack traces of all the threads. */ -@Internal -public final class StackTraceUtil { - private static final ImmutableSet<String> FRAMEWORK_CLASSES = - ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName()); - private static final Logger LOG = LoggerFactory.getLogger(StackTraceUtil.class); - - public static String getStackTraceForLullMessage(StackTraceElement[] stackTrace) { - StringBuilder message = new StringBuilder(); - for (StackTraceElement e : stackTrace) { - if (FRAMEWORK_CLASSES.contains(e.getClassName())) { - break; - } - message.append(" at ").append(e).append("\n"); - } - return message.toString(); - } - - public static void logAllStackTraces() { - DataflowWorkerLoggingHandler dataflowLoggingHandler = - DataflowWorkerLoggingInitializer.getLoggingHandler(); - Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces(); - for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) { - Thread thread = entry.getKey(); - StackTraceElement[] stackTrace = entry.getValue(); - StringBuilder message = new StringBuilder(); - message.append(thread.toString()).append(":\n"); - message.append(getStackTraceForLullMessage(stackTrace)); - LogRecord logRecord = new LogRecord(Level.INFO, message.toString()); - logRecord.setLoggerName(StackTraceUtil.LOG.getName()); - dataflowLoggingHandler.publish(logRecord); - } - } - - private StackTraceUtil() {} -} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java index 3502b621147..551937c3559 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java @@ -22,14 +22,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; -import com.google.api.client.testing.http.FixedClock; -import com.google.api.client.util.Clock; import java.io.Closeable; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.nio.file.Files; -import java.util.List; import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; @@ -41,55 +35,28 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDi import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.NameContext; -import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.RestoreSystemProperties; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; -import org.hamcrest.Matchers; import org.joda.time.DateTimeUtils.MillisProvider; -import org.joda.time.Duration; -import org.junit.After; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; /** Tests for {@link DataflowExecutionStateTrackerTest}. */ public class DataflowExecutionStateTrackerTest { - @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Rule public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); - - private File logFolder; private PipelineOptions options; private MillisProvider clock; private ExecutionStateSampler sampler; private CounterSet counterSet; @Before - public void setUp() throws IOException { + public void setUp() { options = PipelineOptionsFactory.create(); clock = mock(MillisProvider.class); sampler = ExecutionStateSampler.newForTest(clock); counterSet = new CounterSet(); - logFolder = tempFolder.newFolder(); - System.setProperty( - DataflowWorkerLoggingInitializer.RUNNER_FILEPATH_PROPERTY, - new File(logFolder, "dataflow-json.log").getAbsolutePath()); - // We need to reset *first* because some other test may have already initialized the - // logging initializer. - DataflowWorkerLoggingInitializer.reset(); - DataflowWorkerLoggingInitializer.initialize(); - } - - @After - public void tearDown() { - DataflowWorkerLoggingInitializer.reset(); } private final NameContext step1 = @@ -100,7 +67,7 @@ public class DataflowExecutionStateTrackerTest { @Test public void testReportsElementExecutionTime() throws IOException { enableTimePerElementExperiment(); - DataflowExecutionStateTracker tracker = createTracker(); + ExecutionStateTracker tracker = createTracker(); try (Closeable c1 = tracker.activate(new Thread())) { try (Closeable c2 = tracker.enterState(step1Process)) {} @@ -117,7 +84,7 @@ public class DataflowExecutionStateTrackerTest { @Test public void testTakesSampleOnDeactivate() throws IOException { enableTimePerElementExperiment(); - DataflowExecutionStateTracker tracker = createTracker(); + ExecutionStateTracker tracker = createTracker(); try (Closeable c1 = tracker.activate(new Thread())) { try (Closeable c2 = tracker.enterState(step1Process)) { @@ -156,7 +123,7 @@ public class DataflowExecutionStateTrackerTest { .build())); } - private DataflowExecutionStateTracker createTracker() { + private ExecutionStateTracker createTracker() { return new DataflowExecutionStateTracker( sampler, new TestDataflowExecutionState(NameContext.forStage("test-stage"), "other"), @@ -164,103 +131,4 @@ public class DataflowExecutionStateTrackerTest { options, "test-work-item-id"); } - - @Test - public void testLullReportsRightTrace() throws Exception { - FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis()); - DataflowExecutionStateTracker tracker = createTracker(clock); - // Adding test for the full thread dump, but since we can't mock - // Thread.getAllStackTraces(), we are starting a background thread - // to verify the full thread dump. - Thread backgroundThread = - new Thread("backgroundThread") { - @Override - public void run() { - try { - Thread.sleep(Long.MAX_VALUE); - } catch (InterruptedException e) { - // exiting the thread - } - } - }; - - backgroundThread.start(); - try { - // Full thread dump should be performed, because we never performed - // a full thread dump before, and the lull duration is more than 20 - // minutes. - tracker.reportBundleLull(30 * 60 * 1000); - verifyLullLog(true); - - // Full thread dump should not be performed because the last dump - // was only 5 minutes ago. - clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(5L).getMillis()); - tracker.reportBundleLull(30 * 60 * 1000); - verifyLullLog(false); - - // Full thread dump should not be performed because the lull duration - // is only 6 minutes. - clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis()); - tracker.reportBundleLull(6 * 60 * 1000); - verifyLullLog(false); - - // Full thread dump should be performed, because it has been 21 minutes - // since the last dump, and the lull duration is more than 20 minutes. - clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis()); - tracker.reportBundleLull(30 * 60 * 1000); - // verifyLullLog(true); - } finally { - // Cleaning up the background thread. - backgroundThread.interrupt(); - backgroundThread.join(); - } - } - - private void verifyLullLog(boolean hasFullThreadDump) throws IOException { - File[] files = logFolder.listFiles(); - assertThat(files, Matchers.arrayWithSize(1)); - File logFile = files[0]; - List<String> lines = Files.readAllLines(logFile.toPath()); - - String warnLines = - Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"WARN\""))); - assertThat( - warnLines, - Matchers.allOf( - Matchers.containsString("Operation ongoing in bundle for at least"), - Matchers.containsString(" without completing"), - Matchers.containsString("Processing times in each step"), - Matchers.containsString( - "org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker"))); - - String infoLines = - Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"INFO\""))); - if (hasFullThreadDump) { - assertThat( - infoLines, - Matchers.allOf( - Matchers.containsString("Thread[backgroundThread,"), - Matchers.containsString("org.apache.beam.runners.dataflow.worker.StackTraceUtil"))); - } else { - assertThat( - infoLines, - Matchers.not( - Matchers.anyOf( - Matchers.containsString("Thread[backgroundThread,"), - Matchers.containsString( - "org.apache.beam.runners.dataflow.worker.StackTraceUtil")))); - } - // Truncate the file when done to prepare for the next test. - new FileOutputStream(logFile, false).getChannel().truncate(0).close(); - } - - private DataflowExecutionStateTracker createTracker(Clock clock) { - return new DataflowExecutionStateTracker( - sampler, - new TestDataflowExecutionState(NameContext.forStage("test-stage"), "other"), - counterSet, - options, - "test-work-item-id", - clock); - } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java index 88a9f8e11c7..34c3b3d5373 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java @@ -29,6 +29,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.api.client.testing.http.FixedClock; +import com.google.api.client.util.Clock; import com.google.api.services.dataflow.model.CounterUpdate; import java.io.Closeable; import java.io.File; @@ -204,6 +206,7 @@ public class DataflowOperationContextTest { @Test public void testLullReportsRightTrace() throws Exception { Thread mockThread = mock(Thread.class); + FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis()); DataflowExecutionState executionState = new DataflowExecutionState( @@ -212,7 +215,8 @@ public class DataflowOperationContextTest { null /* requestingStepName */, null /* inputIndex */, null /* metricsContainer */, - ScopedProfiler.INSTANCE.emptyScope()) { + ScopedProfiler.INSTANCE.emptyScope(), + clock) { @Override public @Nullable CounterUpdate extractUpdate(boolean isFinalUpdate) { // not being used for extracting updates @@ -234,11 +238,55 @@ public class DataflowOperationContextTest { SimpleDoFnRunner.class.getName(), "processElement", "SimpleDoFnRunner.java", 500), }; when(mockThread.getStackTrace()).thenReturn(doFnStackTrace); - executionState.reportLull(mockThread, 30 * 60 * 1000); - verifyLullLog(); + + // Adding test for the full thread dump, but since we can't mock + // Thread.getAllStackTraces(), we are starting a background thread + // to verify the full thread dump. + Thread backgroundThread = + new Thread("backgroundThread") { + @Override + public void run() { + try { + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + // exiting the thread + } + } + }; + + backgroundThread.start(); + try { + // Full thread dump should be performed, because we never performed + // a full thread dump before, and the lull duration is more than 20 + // minutes. + executionState.reportLull(mockThread, 30 * 60 * 1000); + verifyLullLog(true); + + // Full thread dump should not be performed because the last dump + // was only 5 minutes ago. + clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(5L).getMillis()); + executionState.reportLull(mockThread, 30 * 60 * 1000); + verifyLullLog(false); + + // Full thread dump should not be performed because the lull duration + // is only 6 minutes. + clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis()); + executionState.reportLull(mockThread, 6 * 60 * 1000); + verifyLullLog(false); + + // Full thread dump should be performed, because it has been 21 minutes + // since the last dump, and the lull duration is more than 20 minutes. + clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis()); + executionState.reportLull(mockThread, 30 * 60 * 1000); + verifyLullLog(true); + } finally { + // Cleaning up the background thread. + backgroundThread.interrupt(); + backgroundThread.join(); + } } - private void verifyLullLog() throws IOException { + private void verifyLullLog(boolean hasFullThreadDump) throws IOException { File[] files = logFolder.listFiles(); assertThat(files, Matchers.arrayWithSize(1)); File logFile = files[0]; @@ -257,13 +305,23 @@ public class DataflowOperationContextTest { String infoLines = Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"INFO\""))); - assertThat( - infoLines, - Matchers.not( - Matchers.anyOf( - Matchers.containsString("Thread[backgroundThread,"), - Matchers.containsString( - "org.apache.beam.runners.dataflow.worker.DataflowOperationContext")))); + if (hasFullThreadDump) { + assertThat( + infoLines, + Matchers.allOf( + Matchers.containsString("Thread[backgroundThread,"), + Matchers.containsString( + "org.apache.beam.runners.dataflow.worker.DataflowOperationContext"), + Matchers.not(Matchers.containsString(SimpleDoFnRunner.class.getName())))); + } else { + assertThat( + infoLines, + Matchers.not( + Matchers.anyOf( + Matchers.containsString("Thread[backgroundThread,"), + Matchers.containsString( + "org.apache.beam.runners.dataflow.worker.DataflowOperationContext")))); + } // Truncate the file when done to prepare for the next test. new FileOutputStream(logFile, false).getChannel().truncate(0).close(); }