This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 61ca405d372 Implementing lull reporting at bundle level processing (#30693)
61ca405d372 is described below

commit 61ca405d372b02c985f641e96d10e79f61400f4c
Author: Arvind Ram <arvindra...@gmail.com>
AuthorDate: Mon Apr 29 06:15:45 2024 -0700

    Implementing lull reporting at bundle level processing (#30693)
    
    This reverts commit 50c59912bc002947c335170b42827b278b78aae1.
    * reset lull state cleanup during deactivate
---
 .../core/metrics/ExecutionStateTracker.java        |  41 +++++++-
 .../dataflow/worker/DataflowExecutionContext.java  |  96 +++++++++++++++++
 .../dataflow/worker/DataflowOperationContext.java  |  31 +-----
 .../runners/dataflow/worker/StackTraceUtil.java    |  66 ++++++++++++
 .../worker/DataflowExecutionStateTrackerTest.java  | 115 ++++++++++++++++++++-
 .../util/common/worker/MapTaskExecutorTest.java    |  28 +++--
 6 files changed, 336 insertions(+), 41 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
index dc6fd2f8248..f70e9ac16f9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
@@ -46,6 +46,7 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
       new ConcurrentHashMap<>();
 
   private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
+  private static final long BUNDLE_LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(10);
   private static final AtomicIntegerFieldUpdater<ExecutionStateTracker> SAMPLING_UPDATER =
       AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, "sampling");
 
@@ -139,8 +140,17 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
    */
   private volatile long millisSinceLastTransition = 0;
 
+  /**
+   * The number of milliseconds since this {@link ExecutionStateTracker} was activated, i.e. since
+   * the start of the current bundle.
+   *
+   * <p>This variable is updated by the Sampling thread and read by the Progress Reporting thread,
+   * so it is marked volatile.
+   */
+  private volatile long millisSinceBundleStart = 0;
+
   private long transitionsAtLastSample = 0;
   private long nextLullReportMs = LULL_REPORT_MS;
+  private long nextBundleLullDurationReportMs = BUNDLE_LULL_REPORT_MS;
 
   public ExecutionStateTracker(ExecutionStateSampler sampler) {
     this.sampler = sampler;
@@ -239,13 +249,16 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
     return trackedThread;
   }
 
-  private synchronized void deactivate() {
+  @VisibleForTesting
+  public synchronized void deactivate() {
     sampler.removeTracker(this);
     Thread thread = this.trackedThread;
     if (thread != null) {
       CURRENT_TRACKERS.remove(thread.getId());
     }
     this.trackedThread = null;
+    millisSinceBundleStart = 0;
+    nextBundleLullDurationReportMs = BUNDLE_LULL_REPORT_MS;
   }
 
   public ExecutionState getCurrentState() {
@@ -294,6 +307,11 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
     return millisSinceLastTransition;
   }
 
+  /** Return the time since the start of the current bundle. */
+  public long getMillisSinceBundleStart() {
+    return millisSinceBundleStart;
+  }
+
   /** Return the number of transitions since the last sample. */
   public long getTransitionsAtLastSample() {
     return transitionsAtLastSample;
@@ -304,6 +322,12 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
     return nextLullReportMs;
   }
 
+  /** Return the duration since bundle start for the next bundle lull report. */
+  @VisibleForTesting
+  public long getNextBundleLullDurationReportMs() {
+    return nextBundleLullDurationReportMs;
+  }
+
   /**
   * Called periodically by the {@link ExecutionStateSampler} to report time recorded by the
    * tracker.
@@ -335,6 +359,21 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
       transitionsAtLastSample = transitionsAtThisSample;
     }
     updateMillisSinceLastTransition(millisSinceLastSample, state);
+    updateMillisSinceBundleStart(millisSinceLastSample);
+  }
+
+  // Override this to implement bundle level lull reporting.
+  protected void reportBundleLull(Thread trackedThread, long millisSinceBundleStart) {}
+
+  // This suppression doesn't cause any race condition because it is updated by only one thread
+  // which is currently tracked.
+  @SuppressWarnings("NonAtomicVolatileUpdate")
+  private void updateMillisSinceBundleStart(long millisSinceLastSample) {
+    millisSinceBundleStart += millisSinceLastSample;
+    if (millisSinceBundleStart > nextBundleLullDurationReportMs) {
+      reportBundleLull(trackedThread, millisSinceBundleStart);
+      nextBundleLullDurationReportMs += BUNDLE_LULL_REPORT_MS;
+    }
   }
 
   @SuppressWarnings("NonAtomicVolatileUpdate")
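For readers skimming the patch, a minimal, self-contained sketch (illustrative only, not part of the diff above) of the bundle-lull pattern the tracker change introduces: the sampler accumulates time since bundle start on every sample, fires reportBundleLull() each time a 10-minute threshold is crossed, and deactivate() clears the per-bundle state. Class and method names here are made up for illustration.

    import java.util.concurrent.TimeUnit;

    class BundleLullSketch {
      private static final long BUNDLE_LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(10);
      private long millisSinceBundleStart = 0;
      private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;

      // Invoked periodically by a sampling thread with the elapsed time since the last sample.
      void takeSample(long millisSinceLastSample) {
        millisSinceBundleStart += millisSinceLastSample;
        if (millisSinceBundleStart > nextBundleLullReportMs) {
          reportBundleLull(millisSinceBundleStart);
          nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
        }
      }

      // Subclasses (e.g. a runner-specific tracker) override this to log a warning.
      protected void reportBundleLull(long millisSinceBundleStart) {}

      // Called when the bundle finishes; clears the per-bundle lull state.
      void deactivate() {
        millisSinceBundleStart = 0;
        nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
      }
    }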
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 71c158b5da2..2b4c7df6ace 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.SideInputInfo;
 import java.io.Closeable;
 import java.io.IOException;
@@ -29,6 +30,8 @@ import java.util.IntSummaryStatistics;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
 import java.util.stream.Collectors;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.core.NullSideInputReader;
@@ -43,6 +46,8 @@ import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.Dataflow
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
+import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -50,11 +55,17 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Execution context for the Dataflow worker. */
 @SuppressWarnings({
@@ -264,21 +275,53 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
     @Nullable
     private ActiveMessageMetadata activeMessageMetadata = null;
 
+    /** Clock used to provide real system time, or a mock that virtualizes time for testing. */
+    private final Clock clock;
+
     @GuardedBy("this")
     private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>();
 
+    private static final Logger LOG = LoggerFactory.getLogger(DataflowExecutionStateTracker.class);
+
+    private static final PeriodFormatter DURATION_FORMATTER =
+        new PeriodFormatterBuilder()
+            .appendDays()
+            .appendSuffix("d")
+            .minimumPrintedDigits(2)
+            .appendHours()
+            .appendSuffix("h")
+            .printZeroAlways()
+            .appendMinutes()
+            .appendSuffix("m")
+            .appendSeconds()
+            .appendSuffix("s")
+            .toFormatter();
+
     public DataflowExecutionStateTracker(
         ExecutionStateSampler sampler,
         DataflowOperationContext.DataflowExecutionState otherState,
         CounterFactory counterFactory,
         PipelineOptions options,
         String workItemId) {
+      this(sampler, otherState, counterFactory, options, workItemId, Clock.SYSTEM);
+    }
+
+    @VisibleForTesting
+    public DataflowExecutionStateTracker(
+        ExecutionStateSampler sampler,
+        DataflowOperationContext.DataflowExecutionState otherState,
+        CounterFactory counterFactory,
+        PipelineOptions options,
+        String workItemId,
+        Clock clock) {
       super(sampler);
       this.elementExecutionTracker =
           DataflowElementExecutionTracker.create(counterFactory, options);
       this.otherState = otherState;
       this.workItemId = workItemId;
       this.contextActivationObserverRegistry = ContextActivationObserverRegistry.createDefault();
+      this.clock = clock;
+      DataflowWorkerLoggingInitializer.initialize();
       if (options instanceof DataflowPipelineOptions) {
         this.isStreaming = ((DataflowPipelineOptions) options).isStreaming();
       } else {
@@ -308,12 +351,65 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
       }
     }
 
+    private String getBundleLullMessage(Thread trackedThread, Duration lullDuration) {
+      StringBuilder message = new StringBuilder();
+      message
+          .append("Operation ongoing in bundle for at least ")
+          .append(DURATION_FORMATTER.print(lullDuration.toPeriod()))
+          .append(" without completing")
+          .append("\n");
+      synchronized (this) {
+        if (this.activeMessageMetadata != null) {
+          message.append(
+              "Current user step name: " + 
getActiveMessageMetadata().get().userStepName() + "\n");
+          message.append(
+              "Time spent in this step(millis): "
+                  + (clock.currentTimeMillis()
+                      - 
getActiveMessageMetadata().get().stopwatch().elapsed().toMillis())
+                  + "\n");
+        }
+        message.append("Processing times in each step(millis)\n");
+        for (Map.Entry<String, IntSummaryStatistics> entry :
+            this.processingTimesByStep.entrySet()) {
+          message.append("Step name: " + entry.getKey() + "\n");
+          message.append("Time spent in this step: " + 
entry.getValue().toString() + "\n");
+        }
+      }
+
+      if (trackedThread != null) {
+        message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace()));
+      }
+      return message.toString();
+    }
+
     @Override
     protected void takeSampleOnce(long millisSinceLastSample) {
       elementExecutionTracker.takeSample(millisSinceLastSample);
       super.takeSampleOnce(millisSinceLastSample);
     }
 
+    @Override
+    protected void reportBundleLull(Thread trackedThread, long millisElapsedSinceBundleStart) {
+      // If we're not logging warnings, nothing to report.
+      if (!LOG.isWarnEnabled()) {
+        return;
+      }
+
+      Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart);
+
+      // Since the lull reporting executes in the sampler thread, it won't automatically inherit the
+      // context of the current step. To ensure things are logged correctly, we get the currently
+      // registered DataflowWorkerLoggingHandler and log directly in the desired context.
+      LogRecord logRecord =
+          new LogRecord(Level.WARNING, getBundleLullMessage(trackedThread, lullDuration));
+      logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName());
+
+      // Publish directly in the context of this specific ExecutionState.
+      DataflowWorkerLoggingHandler dataflowLoggingHandler =
+          DataflowWorkerLoggingInitializer.getLoggingHandler();
+      dataflowLoggingHandler.publish(logRecord);
+    }
+
     /**
     * Enter a new state on the tracker. If the new state is a Dataflow processing state, tracks the
      * activeMessageMetadata with the start time of the new state.
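To illustrate (not part of the diff above), the DURATION_FORMATTER added here renders the lull duration with d/h/m/s suffixes before it is embedded in the warning text. A stand-alone sketch using the same joda-time builder chain, with an illustrative class name:

    import org.joda.time.Duration;
    import org.joda.time.format.PeriodFormatter;
    import org.joda.time.format.PeriodFormatterBuilder;

    public class LullDurationFormatSketch {
      public static void main(String[] args) {
        // Same builder chain as DURATION_FORMATTER in the patch above.
        PeriodFormatter formatter =
            new PeriodFormatterBuilder()
                .appendDays()
                .appendSuffix("d")
                .minimumPrintedDigits(2)
                .appendHours()
                .appendSuffix("h")
                .printZeroAlways()
                .appendMinutes()
                .appendSuffix("m")
                .appendSeconds()
                .appendSuffix("s")
                .toFormatter();
        // Prints a suffixed period for an elapsed bundle duration, e.g. twelve minutes.
        System.out.println(formatter.print(Duration.standardMinutes(12).toPeriod()));
      }
    }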
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
index b2ab928bc99..fc145aaf449 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
@@ -25,10 +25,8 @@ import com.google.api.services.dataflow.model.CounterStructuredName;
 import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
 import com.google.api.services.dataflow.model.CounterUpdate;
 import java.io.Closeable;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
-import org.apache.beam.runners.core.SimpleDoFnRunner;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
@@ -42,7 +40,6 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationConte
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.format.PeriodFormatter;
@@ -251,9 +248,6 @@ public class DataflowOperationContext implements OperationContext {
       return description.toString();
     }
 
-    private static final ImmutableSet<String> FRAMEWORK_CLASSES =
-        ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName());
-
     protected String getLullMessage(Thread trackedThread, Duration lullDuration) {
       StringBuilder message = new StringBuilder();
       message.append("Operation ongoing");
@@ -272,7 +266,7 @@ public class DataflowOperationContext implements OperationContext {
 
       message.append("\n");
 
-      message.append(getStackTraceForLullMessage(trackedThread.getStackTrace()));
+      message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace()));
       return message.toString();
     }
 
@@ -298,17 +292,7 @@ public class DataflowOperationContext implements OperationContext {
       dataflowLoggingHandler.publish(this, logRecord);
 
       if (shouldLogFullThreadDump(lullDuration)) {
-        Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces();
-        for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) {
-          Thread thread = entry.getKey();
-          StackTraceElement[] stackTrace = entry.getValue();
-          StringBuilder message = new StringBuilder();
-          message.append(thread.toString()).append(":\n");
-          message.append(getStackTraceForLullMessage(stackTrace));
-          logRecord = new LogRecord(Level.INFO, message.toString());
-          logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
-          dataflowLoggingHandler.publish(this, logRecord);
-        }
+        StackTraceUtil.logAllStackTraces();
       }
     }
 
@@ -336,17 +320,6 @@ public class DataflowOperationContext implements OperationContext {
       return false;
     }
 
-    private String getStackTraceForLullMessage(StackTraceElement[] stackTrace) {
-      StringBuilder message = new StringBuilder();
-      for (StackTraceElement e : stackTrace) {
-        if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
-          break;
-        }
-        message.append("  at ").append(e).append("\n");
-      }
-      return message.toString();
-    }
-
     public @Nullable MetricsContainer getMetricsContainer() {
       return metricsContainer;
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
new file mode 100644
index 00000000000..041944f09cf
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
+import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
+import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility methods for formatting and logging thread stack traces. */
+@Internal
+public final class StackTraceUtil {
+  private static final ImmutableSet<String> FRAMEWORK_CLASSES =
+      ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(StackTraceUtil.class);
+
+  public static String getStackTraceForLullMessage(StackTraceElement[] stackTrace) {
+    StringBuilder message = new StringBuilder();
+    for (StackTraceElement e : stackTrace) {
+      if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
+        break;
+      }
+      message.append("  at ").append(e).append("\n");
+    }
+    return message.toString();
+  }
+
+  public static void logAllStackTraces() {
+    DataflowWorkerLoggingHandler dataflowLoggingHandler =
+        DataflowWorkerLoggingInitializer.getLoggingHandler();
+    Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces();
+    for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) {
+      Thread thread = entry.getKey();
+      StackTraceElement[] stackTrace = entry.getValue();
+      StringBuilder message = new StringBuilder();
+      message.append(thread.toString()).append(":\n");
+      message.append(getStackTraceForLullMessage(stackTrace));
+      LogRecord logRecord = new LogRecord(Level.INFO, message.toString());
+      logRecord.setLoggerName(StackTraceUtil.LOG.getName());
+      dataflowLoggingHandler.publish(logRecord);
+    }
+  }
+
+  private StackTraceUtil() {}
+}
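As a usage sketch (not part of the diff; assumes the worker classes above are on the classpath), the new helper trims a thread's trace at the first Beam framework frame, and logAllStackTraces() publishes one INFO record per live thread through the registered DataflowWorkerLoggingHandler:

    import org.apache.beam.runners.dataflow.worker.StackTraceUtil;
    import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;

    public class StackTraceUtilUsageSketch {
      public static void main(String[] args) {
        // Frames from the first framework class (e.g. SimpleDoFnRunner) onward are dropped,
        // so only user code shows up in the lull message.
        StackTraceElement[] trace = Thread.currentThread().getStackTrace();
        System.out.print(StackTraceUtil.getStackTraceForLullMessage(trace));

        // The logging handler must be initialized before the full thread dump is published.
        DataflowWorkerLoggingInitializer.initialize();
        StackTraceUtil.logAllStackTraces();
      }
    }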
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
index 551937c3559..7b9f1b1ab5d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
@@ -21,9 +21,17 @@ import static junit.framework.TestCase.assertNotNull;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import com.google.api.client.testing.http.FixedClock;
+import com.google.api.client.util.Clock;
 import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
 import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
@@ -35,28 +43,55 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDi
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.hamcrest.Matchers;
 import org.joda.time.DateTimeUtils.MillisProvider;
+import org.joda.time.Duration;
+import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /** Tests for {@link DataflowExecutionStateTrackerTest}. */
 public class DataflowExecutionStateTrackerTest {
 
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Rule public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+  private File logFolder;
   private PipelineOptions options;
   private MillisProvider clock;
   private ExecutionStateSampler sampler;
   private CounterSet counterSet;
 
   @Before
-  public void setUp() {
+  public void setUp() throws IOException {
     options = PipelineOptionsFactory.create();
     clock = mock(MillisProvider.class);
     sampler = ExecutionStateSampler.newForTest(clock);
     counterSet = new CounterSet();
+    logFolder = tempFolder.newFolder();
+    System.setProperty(
+        DataflowWorkerLoggingInitializer.RUNNER_FILEPATH_PROPERTY,
+        new File(logFolder, "dataflow-json.log").getAbsolutePath());
+    // We need to reset *first* because some other test may have already initialized the
+    // logging initializer.
+    DataflowWorkerLoggingInitializer.reset();
+    DataflowWorkerLoggingInitializer.initialize();
+  }
+
+  @After
+  public void tearDown() {
+    DataflowWorkerLoggingInitializer.reset();
   }
 
   private final NameContext step1 =
@@ -67,7 +102,7 @@ public class DataflowExecutionStateTrackerTest {
   @Test
   public void testReportsElementExecutionTime() throws IOException {
     enableTimePerElementExperiment();
-    ExecutionStateTracker tracker = createTracker();
+    DataflowExecutionStateTracker tracker = createTracker();
 
     try (Closeable c1 = tracker.activate(new Thread())) {
       try (Closeable c2 = tracker.enterState(step1Process)) {}
@@ -84,7 +119,7 @@ public class DataflowExecutionStateTrackerTest {
   @Test
   public void testTakesSampleOnDeactivate() throws IOException {
     enableTimePerElementExperiment();
-    ExecutionStateTracker tracker = createTracker();
+    DataflowExecutionStateTracker tracker = createTracker();
 
     try (Closeable c1 = tracker.activate(new Thread())) {
       try (Closeable c2 = tracker.enterState(step1Process)) {
@@ -123,7 +158,7 @@ public class DataflowExecutionStateTrackerTest {
                 .build()));
   }
 
-  private ExecutionStateTracker createTracker() {
+  private DataflowExecutionStateTracker createTracker() {
     return new DataflowExecutionStateTracker(
         sampler,
         new TestDataflowExecutionState(NameContext.forStage("test-stage"), "other"),
@@ -131,4 +166,76 @@ public class DataflowExecutionStateTrackerTest {
         options,
         "test-work-item-id");
   }
+
+  @Test
+  public void testLullReportsRightTrace() throws Exception {
+    Thread mockThread = mock(Thread.class);
+    StackTraceElement[] doFnStackTrace =
+        new StackTraceElement[] {
+          new StackTraceElement(
+              "userpackage.SomeUserDoFn", "helperMethod", "SomeUserDoFn.java", 250),
+          new StackTraceElement("userpackage.SomeUserDoFn", "process", "SomeUserDoFn.java", 450),
+          new StackTraceElement(
+              SimpleDoFnRunner.class.getName(), "processElement", "SimpleDoFnRunner.java", 500),
+        };
+    when(mockThread.getStackTrace()).thenReturn(doFnStackTrace);
+    FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
+    DataflowExecutionStateTracker tracker = createTracker(clock);
+    tracker.reportBundleLull(mockThread, 30 * 60 * 1000);
+    verifyLullLog();
+
+    clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(5L).getMillis());
+    tracker.reportBundleLull(mockThread, 30 * 60 * 1000);
+    verifyLullLog();
+
+    clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
+    tracker.reportBundleLull(mockThread, 6 * 60 * 1000);
+    verifyLullLog();
+
+    clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
+    tracker.reportBundleLull(mockThread, 30 * 60 * 1000);
+    verifyLullLog();
+  }
+
+  private void verifyLullLog() throws IOException {
+    File[] files = logFolder.listFiles();
+    assertThat(files, Matchers.arrayWithSize(1));
+    File logFile = files[0];
+    List<String> lines = Files.readAllLines(logFile.toPath());
+
+    String warnLines =
+        Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"WARN\"")));
+    assertThat(
+        warnLines,
+        Matchers.allOf(
+            Matchers.containsString("Operation ongoing in bundle for at 
least"),
+            Matchers.containsString(" without completing"),
+            Matchers.containsString("Processing times in each step"),
+            Matchers.containsString(
+                
"org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker"),
+            Matchers.containsString("userpackage.SomeUserDoFn.helperMethod"),
+            
Matchers.not(Matchers.containsString(SimpleDoFnRunner.class.getName()))));
+
+    String infoLines =
+        Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"INFO\"")));
+    assertThat(
+        infoLines,
+        Matchers.not(
+            Matchers.anyOf(
+                Matchers.containsString("Thread[backgroundThread,"),
+                Matchers.containsString(
+                    "org.apache.beam.runners.dataflow.worker.StackTraceUtil"))));
+    // Truncate the file when done to prepare for the next test.
+    new FileOutputStream(logFile, false).getChannel().truncate(0).close();
+  }
+
+  private DataflowExecutionStateTracker createTracker(Clock clock) {
+    return new DataflowExecutionStateTracker(
+        sampler,
+        new TestDataflowExecutionState(NameContext.forStage("test-stage"), "other"),
+        counterSet,
+        options,
+        "test-work-item-id",
+        clock);
+  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
index 5c34441766c..1e6ec5f1a10 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
@@ -31,6 +31,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
@@ -43,6 +44,7 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
@@ -139,19 +141,21 @@ public class MapTaskExecutorTest {
     Operation o3 = Mockito.mock(Operation.class);
     List<Operation> operations = Arrays.asList(new Operation[] {o1, o2, o3});
 
-    ExecutionStateTracker stateTracker = Mockito.mock(ExecutionStateTracker.class);
+    ExecutionStateTracker stateTracker = Mockito.spy(ExecutionStateTracker.newForTest());
 
     try (MapTaskExecutor executor = new MapTaskExecutor(operations, counterSet, stateTracker)) {
       executor.execute();
     }
 
     InOrder inOrder = Mockito.inOrder(stateTracker, o1, o2, o3);
+    inOrder.verify(stateTracker).activate();
     inOrder.verify(o3).start();
     inOrder.verify(o2).start();
     inOrder.verify(o1).start();
     inOrder.verify(o1).finish();
     inOrder.verify(o2).finish();
     inOrder.verify(o3).finish();
+    inOrder.verify(stateTracker).deactivate();
   }
 
   private TestOperation createOperation(String stepName, long count) {
@@ -338,6 +342,8 @@ public class MapTaskExecutorTest {
       assertThat(
           context3.metricsContainer().getUpdates().counterUpdates(),
           contains(metricUpdate("TestMetric", "MetricCounter", o3, 3L)));
+      assertEquals(0, stateTracker.getMillisSinceBundleStart());
+      assertEquals(TimeUnit.MINUTES.toMillis(10), stateTracker.getNextBundleLullDurationReportMs());
     }
   }
 
@@ -401,13 +407,14 @@ public class MapTaskExecutorTest {
     Operation o3 = Mockito.mock(Operation.class);
     Mockito.doThrow(new Exception("in start")).when(o2).start();
 
-    ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest();
+    ExecutionStateTracker stateTracker = Mockito.spy(ExecutionStateTracker.newForTest());
     try (MapTaskExecutor executor =
         new MapTaskExecutor(Arrays.<Operation>asList(o1, o2, o3), counterSet, stateTracker)) {
       executor.execute();
       fail("Should have thrown");
     } catch (Exception e) {
-      InOrder inOrder = Mockito.inOrder(o1, o2, o3);
+      InOrder inOrder = Mockito.inOrder(o1, o2, o3, stateTracker);
+      inOrder.verify(stateTracker).activate();
       inOrder.verify(o3).start();
       inOrder.verify(o2).start();
 
@@ -415,6 +422,7 @@ public class MapTaskExecutorTest {
       Mockito.verify(o1).abort();
       Mockito.verify(o2).abort();
       Mockito.verify(o3).abort();
+      Mockito.verify(stateTracker).deactivate();
       Mockito.verifyNoMoreInteractions(o1, o2, o3);
     }
   }
@@ -426,12 +434,13 @@ public class MapTaskExecutorTest {
     Operation o3 = Mockito.mock(Operation.class);
     Mockito.doThrow(new Exception("in finish")).when(o2).finish();
 
-    ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest();
+    ExecutionStateTracker stateTracker = Mockito.spy(ExecutionStateTracker.newForTest());
     try (MapTaskExecutor executor =
         new MapTaskExecutor(Arrays.<Operation>asList(o1, o2, o3), counterSet, stateTracker)) {
       executor.execute();
       fail("Should have thrown");
     } catch (Exception e) {
+      Mockito.verify(stateTracker).activate();
       InOrder inOrder = Mockito.inOrder(o1, o2, o3);
       inOrder.verify(o3).start();
       inOrder.verify(o2).start();
@@ -443,6 +452,7 @@ public class MapTaskExecutorTest {
       Mockito.verify(o1).abort();
       Mockito.verify(o2).abort();
       Mockito.verify(o3).abort();
+      Mockito.verify(stateTracker).deactivate();
       Mockito.verifyNoMoreInteractions(o1, o2, o3);
     }
   }
@@ -456,13 +466,14 @@ public class MapTaskExecutorTest {
     Mockito.doThrow(new Exception("in finish")).when(o2).finish();
     Mockito.doThrow(new Exception("suppressed in abort")).when(o3).abort();
 
-    ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest();
+    ExecutionStateTracker stateTracker = Mockito.spy(ExecutionStateTracker.newForTest());
     try (MapTaskExecutor executor =
         new MapTaskExecutor(Arrays.<Operation>asList(o1, o2, o3, o4), counterSet, stateTracker)) {
       executor.execute();
       fail("Should have thrown");
     } catch (Exception e) {
-      InOrder inOrder = Mockito.inOrder(o1, o2, o3, o4);
+      Mockito.verify(stateTracker).activate();
+      InOrder inOrder = Mockito.inOrder(o1, o2, o3, o4, stateTracker);
       inOrder.verify(o4).start();
       inOrder.verify(o3).start();
       inOrder.verify(o2).start();
@@ -475,6 +486,7 @@ public class MapTaskExecutorTest {
       Mockito.verify(o2).abort();
       Mockito.verify(o3).abort(); // will throw an exception, but we shouldn't fail
       Mockito.verify(o4).abort();
+      Mockito.verify(stateTracker).deactivate();
       Mockito.verifyNoMoreInteractions(o1, o2, o3, o4);
 
       // Make sure the failure while aborting shows up as a suppressed error
@@ -491,7 +503,7 @@ public class MapTaskExecutorTest {
     ReadOperation o1 = Mockito.mock(ReadOperation.class);
     ReadOperation o2 = Mockito.mock(ReadOperation.class);
 
-    ExecutionStateTracker stateTracker = ExecutionStateTracker.newForTest();
+    ExecutionStateTracker stateTracker = Mockito.spy(ExecutionStateTracker.newForTest());
     MapTaskExecutor executor =
         new MapTaskExecutor(Arrays.<Operation>asList(o1, o2), counterSet, stateTracker);
     Mockito.doAnswer(
@@ -502,7 +514,9 @@ public class MapTaskExecutorTest {
         .when(o1)
         .finish();
     executor.execute();
+    Mockito.verify(stateTracker).activate();
     Mockito.verify(o1, atLeastOnce()).abortReadLoop();
     Mockito.verify(o2, atLeastOnce()).abortReadLoop();
+    Mockito.verify(stateTracker).deactivate();
   }
 }

