This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ef74412 Enable SideInput metrics for DF worker. These have been launched on Dataflow new ebafa0e Merge pull request #14018 from pabloem/enablesim ef74412 is described below commit ef7441257d655cd8bdc5ab502acfdaced9ad6e39 Author: Pablo Estrada <pabl...@apache.org> AuthorDate: Thu Feb 18 13:31:08 2021 -0800 Enable SideInput metrics for DF worker. These have been launched on Dataflow --- .../beam/runners/dataflow/worker/ExperimentContext.java | 3 +-- .../beam/runners/dataflow/worker/IsmSideInputReader.java | 11 ++--------- .../beam/runners/dataflow/worker/IsmSideInputReaderTest.java | 6 +----- 3 files changed, 4 insertions(+), 16 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java index 0d9e0ef..d215799 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java @@ -44,8 +44,7 @@ public class ExperimentContext { * operations for some IO connectors. */ EnableConscryptSecurityProvider("enable_conscrypt_security_provider"), - IntertransformIO("intertransform_io"), // Intertransform metrics for Shuffle IO (insights) - SideInputIOMetrics("sideinput_io_metrics"); // Intertransform metrics for Side Input IO + IntertransformIO("intertransform_io"); // Intertransform metrics for Shuffle IO (insights) private final String name; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java index 26adc74..37fa6fc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java @@ -56,7 +56,6 @@ import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.runners.dataflow.util.RandomAccessData; -import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment; import org.apache.beam.runners.dataflow.worker.util.WorkerPropertyNames; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.sdk.coders.Coder; @@ -218,14 +217,8 @@ public class IsmSideInputReader implements SideInputReader { throw new Exception("unexpected kind of side input: " + sideInputKind); } - SideInputReadCounter sideInputReadCounter; - ExperimentContext ec = ExperimentContext.parseFrom(options); - if (ec.isEnabled(Experiment.SideInputIOMetrics)) { - sideInputReadCounter = - new DataflowSideInputReadCounter(executionContext, operationContext, sideInputIndex); - } else { - sideInputReadCounter = new NoopSideInputReadCounter(); - } + SideInputReadCounter sideInputReadCounter = + new DataflowSideInputReadCounter(executionContext, operationContext, sideInputIndex); ImmutableList.Builder<IsmReader<?>> builder = ImmutableList.builder(); for (Source source : sideInputInfo.getSources()) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java index 4083ee1..2367eac 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java @@ -70,7 +70,6 @@ import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.runners.dataflow.util.RandomAccessData; import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; -import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind; import org.apache.beam.runners.dataflow.worker.counters.Counter; import org.apache.beam.runners.dataflow.worker.counters.CounterName; @@ -114,7 +113,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering; @@ -169,9 +167,7 @@ public class IsmSideInputReaderTest { @Before public void setUp() { - pipelineOptions - .as(DataflowPipelineDebugOptions.class) - .setExperiments(Lists.newArrayList(Experiment.SideInputIOMetrics.getName())); + pipelineOptions.as(DataflowPipelineDebugOptions.class); setupCloser = Closer.create(); setupCloser.register(executionContext.getExecutionStateTracker().activate()); setupCloser.register(operationContext.enterProcess());