This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch release-2.55.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.55.0 by this push:
     new 0dc330e6d53 Revert "Implementing lull reporting at bundle level processing (#29882)" (#30648) (#30664)
0dc330e6d53 is described below

commit 0dc330e6d53cce51b13bcb652e80859c1b3a5975
Author: Yi Hu <ya...@google.com>
AuthorDate: Mon Mar 18 17:29:24 2024 -0400

    Revert "Implementing lull reporting at bundle level processing (#29882)" 
(#30648) (#30664)
    
    This reverts commit ffe2dba532028cdbbb5bca9c374f0a2d756ee8bf.
    
    Co-authored-by: Arvind Ram <arvindra...@gmail.com>
---
 .../core/metrics/ExecutionStateTracker.java        |  25 ----
 .../dataflow/worker/DataflowExecutionContext.java  | 117 +----------------
 .../dataflow/worker/DataflowOperationContext.java  |  80 +++++++++++-
 .../runners/dataflow/worker/StackTraceUtil.java    |  66 ----------
 .../worker/DataflowExecutionStateTrackerTest.java  | 140 +--------------------
 .../worker/DataflowOperationContextTest.java       |  80 ++++++++++--
 6 files changed, 155 insertions(+), 353 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
index b0b8f0107f3..dc6fd2f8248 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
@@ -46,7 +46,6 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
       new ConcurrentHashMap<>();
 
   private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
-  private static final long BUNDLE_LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(10);
   private static final AtomicIntegerFieldUpdater<ExecutionStateTracker> SAMPLING_UPDATER =
       AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, "sampling");
 
@@ -140,17 +139,8 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
    */
   private volatile long millisSinceLastTransition = 0;
 
-  /**
-   * The number of milliseconds since the {@link ExecutionStateTracker} initial state.
-   *
-   * <p>This variable is updated by the Sampling thread, and read by the Progress Reporting thread,
-   * thus it being marked volatile.
-   */
-  private volatile long millisSinceBundleStart = 0;
-
   private long transitionsAtLastSample = 0;
   private long nextLullReportMs = LULL_REPORT_MS;
-  private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
 
   public ExecutionStateTracker(ExecutionStateSampler sampler) {
     this.sampler = sampler;
@@ -165,10 +155,8 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
     currentState = null;
     numTransitions = 0;
     millisSinceLastTransition = 0;
-    millisSinceBundleStart = 0;
     transitionsAtLastSample = 0;
     nextLullReportMs = LULL_REPORT_MS;
-    nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
   }
 
   @VisibleForTesting
@@ -347,19 +335,6 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
       transitionsAtLastSample = transitionsAtThisSample;
     }
     updateMillisSinceLastTransition(millisSinceLastSample, state);
-    updateMillisSinceBundleStart(millisSinceLastSample);
-  }
-
-  // Override this to implement bundle level lull reporting.
-  protected void reportBundleLull(long millisSinceBundleStart) {}
-
-  @SuppressWarnings("NonAtomicVolatileUpdate")
-  private void updateMillisSinceBundleStart(long millisSinceLastSample) {
-    millisSinceBundleStart += millisSinceLastSample;
-    if (millisSinceBundleStart > nextBundleLullReportMs) {
-      reportBundleLull(millisSinceBundleStart);
-      nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
-    }
   }
 
   @SuppressWarnings("NonAtomicVolatileUpdate")
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 16ff2975b02..080fa7c9dac 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow.worker;
 
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.SideInputInfo;
 import java.io.Closeable;
 import java.io.IOException;
@@ -30,8 +29,6 @@ import java.util.IntSummaryStatistics;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.logging.Level;
-import java.util.logging.LogRecord;
 import java.util.stream.Collectors;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.core.NullSideInputReader;
@@ -40,13 +37,10 @@ import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
-import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
-import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
-import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -54,16 +48,11 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.Duration;
+import org.joda.time.DateTimeUtils.MillisProvider;
 import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormatter;
-import org.joda.time.format.PeriodFormatterBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** Execution context for the Dataflow worker. */
 @SuppressWarnings({
@@ -271,59 +260,23 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
     @Nullable
     private ActiveMessageMetadata activeMessageMetadata = null;
 
-    /** Clock used to either provide real system time or mocked to virtualize time for testing. */
-    private final Clock clock;
+    private final MillisProvider clock = System::currentTimeMillis;
 
     @GuardedBy("this")
     private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>();
 
-    /** Last milliseconds since epoch when a full thread dump was performed. */
-    private long lastFullThreadDumpMillis = 0;
-
-    /** The minimum lull duration in milliseconds to perform a full thread dump. */
-    private static final long LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 1000;
-
-    private static final Logger LOG = LoggerFactory.getLogger(DataflowExecutionStateTracker.class);
-
-    private static final PeriodFormatter DURATION_FORMATTER =
-        new PeriodFormatterBuilder()
-            .appendDays()
-            .appendSuffix("d")
-            .minimumPrintedDigits(2)
-            .appendHours()
-            .appendSuffix("h")
-            .printZeroAlways()
-            .appendMinutes()
-            .appendSuffix("m")
-            .appendSeconds()
-            .appendSuffix("s")
-            .toFormatter();
-
     public DataflowExecutionStateTracker(
         ExecutionStateSampler sampler,
         DataflowOperationContext.DataflowExecutionState otherState,
         CounterFactory counterFactory,
         PipelineOptions options,
         String workItemId) {
-      this(sampler, otherState, counterFactory, options, workItemId, Clock.SYSTEM);
-    }
-
-    @VisibleForTesting
-    public DataflowExecutionStateTracker(
-        ExecutionStateSampler sampler,
-        DataflowOperationContext.DataflowExecutionState otherState,
-        CounterFactory counterFactory,
-        PipelineOptions options,
-        String workItemId,
-        Clock clock) {
       super(sampler);
       this.elementExecutionTracker =
           DataflowElementExecutionTracker.create(counterFactory, options);
       this.otherState = otherState;
       this.workItemId = workItemId;
       this.contextActivationObserverRegistry = ContextActivationObserverRegistry.createDefault();
-      this.clock = clock;
-      DataflowWorkerLoggingInitializer.initialize();
     }
 
     @Override
@@ -348,76 +301,12 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
       }
     }
 
-    private boolean shouldLogFullThreadDumpForBundle(Duration lullDuration) {
-      if (lullDuration.getMillis() < LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS) {
-        return false;
-      }
-      long now = clock.currentTimeMillis();
-      if (lastFullThreadDumpMillis + LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS < now) {
-        lastFullThreadDumpMillis = now;
-        return true;
-      }
-      return false;
-    }
-
-    private String getBundleLullMessage(Duration lullDuration) {
-      StringBuilder message = new StringBuilder();
-      message
-          .append("Operation ongoing in bundle for at least ")
-          .append(DURATION_FORMATTER.print(lullDuration.toPeriod()))
-          .append(" without completing")
-          .append("\n");
-      synchronized (this) {
-        if (this.activeMessageMetadata != null) {
-          message.append(
-              "Current user step name: " + 
getActiveMessageMetadata().get().userStepName() + "\n");
-          message.append(
-              "Time spent in this step(millis): "
-                  + (clock.currentTimeMillis() - getActiveMessageMetadata().get().startTime())
-                  + "\n");
-        }
-        message.append("Processing times in each step(millis)\n");
-        for (Map.Entry<String, IntSummaryStatistics> entry :
-            this.processingTimesByStep.entrySet()) {
-          message.append("Step name: " + entry.getKey() + "\n");
-          message.append("Time spent in this step: " + 
entry.getValue().toString() + "\n");
-        }
-      }
-
-      return message.toString();
-    }
-
     @Override
     protected void takeSampleOnce(long millisSinceLastSample) {
       elementExecutionTracker.takeSample(millisSinceLastSample);
       super.takeSampleOnce(millisSinceLastSample);
     }
 
-    @Override
-    protected void reportBundleLull(long millisElapsedSinceBundleStart) {
-      // If we're not logging warnings, nothing to report.
-      if (!LOG.isWarnEnabled()) {
-        return;
-      }
-
-      Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart);
-
-      // Since the lull reporting executes in the sampler thread, it won't automatically inherit the
-      // context of the current step. To ensure things are logged correctly, we get the currently
-      // registered DataflowWorkerLoggingHandler and log directly in the desired context.
-      LogRecord logRecord = new LogRecord(Level.WARNING, getBundleLullMessage(lullDuration));
-      logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName());
-
-      // Publish directly in the context of this specific ExecutionState.
-      DataflowWorkerLoggingHandler dataflowLoggingHandler =
-          DataflowWorkerLoggingInitializer.getLoggingHandler();
-      dataflowLoggingHandler.publish(logRecord);
-
-      if (shouldLogFullThreadDumpForBundle(lullDuration)) {
-        StackTraceUtil.logAllStackTraces();
-      }
-    }
-
     /**
      * Enter a new state on the tracker. If the new state is a Dataflow processing state, tracks the
      * activeMessageMetadata with the start time of the new state.
@@ -434,7 +323,7 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
           synchronized (this) {
             this.activeMessageMetadata =
                 ActiveMessageMetadata.create(
-                    newDFState.getStepName().userName(), clock.currentTimeMillis());
+                    newDFState.getStepName().userName(), clock.getMillis());
           }
         }
         elementExecutionTracker.enter(newDFState.getStepName());
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
index 7d90a512519..b2ab928bc99 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
@@ -19,13 +19,16 @@ package org.apache.beam.runners.dataflow.worker;
 
 import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
 
+import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.CounterMetadata;
 import com.google.api.services.dataflow.model.CounterStructuredName;
 import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
 import com.google.api.services.dataflow.model.CounterUpdate;
 import java.io.Closeable;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
@@ -39,6 +42,7 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationConte
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.format.PeriodFormatter;
@@ -181,6 +185,9 @@ public class DataflowOperationContext implements OperationContext {
     private final ProfileScope profileScope;
     private final @Nullable MetricsContainer metricsContainer;
 
+    /** Clock used to either provide real system time or mocked to virtualize time for testing. */
+    private final Clock clock;
+
     public DataflowExecutionState(
         NameContext nameContext,
         String stateName,
@@ -188,12 +195,31 @@ public class DataflowOperationContext implements OperationContext {
         @Nullable Integer inputIndex,
         @Nullable MetricsContainer metricsContainer,
         ProfileScope profileScope) {
+      this(
+          nameContext,
+          stateName,
+          requestingStepName,
+          inputIndex,
+          metricsContainer,
+          profileScope,
+          Clock.SYSTEM);
+    }
+
+    public DataflowExecutionState(
+        NameContext nameContext,
+        String stateName,
+        @Nullable String requestingStepName,
+        @Nullable Integer inputIndex,
+        @Nullable MetricsContainer metricsContainer,
+        ProfileScope profileScope,
+        Clock clock) {
       super(stateName);
       this.stepName = nameContext;
       this.requestingStepName = requestingStepName;
       this.inputIndex = inputIndex;
       this.profileScope = Preconditions.checkNotNull(profileScope);
       this.metricsContainer = metricsContainer;
+      this.clock = clock;
     }
 
     /**
@@ -225,6 +251,9 @@ public class DataflowOperationContext implements OperationContext {
       return description.toString();
     }
 
+    private static final ImmutableSet<String> FRAMEWORK_CLASSES =
+        ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName());
+
     protected String getLullMessage(Thread trackedThread, Duration lullDuration) {
       StringBuilder message = new StringBuilder();
       message.append("Operation ongoing");
@@ -243,7 +272,7 @@ public class DataflowOperationContext implements OperationContext {
 
       message.append("\n");
 
-      message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace()));
+      message.append(getStackTraceForLullMessage(trackedThread.getStackTrace()));
       return message.toString();
     }
 
@@ -267,6 +296,55 @@ public class DataflowOperationContext implements OperationContext {
       DataflowWorkerLoggingHandler dataflowLoggingHandler =
           DataflowWorkerLoggingInitializer.getLoggingHandler();
       dataflowLoggingHandler.publish(this, logRecord);
+
+      if (shouldLogFullThreadDump(lullDuration)) {
+        Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces();
+        for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) {
+          Thread thread = entry.getKey();
+          StackTraceElement[] stackTrace = entry.getValue();
+          StringBuilder message = new StringBuilder();
+          message.append(thread.toString()).append(":\n");
+          message.append(getStackTraceForLullMessage(stackTrace));
+          logRecord = new LogRecord(Level.INFO, message.toString());
+          logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
+          dataflowLoggingHandler.publish(this, logRecord);
+        }
+      }
+    }
+
+    /**
+     * The time interval between two full thread dump. (A full thread dump is performed at most once
+     * every 20 minutes.)
+     */
+    private static final long LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS = 20 * 60 * 1000;
+
+    /** The minimum lull duration to perform a full thread dump. */
+    private static final long LOG_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 1000;
+
+    /** Last time when a full thread dump was performed. */
+    private long lastFullThreadDumpMillis = 0;
+
+    private boolean shouldLogFullThreadDump(Duration lullDuration) {
+      if (lullDuration.getMillis() < LOG_LULL_FULL_THREAD_DUMP_LULL_MS) {
+        return false;
+      }
+      long now = clock.currentTimeMillis();
+      if (lastFullThreadDumpMillis + LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS < now) {
+        lastFullThreadDumpMillis = now;
+        return true;
+      }
+      return false;
+    }
+
+    private String getStackTraceForLullMessage(StackTraceElement[] stackTrace) {
+      StringBuilder message = new StringBuilder();
+      for (StackTraceElement e : stackTrace) {
+        if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
+          break;
+        }
+        message.append("  at ").append(e).append("\n");
+      }
+      return message.toString();
     }
 
     public @Nullable MetricsContainer getMetricsContainer() {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
deleted file mode 100644
index 041944f09cf..00000000000
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StackTraceUtil.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.LogRecord;
-import org.apache.beam.runners.core.SimpleDoFnRunner;
-import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
-import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Utility methods to print the stack traces of all the threads. */
-@Internal
-public final class StackTraceUtil {
-  private static final ImmutableSet<String> FRAMEWORK_CLASSES =
-      ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName());
-  private static final Logger LOG = LoggerFactory.getLogger(StackTraceUtil.class);
-
-  public static String getStackTraceForLullMessage(StackTraceElement[] stackTrace) {
-    StringBuilder message = new StringBuilder();
-    for (StackTraceElement e : stackTrace) {
-      if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
-        break;
-      }
-      message.append("  at ").append(e).append("\n");
-    }
-    return message.toString();
-  }
-
-  public static void logAllStackTraces() {
-    DataflowWorkerLoggingHandler dataflowLoggingHandler =
-        DataflowWorkerLoggingInitializer.getLoggingHandler();
-    Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces();
-    for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) {
-      Thread thread = entry.getKey();
-      StackTraceElement[] stackTrace = entry.getValue();
-      StringBuilder message = new StringBuilder();
-      message.append(thread.toString()).append(":\n");
-      message.append(getStackTraceForLullMessage(stackTrace));
-      LogRecord logRecord = new LogRecord(Level.INFO, message.toString());
-      logRecord.setLoggerName(StackTraceUtil.LOG.getName());
-      dataflowLoggingHandler.publish(logRecord);
-    }
-  }
-
-  private StackTraceUtil() {}
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
index 3502b621147..551937c3559 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
@@ -22,14 +22,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
 
-import com.google.api.client.testing.http.FixedClock;
-import com.google.api.client.util.Clock;
 import java.io.Closeable;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.util.List;
 import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
@@ -41,55 +35,28 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDi
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
-import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.RestoreSystemProperties;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
-import org.hamcrest.Matchers;
 import org.joda.time.DateTimeUtils.MillisProvider;
-import org.joda.time.Duration;
-import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 /** Tests for {@link DataflowExecutionStateTrackerTest}. */
 public class DataflowExecutionStateTrackerTest {
 
-  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
-  @Rule public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
-
-  private File logFolder;
   private PipelineOptions options;
   private MillisProvider clock;
   private ExecutionStateSampler sampler;
   private CounterSet counterSet;
 
   @Before
-  public void setUp() throws IOException {
+  public void setUp() {
     options = PipelineOptionsFactory.create();
     clock = mock(MillisProvider.class);
     sampler = ExecutionStateSampler.newForTest(clock);
     counterSet = new CounterSet();
-    logFolder = tempFolder.newFolder();
-    System.setProperty(
-        DataflowWorkerLoggingInitializer.RUNNER_FILEPATH_PROPERTY,
-        new File(logFolder, "dataflow-json.log").getAbsolutePath());
-    // We need to reset *first* because some other test may have already initialized the
-    // logging initializer.
-    DataflowWorkerLoggingInitializer.reset();
-    DataflowWorkerLoggingInitializer.initialize();
-  }
-
-  @After
-  public void tearDown() {
-    DataflowWorkerLoggingInitializer.reset();
   }
 
   private final NameContext step1 =
@@ -100,7 +67,7 @@ public class DataflowExecutionStateTrackerTest {
   @Test
   public void testReportsElementExecutionTime() throws IOException {
     enableTimePerElementExperiment();
-    DataflowExecutionStateTracker tracker = createTracker();
+    ExecutionStateTracker tracker = createTracker();
 
     try (Closeable c1 = tracker.activate(new Thread())) {
       try (Closeable c2 = tracker.enterState(step1Process)) {}
@@ -117,7 +84,7 @@ public class DataflowExecutionStateTrackerTest {
   @Test
   public void testTakesSampleOnDeactivate() throws IOException {
     enableTimePerElementExperiment();
-    DataflowExecutionStateTracker tracker = createTracker();
+    ExecutionStateTracker tracker = createTracker();
 
     try (Closeable c1 = tracker.activate(new Thread())) {
       try (Closeable c2 = tracker.enterState(step1Process)) {
@@ -156,7 +123,7 @@ public class DataflowExecutionStateTrackerTest {
                 .build()));
   }
 
-  private DataflowExecutionStateTracker createTracker() {
+  private ExecutionStateTracker createTracker() {
     return new DataflowExecutionStateTracker(
         sampler,
         new TestDataflowExecutionState(NameContext.forStage("test-stage"), "other"),
@@ -164,103 +131,4 @@ public class DataflowExecutionStateTrackerTest {
         options,
         "test-work-item-id");
   }
-
-  @Test
-  public void testLullReportsRightTrace() throws Exception {
-    FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
-    DataflowExecutionStateTracker tracker = createTracker(clock);
-    // Adding test for the full thread dump, but since we can't mock
-    // Thread.getAllStackTraces(), we are starting a background thread
-    // to verify the full thread dump.
-    Thread backgroundThread =
-        new Thread("backgroundThread") {
-          @Override
-          public void run() {
-            try {
-              Thread.sleep(Long.MAX_VALUE);
-            } catch (InterruptedException e) {
-              // exiting the thread
-            }
-          }
-        };
-
-    backgroundThread.start();
-    try {
-      // Full thread dump should be performed, because we never performed
-      // a full thread dump before, and the lull duration is more than 20
-      // minutes.
-      tracker.reportBundleLull(30 * 60 * 1000);
-      verifyLullLog(true);
-
-      // Full thread dump should not be performed because the last dump
-      // was only 5 minutes ago.
-      clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(5L).getMillis());
-      tracker.reportBundleLull(30 * 60 * 1000);
-      verifyLullLog(false);
-
-      // Full thread dump should not be performed because the lull duration
-      // is only 6 minutes.
-      clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
-      tracker.reportBundleLull(6 * 60 * 1000);
-      verifyLullLog(false);
-
-      // Full thread dump should be performed, because it has been 21 minutes
-      // since the last dump, and the lull duration is more than 20 minutes.
-      clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
-      tracker.reportBundleLull(30 * 60 * 1000);
-      // verifyLullLog(true);
-    } finally {
-      // Cleaning up the background thread.
-      backgroundThread.interrupt();
-      backgroundThread.join();
-    }
-  }
-
-  private void verifyLullLog(boolean hasFullThreadDump) throws IOException {
-    File[] files = logFolder.listFiles();
-    assertThat(files, Matchers.arrayWithSize(1));
-    File logFile = files[0];
-    List<String> lines = Files.readAllLines(logFile.toPath());
-
-    String warnLines =
-        Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"WARN\"")));
-    assertThat(
-        warnLines,
-        Matchers.allOf(
-            Matchers.containsString("Operation ongoing in bundle for at 
least"),
-            Matchers.containsString(" without completing"),
-            Matchers.containsString("Processing times in each step"),
-            Matchers.containsString(
-                "org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker")));
-
-    String infoLines =
-        Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"INFO\"")));
-    if (hasFullThreadDump) {
-      assertThat(
-          infoLines,
-          Matchers.allOf(
-              Matchers.containsString("Thread[backgroundThread,"),
-              Matchers.containsString("org.apache.beam.runners.dataflow.worker.StackTraceUtil")));
-    } else {
-      assertThat(
-          infoLines,
-          Matchers.not(
-              Matchers.anyOf(
-                  Matchers.containsString("Thread[backgroundThread,"),
-                  Matchers.containsString(
-                      "org.apache.beam.runners.dataflow.worker.StackTraceUtil"))));
-    }
-    // Truncate the file when done to prepare for the next test.
-    new FileOutputStream(logFile, false).getChannel().truncate(0).close();
-  }
-
-  private DataflowExecutionStateTracker createTracker(Clock clock) {
-    return new DataflowExecutionStateTracker(
-        sampler,
-        new TestDataflowExecutionState(NameContext.forStage("test-stage"), "other"),
-        counterSet,
-        options,
-        "test-work-item-id",
-        clock);
-  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
index 88a9f8e11c7..34c3b3d5373 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
@@ -29,6 +29,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.api.client.testing.http.FixedClock;
+import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.CounterUpdate;
 import java.io.Closeable;
 import java.io.File;
@@ -204,6 +206,7 @@ public class DataflowOperationContextTest {
     @Test
     public void testLullReportsRightTrace() throws Exception {
       Thread mockThread = mock(Thread.class);
+      FixedClock clock = new FixedClock(Clock.SYSTEM.currentTimeMillis());
 
       DataflowExecutionState executionState =
           new DataflowExecutionState(
@@ -212,7 +215,8 @@ public class DataflowOperationContextTest {
               null /* requestingStepName */,
               null /* inputIndex */,
               null /* metricsContainer */,
-              ScopedProfiler.INSTANCE.emptyScope()) {
+              ScopedProfiler.INSTANCE.emptyScope(),
+              clock) {
             @Override
            public @Nullable CounterUpdate extractUpdate(boolean isFinalUpdate) {
               // not being used for extracting updates
@@ -234,11 +238,55 @@ public class DataflowOperationContextTest {
                SimpleDoFnRunner.class.getName(), "processElement", "SimpleDoFnRunner.java", 500),
           };
       when(mockThread.getStackTrace()).thenReturn(doFnStackTrace);
-      executionState.reportLull(mockThread, 30 * 60 * 1000);
-      verifyLullLog();
+
+      // Adding test for the full thread dump, but since we can't mock
+      // Thread.getAllStackTraces(), we are starting a background thread
+      // to verify the full thread dump.
+      Thread backgroundThread =
+          new Thread("backgroundThread") {
+            @Override
+            public void run() {
+              try {
+                Thread.sleep(Long.MAX_VALUE);
+              } catch (InterruptedException e) {
+                // exiting the thread
+              }
+            }
+          };
+
+      backgroundThread.start();
+      try {
+        // Full thread dump should be performed, because we never performed
+        // a full thread dump before, and the lull duration is more than 20
+        // minutes.
+        executionState.reportLull(mockThread, 30 * 60 * 1000);
+        verifyLullLog(true);
+
+        // Full thread dump should not be performed because the last dump
+        // was only 5 minutes ago.
+        clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(5L).getMillis());
+        executionState.reportLull(mockThread, 30 * 60 * 1000);
+        verifyLullLog(false);
+
+        // Full thread dump should not be performed because the lull duration
+        // is only 6 minutes.
+        clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
+        executionState.reportLull(mockThread, 6 * 60 * 1000);
+        verifyLullLog(false);
+
+        // Full thread dump should be performed, because it has been 21 minutes
+        // since the last dump, and the lull duration is more than 20 minutes.
+        clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(16L).getMillis());
+        executionState.reportLull(mockThread, 30 * 60 * 1000);
+        verifyLullLog(true);
+      } finally {
+        // Cleaning up the background thread.
+        backgroundThread.interrupt();
+        backgroundThread.join();
+      }
     }
 
-    private void verifyLullLog() throws IOException {
+    private void verifyLullLog(boolean hasFullThreadDump) throws IOException {
       File[] files = logFolder.listFiles();
       assertThat(files, Matchers.arrayWithSize(1));
       File logFile = files[0];
@@ -257,13 +305,23 @@ public class DataflowOperationContextTest {
 
       String infoLines =
           Joiner.on("\n").join(Iterables.filter(lines, line -> line.contains("\"INFO\"")));
-      assertThat(
-          infoLines,
-          Matchers.not(
-              Matchers.anyOf(
-                  Matchers.containsString("Thread[backgroundThread,"),
-                  Matchers.containsString(
-                      "org.apache.beam.runners.dataflow.worker.DataflowOperationContext"))));
+      if (hasFullThreadDump) {
+        assertThat(
+            infoLines,
+            Matchers.allOf(
+                Matchers.containsString("Thread[backgroundThread,"),
+                Matchers.containsString(
+                    "org.apache.beam.runners.dataflow.worker.DataflowOperationContext"),
+                Matchers.not(Matchers.containsString(SimpleDoFnRunner.class.getName()))));
+      } else {
+        assertThat(
+            infoLines,
+            Matchers.not(
+                Matchers.anyOf(
+                    Matchers.containsString("Thread[backgroundThread,"),
+                    Matchers.containsString(
+                        "org.apache.beam.runners.dataflow.worker.DataflowOperationContext"))));
+      }
       // Truncate the file when done to prepare for the next test.
       new FileOutputStream(logFile, false).getChannel().truncate(0).close();
     }

