This is an automated email from the ASF dual-hosted git repository. scott pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ca2a73f [BEAM-6431] Move ExecutionStateSampler and ExecutionStateTracker into runners-core-java new 52e7328 Merge pull request #7634: [BEAM-6431] Move state sampler files to runners-core so they can be used in the Java SDK as well ca2a73f is described below commit ca2a73fd99d713f96e0344fcc56d56e76168d42e Author: Alex Amato <ajam...@google.com> AuthorDate: Fri Jan 25 18:41:45 2019 -0800 [BEAM-6431] Move ExecutionStateSampler and ExecutionStateTracker into runners-core-java --- .../core/metrics}/ExecutionStateSampler.java | 9 +++++---- .../core/metrics}/ExecutionStateTracker.java | 23 ++++++++++++++++++---- .../core/metrics}/ExecutionStateSamplerTest.java | 6 +++--- .../dataflow/worker/BatchDataflowWorker.java | 2 +- .../dataflow/worker/BatchModeExecutionContext.java | 2 +- .../worker/ChunkingShuffleBatchReader.java | 4 ++-- .../dataflow/worker/ContextActivationObserver.java | 2 +- .../worker/DataflowElementExecutionTracker.java | 2 +- .../dataflow/worker/DataflowExecutionContext.java | 4 ++-- .../worker/DataflowExecutionStateRegistry.java | 2 +- .../dataflow/worker/DataflowMapTaskExecutor.java | 2 +- .../dataflow/worker/DataflowMetricsContainer.java | 2 +- .../dataflow/worker/DataflowOperationContext.java | 4 ++-- .../dataflow/worker/GroupingShuffleReader.java | 4 ++-- .../dataflow/worker/IntrinsicMapTaskExecutor.java | 2 +- ...nmentContextActivationObserverRegistration.java | 2 +- .../beam/runners/dataflow/worker/ShuffleSink.java | 4 ++-- .../dataflow/worker/StreamingDataflowWorker.java | 4 ++-- .../worker/StreamingModeExecutionContext.java | 4 ++-- .../dataflow/worker/WorkItemStatusClient.java | 4 ++-- .../WorkerCustomSourceOperationExecutor.java | 4 ++-- .../worker/fn/control/BeamFnMapTaskExecutor.java | 2 +- .../logging/DataflowWorkerLoggingHandler.java | 4 ++-- .../worker/util/common/worker/MapTaskExecutor.java | 1 + .../util/common/worker/ShuffleReadCounter.java | 1 + .../worker/BatchModeExecutionContextTest.java | 6 +++--- .../ContextActivationObserverRegistryTest.java | 2 +- .../worker/DataflowExecutionContextTest.java | 2 +- .../worker/DataflowExecutionStateTrackerTest.java | 4 ++-- .../worker/DataflowOperationContextTest.java | 10 +++++----- .../worker/DataflowSideInputReadCounterTest.java | 2 +- .../dataflow/worker/GroupingShuffleReaderTest.java | 4 ++-- .../worker/IntrinsicMapTaskExecutorTest.java | 4 ++-- .../runners/dataflow/worker/SimpleParDoFnTest.java | 2 +- .../worker/StreamingModeExecutionContextTest.java | 6 +++--- .../dataflow/worker/TestOperationContext.java | 4 ++-- .../dataflow/worker/WorkItemStatusClientTest.java | 4 ++-- .../dataflow/worker/WorkerCustomSourcesTest.java | 2 +- .../fn/control/BeamFnMapTaskExecutorTest.java | 2 +- .../logging/DataflowWorkerLoggingHandlerTest.java | 2 +- .../worker/GroupingShuffleEntryIteratorTest.java | 2 ++ .../util/common/worker/MapTaskExecutorTest.java | 2 ++ 42 files changed, 91 insertions(+), 69 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSampler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java similarity index 96% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSampler.java rename to runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java index d2b437c..c3fb816 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSampler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.util.common.worker; +package org.apache.beam.runners.core.metrics; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.DateTimeUtils.MillisProvider; @@ -66,7 +67,7 @@ public class ExecutionStateSampler { private static final long PERIOD_MS = 200; - private Future<Void> executionSamplerFuture = null; + @Nullable private Future<Void> executionSamplerFuture = null; /** * Called to start the ExecutionStateSampler. Until the returned {@link Closeable} is closed, the @@ -141,7 +142,7 @@ public class ExecutionStateSampler { } /** - * Deregister tracker after MapTask completes + * Deregister tracker after MapTask completes. * * <p>This method needs to be synchronized to prevent race condition with sampling thread */ @@ -161,7 +162,7 @@ public class ExecutionStateSampler { } /** - * Attributing sampling time to trackers + * Attributing sampling time to trackers. * * <p>This method needs to be synchronized to prevent race condition with removing tracker */ diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java similarity index 94% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateTracker.java rename to runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java index 57795a0..d9fc3dd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateTracker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java @@ -15,10 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.util.common.worker; +package org.apache.beam.runners.core.metrics; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.Closeable; import java.util.Map; import java.util.Objects; @@ -29,6 +30,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; /** Tracks the current state of a single execution thread. */ +@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "Intentional for performance.") public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> { /** @@ -102,7 +104,7 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> private final ExecutionStateSampler sampler; /** The thread being managed by this {@link ExecutionStateTracker}. */ - private Thread trackedThread = null; + @Nullable private Thread trackedThread = null; /** * The current state of the thread managed by this {@link ExecutionStateTracker}. @@ -110,7 +112,7 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> * <p>This variable is written by the Execution thread, and read by the sampling and progress * reporting threads, thus it being marked volatile. */ - private volatile ExecutionState currentState; + @Nullable private volatile ExecutionState currentState; /** * The current number of times that this {@link ExecutionStateTracker} has transitioned state. @@ -141,8 +143,18 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> } @Override + public boolean equals(Object o) { + return this == o; + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override public int compareTo(ExecutionStateTracker o) { - if (this == o) { + if (this.equals(o)) { return 0; } else { return System.identityHashCode(this) - System.identityHashCode(o); @@ -215,6 +227,9 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> * from the execution thread. */ @SuppressWarnings("NonAtomicVolatileUpdate") + @SuppressFBWarnings( + value = "VO_VOLATILE_INCREMENT", + justification = "Intentional for performance.") public Closeable enterState(ExecutionState newState) { // WARNING: This method is called in the hottest path, and must be kept as efficient as // possible. Avoid blocking, synchronizing, etc. diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSamplerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ExecutionStateSamplerTest.java similarity index 95% rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSamplerTest.java rename to runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ExecutionStateSamplerTest.java index b57cc46..073cfd1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSamplerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ExecutionStateSamplerTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.util.common.worker; +package org.apache.beam.runners.core.metrics; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; @@ -23,12 +23,12 @@ import static org.mockito.Mockito.mock; import java.io.Closeable; import java.util.concurrent.TimeUnit; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.joda.time.DateTimeUtils.MillisProvider; import org.junit.Before; import org.junit.Test; -/** Tests for {@link ExecutionStateSampler}. */ +/** Tests for {@link org.apache.beam.runners.core.metrics.ExecutionStateSampler}. */ public class ExecutionStateSamplerTest { private MillisProvider clock; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java index 9300dfd..792dcf9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java @@ -26,6 +26,7 @@ import java.util.function.Function; import javax.annotation.Nullable; import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.worker.SdkHarnessRegistry.SdkWorkerHarness; @@ -47,7 +48,6 @@ import org.apache.beam.runners.dataflow.worker.graph.ReplacePgbkWithPrecombineFu import org.apache.beam.runners.dataflow.worker.status.DebugCapture; import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; import org.apache.beam.sdk.fn.IdGenerator; import org.apache.beam.sdk.fn.IdGenerators; import org.apache.beam.sdk.util.Weighted; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index 6d42a10..7933002 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -29,6 +29,7 @@ import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.metrics.CounterCell; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.core.metrics.MetricUpdates; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -36,7 +37,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsContainer; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java index 840b585..ceb39f0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java @@ -23,9 +23,9 @@ import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.util.common.worker.ByteArrayShufflePosition; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleBatchReader; import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry; import org.apache.beam.runners.dataflow.worker.util.common.worker.ShufflePosition; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserver.java index e25ed95..10a6bd4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserver.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserver.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.dataflow.worker; import com.google.auto.service.AutoService; import java.io.Closeable; import java.util.ServiceLoader; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.sdk.annotations.Experimental; /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTracker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTracker.java index 2db5ef7..70016bc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTracker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTracker.java @@ -29,12 +29,12 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.worker.counters.Counter; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index 621cdb5..2ce168ce 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -31,13 +31,13 @@ import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StepContext; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext; import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateRegistry.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateRegistry.java index e973664..133e2ad 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateRegistry.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateRegistry.java @@ -23,9 +23,9 @@ import com.google.api.services.dataflow.model.CounterUpdate; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutor.java index c811842..96db026 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutor.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.dataflow.worker; import java.util.List; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor; import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java index bd9599b..745f9f6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.dataflow.worker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java index d57c047..23e302e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java @@ -28,13 +28,13 @@ import java.util.logging.Level; import java.util.logging.LogRecord; import javax.annotation.Nullable; import org.apache.beam.runners.core.SimpleDoFnRunner; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReader.java index e840bfb..d4a2551 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReader.java @@ -31,13 +31,13 @@ import java.io.IOException; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment; import org.apache.beam.runners.dataflow.worker.counters.Counter; import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows; import org.apache.beam.runners.dataflow.worker.util.common.worker.ByteArrayShufflePosition; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.util.common.worker.GroupingShuffleEntryIterator; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutor.java index c0caa53..7aa00a1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutor.java @@ -20,8 +20,8 @@ package org.apache.beam.runners.dataflow.worker; import com.google.api.services.dataflow.model.CounterUpdate; import java.util.Collections; import java.util.List; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation; /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsEnvironmentContextActivationObserverRegistration.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsEnvironmentContextActivationObserverRegistration.java index 1e600f9..b9499af 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsEnvironmentContextActivationObserverRegistration.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsEnvironmentContextActivationObserverRegistration.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.dataflow.worker; import com.google.auto.service.AutoService; import java.io.Closeable; import java.io.IOException; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ShuffleSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ShuffleSink.java index 11f6549..d2f2e3c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ShuffleSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ShuffleSink.java @@ -20,12 +20,12 @@ package org.apache.beam.runners.dataflow.worker; import java.io.Closeable; import java.io.IOException; import java.util.Arrays; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.util.RandomAccessData; import org.apache.beam.runners.dataflow.worker.counters.Counter; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.counters.CounterName; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index abe792f..fb0e8bb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -62,6 +62,8 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.internal.CustomSources; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; @@ -103,8 +105,6 @@ import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider; import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter; import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 78555ca..ea5dbe7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -38,12 +38,12 @@ import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StepContext; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java index 2d124ac..c651b73 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java @@ -37,13 +37,13 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.dataflow.util.TimeUtil; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java index ad5005b..4f87291 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java @@ -23,9 +23,9 @@ import com.google.api.services.dataflow.model.SourceOperationResponse; import com.google.api.services.dataflow.model.SourceSplitRequest; import java.io.Closeable; import java.util.Collections; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.sdk.options.PipelineOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java index cad9fca..98ec418 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java @@ -47,6 +47,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo; import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse; import org.apache.beam.runners.core.construction.metrics.MetricKey; import org.apache.beam.runners.core.metrics.DistributionData; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.GaugeData; import org.apache.beam.runners.core.metrics.MetricUpdates; import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate; @@ -55,7 +56,6 @@ import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitRequest; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java index 82b329d..7428390 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java @@ -40,10 +40,10 @@ import java.util.logging.Handler; import java.util.logging.LogRecord; import java.util.logging.SimpleFormatter; import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; import org.apache.beam.runners.dataflow.worker.counters.NameContext; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier; import org.apache.beam.vendor.guava.v20_0.com.google.common.io.CountingOutputStream; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java index c729fab..07d23ef 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.util.List; import java.util.ListIterator; import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleReadCounter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleReadCounter.java index fcd0670..3ee2267 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleReadCounter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleReadCounter.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.worker.util.common.worker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; import org.apache.beam.runners.dataflow.worker.counters.Counter; import org.apache.beam.runners.dataflow.worker.counters.CounterName; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java index 0e4c7dd..6d58f4d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java @@ -30,13 +30,13 @@ import com.google.api.services.dataflow.model.CounterStructuredName; import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata; import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.DistributionUpdate; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.BatchModeExecutionState; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.MetricName; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserverRegistryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserverRegistryTest.java index 4c98f2e..64c30f4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserverRegistryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserverRegistryTest.java @@ -25,7 +25,7 @@ import com.google.auto.service.AutoService; import java.io.Closeable; import java.io.IOException; import java.util.List; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java index 8bc142e..7008dd4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java @@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue; import com.google.auto.service.AutoService; import java.io.Closeable; import java.io.IOException; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java index 598e994..f1f8aa3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java @@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock; import java.io.Closeable; import java.io.IOException; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.TestOperationContext.TestDataflowExecutionState; @@ -34,8 +36,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java index 78c365f..014d3cb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java @@ -17,10 +17,10 @@ */ package org.apache.beam.runners.dataflow.worker; -import static org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ABORT_STATE_NAME; -import static org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.FINISH_STATE_NAME; -import static org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.PROCESS_STATE_NAME; -import static org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.START_STATE_NAME; +import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.ABORT_STATE_NAME; +import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.FINISH_STATE_NAME; +import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.PROCESS_STATE_NAME; +import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.START_STATE_NAME; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertThat; @@ -36,6 +36,7 @@ import java.io.IOException; import java.nio.file.Files; import javax.annotation.Nullable; import org.apache.beam.runners.core.SimpleDoFnRunner; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.BatchModeExecutionStateRegistry; import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; @@ -43,7 +44,6 @@ import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.testing.RestoreSystemProperties; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounterTest.java index 577a576..4197648 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounterTest.java @@ -26,11 +26,11 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; import org.apache.beam.runners.dataflow.worker.counters.Counter; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; import org.apache.beam.runners.dataflow.worker.counters.NameContext; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReaderTest.java index 35ea0fa..ff53cb4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReaderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReaderTest.java @@ -46,6 +46,8 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment; @@ -58,8 +60,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.util.common.worker.ByteArrayShufflePosition; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutorTestUtils; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java index 1cf9815..67d3670 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java @@ -42,6 +42,8 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker; @@ -53,8 +55,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutorTestUtils.TestReader; import org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java index 2f58679..ec3d9dd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java @@ -38,13 +38,13 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution; import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn; import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver; import org.apache.beam.sdk.options.PipelineOptions; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 0486b92..6ebabc4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -43,6 +43,9 @@ import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionState; import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionStateRegistry; @@ -50,9 +53,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.metrics.MetricsContainer; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestOperationContext.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestOperationContext.java index e64b6a0..a58f234 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestOperationContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestOperationContext.java @@ -19,13 +19,13 @@ package org.apache.beam.runners.dataflow.worker; import com.google.api.services.dataflow.model.CounterUpdate; import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext; import org.apache.beam.sdk.metrics.MetricsContainer; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClientTest.java index 6353667..9fcdb14 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClientTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClientTest.java @@ -49,6 +49,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.beam.runners.core.metrics.CounterCell; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.DataflowReaderPosition; @@ -56,8 +58,6 @@ import org.apache.beam.runners.dataflow.worker.WorkerCustomSources.BoundedSource import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 0fcd92b..207b6d9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -67,6 +67,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -79,7 +80,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope; import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.sdk.Pipeline; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java index af1fce2..f38d31f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java @@ -40,11 +40,11 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation; import org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext; import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation; import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java index 51e494a..e48291b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java @@ -30,11 +30,11 @@ import java.nio.charset.StandardCharsets; import java.util.logging.Level; import java.util.logging.LogRecord; import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; import org.apache.beam.runners.dataflow.worker.NameContextsForTests; import org.apache.beam.runners.dataflow.worker.TestOperationContext.TestDataflowExecutionState; import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC; -import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Timestamp; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier; import org.junit.After; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/GroupingShuffleEntryIteratorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/GroupingShuffleEntryIteratorTest.java index f4ed032..c193733 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/GroupingShuffleEntryIteratorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/GroupingShuffleEntryIteratorTest.java @@ -31,6 +31,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext; import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java index f1db84d..ad0b862 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java @@ -42,6 +42,8 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.beam.runners.core.metrics.ExecutionStateSampler; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.worker.DataflowElementExecutionTracker;