This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ef74412  Enable SideInput metrics for DF worker. These have been launched on Dataflow
     new ebafa0e  Merge pull request #14018 from pabloem/enablesim
ef74412 is described below

commit ef7441257d655cd8bdc5ab502acfdaced9ad6e39
Author: Pablo Estrada <pabl...@apache.org>
AuthorDate: Thu Feb 18 13:31:08 2021 -0800

    Enable SideInput metrics for DF worker. These have been launched on Dataflow
---
 .../beam/runners/dataflow/worker/ExperimentContext.java       |  3 +--
 .../beam/runners/dataflow/worker/IsmSideInputReader.java      | 11 ++---------
 .../beam/runners/dataflow/worker/IsmSideInputReaderTest.java  |  6 +-----
 3 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
index 0d9e0ef..d215799 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
@@ -44,8 +44,7 @@ public class ExperimentContext {
      * operations for some IO connectors.
      */
     EnableConscryptSecurityProvider("enable_conscrypt_security_provider"),
-    IntertransformIO("intertransform_io"), // Intertransform metrics for Shuffle IO (insights)
-    SideInputIOMetrics("sideinput_io_metrics"); // Intertransform metrics for Side Input IO
+    IntertransformIO("intertransform_io"); // Intertransform metrics for Shuffle IO (insights)
 
     private final String name;
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
index 26adc74..37fa6fc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
@@ -56,7 +56,6 @@ import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.util.RandomAccessData;
-import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
 import org.apache.beam.runners.dataflow.worker.util.WorkerPropertyNames;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.sdk.coders.Coder;
@@ -218,14 +217,8 @@ public class IsmSideInputReader implements SideInputReader {
       throw new Exception("unexpected kind of side input: " + sideInputKind);
     }
 
-    SideInputReadCounter sideInputReadCounter;
-    ExperimentContext ec = ExperimentContext.parseFrom(options);
-    if (ec.isEnabled(Experiment.SideInputIOMetrics)) {
-      sideInputReadCounter =
-          new DataflowSideInputReadCounter(executionContext, operationContext, sideInputIndex);
-    } else {
-      sideInputReadCounter = new NoopSideInputReadCounter();
-    }
+    SideInputReadCounter sideInputReadCounter =
+        new DataflowSideInputReadCounter(executionContext, operationContext, sideInputIndex);
 
     ImmutableList.Builder<IsmReader<?>> builder = ImmutableList.builder();
     for (Source source : sideInputInfo.getSources()) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
index 4083ee1..2367eac 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
@@ -70,7 +70,6 @@ import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.util.RandomAccessData;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
-import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
 import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
@@ -114,7 +113,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
@@ -169,9 +167,7 @@ public class IsmSideInputReaderTest {
 
   @Before
   public void setUp() {
-    pipelineOptions
-        .as(DataflowPipelineDebugOptions.class)
-        .setExperiments(Lists.newArrayList(Experiment.SideInputIOMetrics.getName()));
+    pipelineOptions.as(DataflowPipelineDebugOptions.class);
     setupCloser = Closer.create();
     setupCloser.register(executionContext.getExecutionStateTracker().activate());
     setupCloser.register(operationContext.enterProcess());

Reply via email to