Throw UnsupportedOperationException for committed metrics results in Spark runner

Added metrics support for MultiDoFnFunction (multi-output ParDo)
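
After this change, reading a committed metric value through the Spark runner throws, while attempted values remain queryable. A minimal sketch of the user-facing consequence (the helper method and variable names are illustrative, not part of this commit):

    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.MetricsFilter;

    static void readCounters(PipelineResult result) {
      MetricQueryResults metrics =
          result.metrics().queryMetrics(MetricsFilter.builder().build());
      for (MetricResult<Long> counter : metrics.counters()) {
        Long attempted = counter.attempted();  // supported on the Spark runner
        // counter.committed() would now throw UnsupportedOperationException.
      }
    }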


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d7d49ce8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d7d49ce8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d7d49ce8

Branch: refs/heads/master
Commit: d7d49ce8a1bff63d4205fd641c90e36b0f88bb17
Parents: 2286578
Author: Aviem Zur <aviem...@gmail.com>
Authored: Sun Jan 29 12:54:07 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Feb 15 11:10:48 2017 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  1 -
 .../beam/runners/spark/TestSparkRunner.java     |  5 ++++
 .../runners/spark/metrics/SparkBeamMetric.java  |  4 ++--
 .../spark/metrics/SparkMetricResults.java       |  3 ++-
 .../spark/metrics/SparkMetricsContainer.java    | 20 ++++++++++++++--
 .../spark/translation/MultiDoFnFunction.java    | 25 ++++++++++++++------
 .../spark/translation/TransformTranslator.java  | 15 ++++++++----
 .../streaming/StreamingTransformTranslator.java | 14 +++++++----
 .../apache/beam/sdk/metrics/MetricsTest.java    | 25 +++++++++++++-------
 9 files changed, 80 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index c9d8e30..3ef7ef4 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -77,7 +77,6 @@
                     org.apache.beam.sdk.testing.UsesStatefulParDo,
                     org.apache.beam.sdk.testing.UsesTimersInParDo,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
-                    org.apache.beam.sdk.testing.UsesAttemptedMetrics,
                     org.apache.beam.sdk.testing.UsesCommittedMetrics
                   </excludedGroups>
                   <forkCount>1</forkCount>

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 798ca47..e770164 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
+import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
@@ -106,6 +107,10 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
     TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
+
+    // clear metrics singleton
+    SparkMetricsContainer.clear();
+
     SparkPipelineResult result = delegate.run(pipeline);
     result.waitUntilFinish();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
index 0c656d7..8e31b22 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
@@ -41,10 +41,10 @@ class SparkBeamMetric implements Metric {
     MetricQueryResults metricQueryResults =
         metricResults.queryMetrics(MetricsFilter.builder().build());
     for (MetricResult<Long> metricResult : metricQueryResults.counters()) {
-      metrics.put(renderName(metricResult), metricResult.committed());
+      metrics.put(renderName(metricResult), metricResult.attempted());
     }
     for (MetricResult<DistributionResult> metricResult : metricQueryResults.distributions()) {
-      DistributionResult result = metricResult.committed();
+      DistributionResult result = metricResult.attempted();
       metrics.put(renderName(metricResult) + ".count", result.count());
       metrics.put(renderName(metricResult) + ".sum", result.sum());
       metrics.put(renderName(metricResult) + ".min", result.min());

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
index 330b060..a9651e2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
@@ -169,7 +169,8 @@ public class SparkMetricResults extends MetricResults {
 
     @Override
     public T committed() {
-      return result;
+      throw new UnsupportedOperationException("Spark runner does not currently support committed"
+          + " metrics results. Please use 'attempted' instead.");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
index 9d5bb47..0bf0e70 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.spark.metrics;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -52,8 +53,7 @@ public class SparkMetricsContainer implements Serializable {
     if (metricsContainers == null) {
       synchronized (this) {
         if (metricsContainers == null) {
-          metricsContainers = CacheBuilder.<String, SparkMetricsContainer>newBuilder()
-              .build(new MetricsContainerCacheLoader());
+          initializeMetricsContainers();
         }
       }
     }
@@ -128,6 +128,11 @@ public class SparkMetricsContainer implements Serializable {
     }
   }
 
+  private void initializeMetricsContainers() {
+    metricsContainers = CacheBuilder.<String, SparkMetricsContainer>newBuilder()
+        .build(new MetricsContainerCacheLoader());
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -136,4 +141,15 @@ public class SparkMetricsContainer implements Serializable {
     }
     return sb.toString();
   }
+
+  @VisibleForTesting
+  public static void clear() {
+    try {
+      SparkMetricsContainer instance = getInstance();
+      instance.initializeMetricsContainers();
+      instance.counters.clear();
+      instance.distributions.clear();
+    } catch (IllegalStateException ignored) {
+    }
+  }
 }
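
The MetricsAccumulator referenced by the translators below is not part of this diff. As orientation only, a sketch of the kind of Spark AccumulatorParam such an accumulator needs, assuming a hypothetical merge step for combining per-worker containers:

    import org.apache.spark.AccumulatorParam;

    // Sketch under stated assumptions: merge() stands in for however the real
    // code folds one container's counters and distributions into another.
    class MetricsAccumulatorParam implements AccumulatorParam<SparkMetricsContainer> {
      @Override
      public SparkMetricsContainer addAccumulator(SparkMetricsContainer c1, SparkMetricsContainer c2) {
        return merge(c1, c2);
      }

      @Override
      public SparkMetricsContainer addInPlace(SparkMetricsContainer c1, SparkMetricsContainer c2) {
        return merge(c1, c2);
      }

      @Override
      public SparkMetricsContainer zero(SparkMetricsContainer initialValue) {
        return initialValue;
      }

      private SparkMetricsContainer merge(SparkMetricsContainer c1, SparkMetricsContainer c2) {
        return c1;  // placeholder: a real merge would fold c2's updates into c1
      }
    }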

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 911e6c5..a761954 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
+import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -51,7 +52,9 @@ import scala.Tuple2;
 public class MultiDoFnFunction<InputT, OutputT>
     implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
 
-  private final Accumulator<NamedAggregators> accumulator;
+  private final Accumulator<NamedAggregators> aggAccum;
+  private final Accumulator<SparkMetricsContainer> metricsAccum;
+  private final String stepName;
   private final DoFn<InputT, OutputT> doFn;
   private final SparkRuntimeContext runtimeContext;
   private final TupleTag<OutputT> mainOutputTag;
@@ -59,7 +62,8 @@ public class MultiDoFnFunction<InputT, OutputT>
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   /**
-   * @param accumulator       The Spark {@link Accumulator} that backs the Beam Aggregators.
+   * @param aggAccum       The Spark {@link Accumulator} that backs the Beam Aggregators.
+   * @param metricsAccum       The Spark {@link Accumulator} that backs the Beam metrics.
    * @param doFn              The {@link DoFn} to be wrapped.
    * @param runtimeContext    The {@link SparkRuntimeContext}.
    * @param mainOutputTag     The main output {@link TupleTag}.
@@ -67,14 +71,17 @@ public class MultiDoFnFunction<InputT, OutputT>
    * @param windowingStrategy Input {@link WindowingStrategy}.
    */
   public MultiDoFnFunction(
-      Accumulator<NamedAggregators> accumulator,
+      Accumulator<NamedAggregators> aggAccum,
+      Accumulator<SparkMetricsContainer> metricsAccum,
+      String stepName,
       DoFn<InputT, OutputT> doFn,
       SparkRuntimeContext runtimeContext,
       TupleTag<OutputT> mainOutputTag,
       Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
-
-    this.accumulator = accumulator;
+    this.aggAccum = aggAccum;
+    this.metricsAccum = metricsAccum;
+    this.stepName = stepName;
     this.doFn = doFn;
     this.runtimeContext = runtimeContext;
     this.mainOutputTag = mainOutputTag;
@@ -97,10 +104,14 @@ public class MultiDoFnFunction<InputT, OutputT>
             mainOutputTag,
             Collections.<TupleTag<?>>emptyList(),
             new SparkProcessContext.NoOpStepContext(),
-            new SparkAggregators.Factory(runtimeContext, accumulator),
+            new SparkAggregators.Factory(runtimeContext, aggAccum),
             windowingStrategy);
 
-    return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter);
+    DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
+        new DoFnRunnerWithMetrics<>(stepName, doFnRunner, metricsAccum);
+
+    return new SparkProcessContext<>(doFn, doFnRunnerWithMetrics, outputManager)
+        .processPartition(iter);
   }
 
   private class DoFnOutputManager
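
DoFnRunnerWithMetrics, used above, is introduced elsewhere in this change. A rough sketch of the decorator shape its call site implies; the method bodies and the reported container are assumptions, not the committed implementation:

    // Sketch: delegate to the wrapped DoFnRunner, then report this bundle's
    // metric updates to the Spark accumulator keyed by the step's name.
    class DoFnRunnerWithMetrics<InputT, OutputT> {
      private final String stepName;
      private final DoFnRunner<InputT, OutputT> delegate;
      private final Accumulator<SparkMetricsContainer> metricsAccum;

      DoFnRunnerWithMetrics(String stepName, DoFnRunner<InputT, OutputT> delegate,
          Accumulator<SparkMetricsContainer> metricsAccum) {
        this.stepName = stepName;
        this.delegate = delegate;
        this.metricsAccum = metricsAccum;
      }

      void startBundle() {
        delegate.startBundle();
      }

      void processElement(WindowedValue<InputT> elem) {
        delegate.processElement(elem);  // user metrics recorded under stepName
      }

      void finishBundle() {
        delegate.finishBundle();
        // Hypothetical: hand the container holding this bundle's updates to Spark.
        metricsAccum.add(SparkMetricsContainer.getInstance());
      }
    }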

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7f4b708..584bcc3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -269,6 +269,7 @@ public final class TransformTranslator {
     return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
+        String stepName = context.getCurrentTransform().getFullName();
         DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectStateAndTimers(doFn);
         @SuppressWarnings("unchecked")
@@ -276,13 +277,17 @@ public final class TransformTranslator {
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
         WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
-        Accumulator<NamedAggregators> accum =
-            SparkAggregators.getNamedAggregators(context.getSparkContext());
+        JavaSparkContext jsc = context.getSparkContext();
+        Accumulator<NamedAggregators> aggAccum =
+            SparkAggregators.getNamedAggregators(jsc);
+        Accumulator<SparkMetricsContainer> metricsAccum =
+            MetricsAccumulator.getOrCreateInstance(jsc);
         JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD
             .mapPartitionsToPair(
-                new MultiDoFnFunction<>(accum, doFn, context.getRuntimeContext(),
-                transform.getMainOutputTag(), TranslationUtils.getSideInputs(
-                    transform.getSideInputs(), context), windowingStrategy)).cache();
+                new MultiDoFnFunction<>(aggAccum, metricsAccum, stepName, doFn,
+                    context.getRuntimeContext(), transform.getMainOutputTag(),
+                    TranslationUtils.getSideInputs(transform.getSideInputs(), context),
+                    windowingStrategy)).cache();
         List<TaggedPValue> pct = context.getOutputs(transform);
         for (TaggedPValue e : pct) {
           @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 2bfd172..f270a99 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -433,14 +433,18 @@ final class StreamingTransformTranslator {
           @Override
           public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(
               JavaRDD<WindowedValue<InputT>> rdd) throws Exception {
-            final Accumulator<NamedAggregators> accum =
-                SparkAggregators.getNamedAggregators(new JavaSparkContext(rdd.context()));
-
+            String stepName = context.getCurrentTransform().getFullName();
+            JavaSparkContext jsc = new JavaSparkContext(rdd.context());
+            final Accumulator<NamedAggregators> aggAccum =
+                SparkAggregators.getNamedAggregators(jsc);
+            final Accumulator<SparkMetricsContainer> metricsAccum =
+                MetricsAccumulator.getOrCreateInstance(jsc);
             final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                     JavaSparkContext.fromSparkContext(rdd.context()), pviews);
-              return rdd.mapPartitionsToPair(new MultiDoFnFunction<>(accum, doFn,
-                  runtimeContext, transform.getMainOutputTag(), sideInputs, windowingStrategy));
+              return rdd.mapPartitionsToPair(new MultiDoFnFunction<>(aggAccum, metricsAccum,
+                  stepName, doFn, runtimeContext, transform.getMainOutputTag(), sideInputs,
+                  windowingStrategy));
           }
         }).cache();
         List<TaggedPValue> pct = context.getOutputs(transform);

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index 57a1d23..dd75e58 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -37,6 +37,8 @@ import org.apache.beam.sdk.testing.UsesCommittedMetrics;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.hamcrest.CoreMatchers;
 import org.junit.After;
 import org.junit.Test;
@@ -167,6 +169,8 @@ public class MetricsTest implements Serializable {
   private PipelineResult runPipelineWithMetrics() {
     final Counter count = Metrics.counter(MetricsTest.class, "count");
     Pipeline pipeline = TestPipeline.create();
+    final TupleTag<Integer> output1 = new TupleTag<Integer>(){};
+    final TupleTag<Integer> output2 = new TupleTag<Integer>(){};
     pipeline
         .apply(Create.of(5, 8, 13))
         .apply("MyStep1", ParDo.of(new DoFn<Integer, Integer>() {
@@ -193,15 +197,18 @@ public class MetricsTest implements Serializable {
             bundleDist.update(40L);
           }
         }))
-        .apply("MyStep2", ParDo.of(new DoFn<Integer, Integer>() {
-          @SuppressWarnings("unused")
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            Distribution values = Metrics.distribution(MetricsTest.class, "input");
-            count.inc();
-            values.update(c.element());
-          }
-        }));
+        .apply("MyStep2", ParDo.withOutputTags(output1, 
TupleTagList.of(output2))
+            .of(new DoFn<Integer, Integer>() {
+              @SuppressWarnings("unused")
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                Distribution values = Metrics.distribution(MetricsTest.class, "input");
+                count.inc();
+                values.update(c.element());
+                c.output(c.element());
+                c.sideOutput(output2, c.element());
+              }
+            }));
     PipelineResult result = pipeline.run();
 
     result.waitUntilFinish();
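
Since committed results are unsupported on the Spark runner, assertions against this pipeline would query attempted values; a hedged sketch (the filter and printed output are illustrative, not the committed test code):

    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named(MetricsTest.class, "count"))
            .addStep("MyStep2")
            .build());
    for (MetricResult<Long> counter : metrics.counters()) {
      // attempted() works here; committed() would throw on the Spark runner.
      System.out.println(counter.step() + ": " + counter.attempted());
    }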
