Throw UnsupportedOperationException for committed metrics results in spark runner
Added metrics support for MultiDo

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d7d49ce8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d7d49ce8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d7d49ce8

Branch: refs/heads/master
Commit: d7d49ce8a1bff63d4205fd641c90e36b0f88bb17
Parents: 2286578
Author: Aviem Zur <aviem...@gmail.com>
Authored: Sun Jan 29 12:54:07 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Feb 15 11:10:48 2017 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                            |  1 -
 .../beam/runners/spark/TestSparkRunner.java      |  5 ++++
 .../runners/spark/metrics/SparkBeamMetric.java   |  4 ++--
 .../spark/metrics/SparkMetricResults.java        |  3 ++-
 .../spark/metrics/SparkMetricsContainer.java     | 20 ++++++++++++++--
 .../spark/translation/MultiDoFnFunction.java     | 25 ++++++++++++++------
 .../spark/translation/TransformTranslator.java   | 15 ++++++++----
 .../streaming/StreamingTransformTranslator.java  | 14 +++++++----
 .../apache/beam/sdk/metrics/MetricsTest.java     | 25 +++++++++++++-------
 9 files changed, 80 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index c9d8e30..3ef7ef4 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -77,7 +77,6 @@
               org.apache.beam.sdk.testing.UsesStatefulParDo,
               org.apache.beam.sdk.testing.UsesTimersInParDo,
               org.apache.beam.sdk.testing.UsesSplittableParDo,
-              org.apache.beam.sdk.testing.UsesAttemptedMetrics,
               org.apache.beam.sdk.testing.UsesCommittedMetrics
             </excludedGroups>
             <forkCount>1</forkCount>

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 798ca47..e770164 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
+import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
@@ -106,6 +107,10 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
     TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
+
+    // clear metrics singleton
+    SparkMetricsContainer.clear();
+
     SparkPipelineResult result = delegate.run(pipeline);
     result.waitUntilFinish();

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
index 0c656d7..8e31b22 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
@@ -41,10 +41,10 @@ class SparkBeamMetric implements Metric {
     MetricQueryResults metricQueryResults =
         metricResults.queryMetrics(MetricsFilter.builder().build());
     for (MetricResult<Long> metricResult : metricQueryResults.counters()) {
-      metrics.put(renderName(metricResult), metricResult.committed());
+      metrics.put(renderName(metricResult), metricResult.attempted());
     }
     for (MetricResult<DistributionResult> metricResult : metricQueryResults.distributions()) {
-      DistributionResult result = metricResult.committed();
+      DistributionResult result = metricResult.attempted();
       metrics.put(renderName(metricResult) + ".count", result.count());
       metrics.put(renderName(metricResult) + ".sum", result.sum());
       metrics.put(renderName(metricResult) + ".min", result.min());

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
index 330b060..a9651e2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java
@@ -169,7 +169,8 @@ public class SparkMetricResults extends MetricResults {
 
     @Override
     public T committed() {
-      return result;
+      throw new UnsupportedOperationException("Spark runner does not currently support committed"
+          + " metrics results. Please use 'attempted' instead.");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
index 9d5bb47..0bf0e70 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java
@@ -18,6 +18,7 @@ package org.apache.beam.runners.spark.metrics;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -52,8 +53,7 @@ public class SparkMetricsContainer implements Serializable {
     if (metricsContainers == null) {
       synchronized (this) {
         if (metricsContainers == null) {
-          metricsContainers = CacheBuilder.<String, SparkMetricsContainer>newBuilder()
-              .build(new MetricsContainerCacheLoader());
+          initializeMetricsContainers();
         }
       }
     }
@@ -128,6 +128,11 @@ public class SparkMetricsContainer implements Serializable {
     }
   }
 
+  private void initializeMetricsContainers() {
+    metricsContainers = CacheBuilder.<String, SparkMetricsContainer>newBuilder()
+        .build(new MetricsContainerCacheLoader());
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -136,4 +141,15 @@ public class SparkMetricsContainer implements Serializable {
     }
     return sb.toString();
   }
+
+  @VisibleForTesting
+  public static void clear() {
+    try {
+      SparkMetricsContainer instance = getInstance();
+      instance.initializeMetricsContainers();
+      instance.counters.clear();
+      instance.distributions.clear();
+    } catch (IllegalStateException ignored) {
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 911e6c5..a761954 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
+import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.runners.spark.util.SparkSideInputReader;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -51,7 +52,9 @@
 public class MultiDoFnFunction<InputT, OutputT>
     implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
 
-  private final Accumulator<NamedAggregators> accumulator;
+  private final Accumulator<NamedAggregators> aggAccum;
+  private final Accumulator<SparkMetricsContainer> metricsAccum;
+  private final String stepName;
   private final DoFn<InputT, OutputT> doFn;
   private final SparkRuntimeContext runtimeContext;
   private final TupleTag<OutputT> mainOutputTag;
@@ -59,7 +62,8 @@ public class MultiDoFnFunction<InputT, OutputT>
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   /**
-   * @param accumulator The Spark {@link Accumulator} that backs the Beam Aggregators.
+   * @param aggAccum The Spark {@link Accumulator} that backs the Beam Aggregators.
+   * @param metricsAccum The Spark {@link Accumulator} that backs the Beam metrics.
    * @param doFn The {@link DoFn} to be wrapped.
    * @param runtimeContext The {@link SparkRuntimeContext}.
    * @param mainOutputTag The main output {@link TupleTag}.
@@ -67,14 +71,17 @@ public class MultiDoFnFunction<InputT, OutputT>
    * @param windowingStrategy Input {@link WindowingStrategy}.
    */
   public MultiDoFnFunction(
-      Accumulator<NamedAggregators> accumulator,
+      Accumulator<NamedAggregators> aggAccum,
+      Accumulator<SparkMetricsContainer> metricsAccum,
+      String stepName,
       DoFn<InputT, OutputT> doFn,
       SparkRuntimeContext runtimeContext,
       TupleTag<OutputT> mainOutputTag,
       Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
-
-    this.accumulator = accumulator;
+    this.aggAccum = aggAccum;
+    this.metricsAccum = metricsAccum;
+    this.stepName = stepName;
     this.doFn = doFn;
     this.runtimeContext = runtimeContext;
     this.mainOutputTag = mainOutputTag;
@@ -97,10 +104,14 @@ public class MultiDoFnFunction<InputT, OutputT>
         mainOutputTag,
         Collections.<TupleTag<?>>emptyList(),
         new SparkProcessContext.NoOpStepContext(),
-        new SparkAggregators.Factory(runtimeContext, accumulator),
+        new SparkAggregators.Factory(runtimeContext, aggAccum),
         windowingStrategy);
 
-    return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter);
+    DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
+        new DoFnRunnerWithMetrics<>(stepName, doFnRunner, metricsAccum);
+
+    return new SparkProcessContext<>(doFn, doFnRunnerWithMetrics, outputManager)
+        .processPartition(iter);
   }
 
   private class DoFnOutputManager

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7f4b708..584bcc3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -269,6 +269,7 @@ public final class TransformTranslator {
     return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
+        String stepName = context.getCurrentTransform().getFullName();
         DoFn<InputT, OutputT> doFn = transform.getFn();
         rejectStateAndTimers(doFn);
         @SuppressWarnings("unchecked")
@@ -276,13 +277,17 @@ public final class TransformTranslator {
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
         WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
-        Accumulator<NamedAggregators> accum =
-            SparkAggregators.getNamedAggregators(context.getSparkContext());
+        JavaSparkContext jsc = context.getSparkContext();
+        Accumulator<NamedAggregators> aggAccum =
+            SparkAggregators.getNamedAggregators(jsc);
+        Accumulator<SparkMetricsContainer> metricsAccum =
+            MetricsAccumulator.getOrCreateInstance(jsc);
         JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD
             .mapPartitionsToPair(
-                new MultiDoFnFunction<>(accum, doFn, context.getRuntimeContext(),
-                    transform.getMainOutputTag(), TranslationUtils.getSideInputs(
-                        transform.getSideInputs(), context), windowingStrategy)).cache();
+                new MultiDoFnFunction<>(aggAccum, metricsAccum, stepName, doFn,
+                    context.getRuntimeContext(), transform.getMainOutputTag(),
+                    TranslationUtils.getSideInputs(transform.getSideInputs(), context),
+                    windowingStrategy)).cache();
         List<TaggedPValue> pct = context.getOutputs(transform);
         for (TaggedPValue e : pct) {
           @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 2bfd172..f270a99 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -433,14 +433,18 @@ final class StreamingTransformTranslator {
           @Override
           public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(
               JavaRDD<WindowedValue<InputT>> rdd) throws Exception {
-            final Accumulator<NamedAggregators> accum =
-                SparkAggregators.getNamedAggregators(new JavaSparkContext(rdd.context()));
-
+            String stepName = context.getCurrentTransform().getFullName();
+            JavaSparkContext jsc = new JavaSparkContext(rdd.context());
+            final Accumulator<NamedAggregators> aggAccum =
+                SparkAggregators.getNamedAggregators(jsc);
+            final Accumulator<SparkMetricsContainer> metricsAccum =
+                MetricsAccumulator.getOrCreateInstance(jsc);
             final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs =
                 TranslationUtils.getSideInputs(transform.getSideInputs(),
                     JavaSparkContext.fromSparkContext(rdd.context()), pviews);
-            return rdd.mapPartitionsToPair(new MultiDoFnFunction<>(accum, doFn,
-                runtimeContext, transform.getMainOutputTag(), sideInputs, windowingStrategy));
+            return rdd.mapPartitionsToPair(new MultiDoFnFunction<>(aggAccum, metricsAccum,
+                stepName, doFn, runtimeContext, transform.getMainOutputTag(), sideInputs,
+                windowingStrategy));
           }
         }).cache();
         List<TaggedPValue> pct = context.getOutputs(transform);

http://git-wip-us.apache.org/repos/asf/beam/blob/d7d49ce8/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index 57a1d23..dd75e58 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -37,6 +37,8 @@ import org.apache.beam.sdk.testing.UsesCommittedMetrics;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.hamcrest.CoreMatchers;
 import org.junit.After;
 import org.junit.Test;
@@ -167,6 +169,8 @@ public class MetricsTest implements Serializable {
   private PipelineResult runPipelineWithMetrics() {
     final Counter count = Metrics.counter(MetricsTest.class, "count");
     Pipeline pipeline = TestPipeline.create();
+    final TupleTag<Integer> output1 = new TupleTag<Integer>(){};
+    final TupleTag<Integer> output2 = new TupleTag<Integer>(){};
     pipeline
         .apply(Create.of(5, 8, 13))
         .apply("MyStep1", ParDo.of(new DoFn<Integer, Integer>() {
@@ -193,15 +197,18 @@ public class MetricsTest implements Serializable {
             bundleDist.update(40L);
           }
         }))
-        .apply("MyStep2", ParDo.of(new DoFn<Integer, Integer>() {
-          @SuppressWarnings("unused")
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            Distribution values = Metrics.distribution(MetricsTest.class, "input");
-            count.inc();
-            values.update(c.element());
-          }
-        }));
+        .apply("MyStep2", ParDo.withOutputTags(output1, TupleTagList.of(output2))
+            .of(new DoFn<Integer, Integer>() {
+              @SuppressWarnings("unused")
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                Distribution values = Metrics.distribution(MetricsTest.class, "input");
+                count.inc();
+                values.update(c.element());
+                c.output(c.element());
+                c.sideOutput(output2, c.element());
+              }
+            }));
 
     PipelineResult result = pipeline.run();
     result.waitUntilFinish();
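
For context, a minimal sketch (not part of the commit) of how a user reads these metrics from a pipeline result on the Spark runner after this change. The pipeline and counter are hypothetical; the query calls mirror the ones used in SparkBeamMetric above. Only attempted() values are supported; committed() now throws UnsupportedOperationException.

    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.MetricsFilter;

    // Sketch only: assumes a hypothetical Pipeline 'pipeline' that updates a
    // Metrics.counter(...) in one of its DoFns and runs on the Spark runner.
    PipelineResult result = pipeline.run();
    result.waitUntilFinish();

    MetricQueryResults metrics =
        result.metrics().queryMetrics(MetricsFilter.builder().build());
    for (MetricResult<Long> counter : metrics.counters()) {
      // Supported on the Spark runner: the attempted value of the counter.
      System.out.println(counter.name() + ": " + counter.attempted());
      // counter.committed() would throw UnsupportedOperationException here.
    }

On the Spark runner these values are fed by the SparkMetricsContainer accumulator that MultiDoFnFunction updates per step (via DoFnRunnerWithMetrics above), which is why only attempted results are reported.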