[BEAM-797] A PipelineVisitor that creates a Spark-native pipeline. [BEAM-797] Remove unnecessary temp dir from test
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/94bef14e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/94bef14e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/94bef14e Branch: refs/heads/gearpump-runner Commit: 94bef14e50c66bd1c87bf33937f76d16ba798ee0 Parents: 2c2424c Author: Aviem Zur <aviem...@gmail.com> Authored: Sat Mar 4 23:16:24 2017 +0200 Committer: Amit Sela <amitsel...@gmail.com> Committed: Fri Mar 10 15:14:25 2017 +0200 ---------------------------------------------------------------------- runners/spark/pom.xml | 5 + .../spark/SparkNativePipelineVisitor.java | 202 +++++++++++++++++++ .../apache/beam/runners/spark/SparkRunner.java | 16 +- .../beam/runners/spark/SparkRunnerDebugger.java | 121 +++++++++++ .../spark/translation/TransformEvaluator.java | 1 + .../spark/translation/TransformTranslator.java | 105 ++++++++++ .../streaming/StreamingTransformTranslator.java | 53 ++++- .../runners/spark/SparkRunnerDebuggerTest.java | 180 +++++++++++++++++ 8 files changed, 673 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index ebd987d..a330820 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -196,6 +196,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>2.6</version> + </dependency> + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.4</version> http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java new file mode 100644 index 0000000..056da97 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.runners.spark.metrics.MetricsAccumulator; +import org.apache.beam.runners.spark.translation.EvaluationContext; +import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; +import org.apache.beam.runners.spark.translation.TransformEvaluator; +import org.apache.beam.runners.spark.translation.streaming.Checkpoint; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.commons.lang.WordUtils; + + +/** + * Pipeline visitor for translating a Beam pipeline into equivalent Spark operations. + * Used for debugging purposes using {@link SparkRunnerDebugger}. 
+ */ +public class SparkNativePipelineVisitor extends SparkRunner.Evaluator { + /* Recorded entries, in visit order; rendered by getDebugString(). */ + private final List<NativeTransform> transforms; + /* Package prefixes of composites that are never recorded as a single entry (their primitives may be recorded individually). */ + private final List<String> knownCompositesPackages = + Lists.newArrayList( + "org.apache.beam.sdk.transforms", + "org.apache.beam.runners.spark.examples"); + + /** + * Creates the visitor and initializes {@link MetricsAccumulator} on the context's Spark context + * with no checkpoint directory. NOTE(review): presumably required because the translators touch + * the metrics accumulator; confirm. + */ + SparkNativePipelineVisitor(SparkPipelineTranslator translator, EvaluationContext ctxt) { + super(translator, ctxt); + this.transforms = new ArrayList<>(); + MetricsAccumulator.init(ctxt.getSparkContext(), Optional.<Checkpoint.CheckpointDir>absent()); + } + + /** + * Delegates to the parent evaluator and, when the composite is entered, records it as a single + * entry if its class is outside {@link #knownCompositesPackages} and no enclosing composite has + * already been recorded. + */ + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + CompositeBehavior compositeBehavior = super.enterCompositeTransform(node); + PTransform<?, ?> transform = node.getTransform(); + if (transform != null) { + @SuppressWarnings("unchecked") + final Class<PTransform<?, ?>> transformClass = (Class<PTransform<?, ?>>) transform.getClass(); + if (compositeBehavior == CompositeBehavior.ENTER_TRANSFORM + && !knownComposite(transformClass) + && shouldDebug(node)) { + transforms.add(new NativeTransform(node, null, transform, true)); + } + } + return compositeBehavior; + } + + /** + * Returns whether the transform's package name starts with one of {@link #knownCompositesPackages}. + * NOTE(review): {@link Class#getPackage()} may return null, which would throw here. + */ + private boolean knownComposite(Class<PTransform<?, ?>> transform) { + String transformPackage = transform.getPackage().getName(); + for (String knownCompositePackage : knownCompositesPackages) { + if (transformPackage.startsWith(knownCompositePackage)) { + return true; + } + } + return false; + } + + /** + * Returns false if {@code node} or any of its enclosing nodes has already been recorded as a + * composite, so nothing nested in a recorded composite is recorded again; a null node yields true. + */ + private boolean shouldDebug(final TransformHierarchy.Node node) { + return node == null || !Iterables.any(transforms, new Predicate<NativeTransform>() { + @Override + public boolean apply(NativeTransform debugTransform) { + return debugTransform.getNode().equals(node) && debugTransform.isComposite(); + } + }) && shouldDebug(node.getEnclosingNode()); + } + + /** + * Runs the parent evaluator's visit, then translates the node again to obtain its + * {@link TransformEvaluator}, recording it unless an enclosing composite was already recorded. + */ + @Override + <TransformT extends PTransform<?
super PInput, POutput>> void + doVisitTransform(TransformHierarchy.Node node) { + super.doVisitTransform(node); + @SuppressWarnings("unchecked") + TransformT transform = (TransformT) node.getTransform(); + @SuppressWarnings("unchecked") + Class<TransformT> transformClass = (Class<TransformT>) transform.getClass(); + @SuppressWarnings("unchecked") + TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass); + if (shouldDebug(node)) { + transforms.add(new NativeTransform(node, evaluator, transform, false)); + } + } + + /** Returns the recorded entries, each rendered by its toString(), joined with newlines. */ + String getDebugString() { + return Joiner.on("\n").join(transforms); + } + + /** + * A recorded entry: either a composite (rendered by class name; its evaluator is null) or a + * primitive (rendered from its evaluator's {@link TransformEvaluator#toNativeString()}). + */ + private static class NativeTransform { + private final TransformHierarchy.Node node; + private final TransformEvaluator<?> transformEvaluator; + private final PTransform<?, ?> transform; + private final boolean composite; + + NativeTransform( + TransformHierarchy.Node node, + TransformEvaluator<?> transformEvaluator, + PTransform<?, ?> transform, + boolean composite) { + this.node = node; + this.transformEvaluator = transformEvaluator; + this.transform = transform; + this.composite = composite; + } + + TransformHierarchy.Node getNode() { + return node; + } + + boolean isComposite() { + return composite; + } + + /** + * Renders this entry as a Spark-like call. The first of the placeholders {@code <fn>}, + * {@code <windowFn>} or {@code <source>} found in the evaluator string is replaced with a class + * name. Results not starting with {@code sparkContext} or {@code streamingContext} get a + * {@code "_."} prefix (presumably meaning "applied to the previous result"). + * NOTE(review): an evaluator string that already starts with '.' (the streaming print evaluator + * returns ".print(...)") would render as "_..print(...)". + */ + @Override + public String toString() { + try { + Class<?
extends PTransform> transformClass = transform.getClass(); + /* Matched on the node's full name, so only a node named exactly "KafkaIO.Read" is special-cased. */ + if (node.getFullName().equals("KafkaIO.Read")) { + return "KafkaUtils.createDirectStream(...)"; + } + if (composite) { + return "_.<" + transformClass.getName() + ">"; + } + String transformString = transformEvaluator.toNativeString(); + if (transformString.contains("<fn>")) { + transformString = replaceFnString(transformClass, transformString, "fn"); + } else if (transformString.contains("<windowFn>")) { + transformString = replaceFnString(transformClass, transformString, "windowFn"); + } else if (transformString.contains("<source>")) { + String sourceName = "..."; + if (transform instanceof Read.Bounded) { + sourceName = ((Read.Bounded<?>) transform).getSource().getClass().getName(); + } else if (transform instanceof Read.Unbounded) { + sourceName = ((Read.Unbounded<?>) transform).getSource().getClass().getName(); + } + transformString = transformString.replace("<source>", sourceName); + } + if (transformString.startsWith("sparkContext") + || transformString.startsWith("streamingContext")) { + return transformString; + } + return "_." + transformString; + } catch ( + NoSuchMethodException + | InvocationTargetException + | IllegalAccessException + | NoSuchFieldException e) { + /* Reflection in replaceFnString failed; degrade to a placeholder instead of throwing. */ + return "<FailedTranslation>"; + } + } + + /** + * Replaces the {@code <fnFieldName>} placeholder in {@code transformString} with the class name of + * the object returned by the transform's {@code get<FnFieldName>()} getter. If that object's class + * is nested in {@link MapElements}, the class of the object stored in the enclosing MapElements + * instance's own {@code fnFieldName} field is reported instead. + * + * @throws NoSuchMethodException if the transform has no such getter + * @throws NoSuchFieldException if a reflective field lookup fails + */ + private String replaceFnString( + Class<?
extends PTransform> transformClass, + String transformString, + String fnFieldName) + throws IllegalAccessException, InvocationTargetException, NoSuchMethodException, + NoSuchFieldException { + Object fn = + transformClass.getMethod("get" + WordUtils.capitalize(fnFieldName)).invoke(transform); + Class<?> fnClass = fn.getClass(); + String doFnName; + Class<?> enclosingClass = fnClass.getEnclosingClass(); + if (enclosingClass != null && enclosingClass.equals(MapElements.class)) { + /* "this$0" is javac's synthetic outer-instance field (a compiler convention, not a language guarantee); it leads to the enclosing MapElements instance. */ + Field parent = fnClass.getDeclaredField("this$0"); + parent.setAccessible(true); + Field fnField = enclosingClass.getDeclaredField(fnFieldName); + fnField.setAccessible(true); + doFnName = fnField.get(parent.get(fn)).getClass().getName(); + } else { + doFnName = fnClass.getName(); + } + transformString = transformString.replace("<" + fnFieldName + ">", doFnName); + return transformString; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 3f002da..a706f00 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -297,11 +297,12 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { /** * Evaluator on the pipeline.
*/ + @SuppressWarnings("WeakerAccess") public static class Evaluator extends Pipeline.PipelineVisitor.Defaults { private static final Logger LOG = LoggerFactory.getLogger(Evaluator.class); - private final EvaluationContext ctxt; - private final SparkPipelineTranslator translator; + protected final EvaluationContext ctxt; + protected final SparkPipelineTranslator translator; public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) { this.translator = translator; @@ -324,7 +325,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { return CompositeBehavior.ENTER_TRANSFORM; } - private boolean shouldDefer(TransformHierarchy.Node node) { + protected boolean shouldDefer(TransformHierarchy.Node node) { // if the input is not a PCollection, or it is but with non merging windows, don't defer. if (node.getInputs().size() != 1) { return false; @@ -361,7 +362,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { } <TransformT extends PTransform<? super PInput, POutput>> void - doVisitTransform(TransformHierarchy.Node node) { + doVisitTransform(TransformHierarchy.Node node) { @SuppressWarnings("unchecked") TransformT transform = (TransformT) node.getTransform(); @SuppressWarnings("unchecked") @@ -379,8 +380,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { * Determine if this Node belongs to a Bounded branch of the pipeline, or Unbounded, and * translate with the proper translator. */ - private <TransformT extends PTransform<? super PInput, POutput>> - TransformEvaluator<TransformT> translate( + protected <TransformT extends PTransform<? super PInput, POutput>> + TransformEvaluator<TransformT> translate( TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) { //--- determine if node is bounded/unbounded. 
// usually, the input determines if the PCollection to apply the next transformation to @@ -400,7 +401,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { : translator.translateUnbounded(transformClass); } - private PCollection.IsBounded isBoundedCollection(Collection<TaggedPValue> pValues) { + protected PCollection.IsBounded isBoundedCollection(Collection<TaggedPValue> pValues) { // anything that is not a PCollection, is BOUNDED. // For PCollections: // BOUNDED behaves as the Identity Element, BOUNDED + BOUNDED = BOUNDED @@ -417,4 +418,3 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { } } } - http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java new file mode 100644 index 0000000..395acff --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import org.apache.beam.runners.spark.translation.EvaluationContext; +import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; +import org.apache.beam.runners.spark.translation.TransformTranslator; +import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Pipeline runner which translates a Beam pipeline into equivalent Spark operations, without + * running them. Used for debugging purposes.
+ * + * <p>Example:</p> + * + * <pre>{@code + * SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + * options.setRunner(SparkRunnerDebugger.class); + * Pipeline pipeline = Pipeline.create(options); + * SparkRunnerDebugger.DebugSparkPipelineResult result = + * (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run(); + * String sparkPipeline = result.getDebugString(); + * }</pre> + */ +public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResult> { + + private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerDebugger.class); + + private SparkRunnerDebugger() {} + + /** Creates the runner; {@code options} is ignored because {@link #run} reads the pipeline's own options. */ + @SuppressWarnings("unused") + public static SparkRunnerDebugger fromOptions(PipelineOptions options) { + return new SparkRunnerDebugger(); + } + + /** + * Translates {@code pipeline} into a textual description of equivalent Spark operations without + * executing it. A local Spark context is created only to drive the translators and is always + * stopped before returning, including when translation throws. + * + * @return a {@link DebugSparkPipelineResult} holding the description + */ + @Override + public SparkPipelineResult run(Pipeline pipeline) { + SparkPipelineOptions options = (SparkPipelineOptions) pipeline.getOptions(); + boolean streaming = options.isStreaming() + || options instanceof TestSparkPipelineOptions + && ((TestSparkPipelineOptions) options).isForceStreaming(); + + JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline"); + String debugString; + /* Stop the context even if translation fails: only one SparkContext may be active per JVM, so a leaked one would break later runs. */ + try { + JavaStreamingContext jssc = + new JavaStreamingContext(jsc, new org.apache.spark.streaming.Duration(1000)); + TransformTranslator.Translator translator = new TransformTranslator.Translator(); + SparkPipelineTranslator pipelineTranslator; + if (streaming) { + pipelineTranslator = new StreamingTransformTranslator.Translator(translator); + } else { + pipelineTranslator = translator; + } + EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc); + SparkNativePipelineVisitor visitor = new SparkNativePipelineVisitor(pipelineTranslator, ctxt); + pipeline.traverseTopologically(visitor); + debugString = visitor.getDebugString(); + } finally { + jsc.stop(); + } + LOG.info("Translated Native Spark pipeline:\n{}", debugString); + return new
DebugSparkPipelineResult(debugString); + } + + /** + * PipelineResult of running a {@link Pipeline} using {@link SparkRunnerDebugger}. + * Use {@link #getDebugString} to get a {@link String} representation of the {@link Pipeline} + * translated into Spark native operations. + */ + public static class DebugSparkPipelineResult extends SparkPipelineResult { + private final String debugString; + + DebugSparkPipelineResult(String debugString) { + super(null, null); + this.debugString = debugString; + } + + /** + * Returns the Beam pipeline translated into Spark native operations. Public so that callers + * outside this package, as in the class-level example, can read it. + */ + public String getDebugString() { + return debugString; + } + + /** Nothing to stop: no job is ever started, and run() already stopped its Spark context. */ + @Override protected void stop() { + /* Empty implementation */ + } + + /** Translation completed synchronously inside run(), so the result is always DONE. */ + @Override protected State awaitTermination(Duration duration) + throws TimeoutException, ExecutionException, InterruptedException { + return State.DONE; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java index fbfa84d..585b933 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java @@ -26,4 +26,5 @@ import org.apache.beam.sdk.transforms.PTransform; */ public interface TransformEvaluator<TransformT extends PTransform<?, ?>> extends Serializable { void evaluate(TransformT transform, EvaluationContext context); + String toNativeString(); } http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 725d157..44b4039 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -121,6 +121,11 @@ public final class TransformTranslator { } context.putDataset(transform, new BoundedDataset<>(unionRDD)); } + + @Override + public String toNativeString() { + return "sparkContext.union(...)"; + } }; } @@ -162,6 +167,11 @@ public final class TransformTranslator { context.putDataset(transform, new BoundedDataset<>(groupedAlsoByWindow)); } + + @Override + public String toNativeString() { + return "groupByKey()"; + } }; } @@ -201,6 +211,11 @@ public final class TransformTranslator { }); context.putDataset(transform, new BoundedDataset<>(outRDD)); } + + @Override + public String toNativeString() { + return "map(new <fn>())"; + } }; } @@ -267,6 +282,11 @@ public final class TransformTranslator { } context.putDataset(transform, new BoundedDataset<>(outRdd)); } + + @Override + public String toNativeString () { + return "aggregate(..., new <fn>(), ...)"; + } }; } @@ -321,6 +341,11 @@ public final class TransformTranslator { context.putDataset(transform, new BoundedDataset<>(outRdd)); } + + @Override + public String toNativeString() { + return "combineByKey(..., new <fn>(), ...)"; + } }; } @@ -348,6 +373,11 @@ public final class TransformTranslator { new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(aggAccum, metricsAccum, stepName, doFn, context.getRuntimeContext(), sideInputs, windowingStrategy)))); } + + @Override + public String toNativeString() { + return "mapPartitions(new <fn>())"; + } }; } @@ -388,6 +418,11 @@ public final class 
TransformTranslator { context.putDataset(e.getValue(), new BoundedDataset<>(values)); } } + + @Override + public String toNativeString() { + return "mapPartitions(new <fn>())"; + } }; } @@ -401,6 +436,11 @@ public final class TransformTranslator { .map(WindowingHelpers.<String>windowFunction()); context.putDataset(transform, new BoundedDataset<>(rdd)); } + + @Override + public String toNativeString() { + return "sparkContext.textFile(...)"; + } }; } @@ -426,6 +466,11 @@ public final class TransformTranslator { writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class, NullWritable.class, TemplatedTextOutputFormat.class); } + + @Override + public String toNativeString() { + return "saveAsNewAPIHadoopFile(...)"; + } }; } @@ -450,6 +495,11 @@ public final class TransformTranslator { }).map(WindowingHelpers.<T>windowFunction()); context.putDataset(transform, new BoundedDataset<>(rdd)); } + + @Override + public String toNativeString() { + return "sparkContext.newAPIHadoopFile(...)"; + } }; } @@ -481,6 +531,11 @@ public final class TransformTranslator { writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo, AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class); } + + @Override + public String toNativeString() { + return "mapToPair(<objectToAvroKeyFn>).saveAsNewAPIHadoopFile(...)"; + } }; } @@ -496,6 +551,11 @@ public final class TransformTranslator { // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation. 
context.putDataset(transform, new BoundedDataset<>(input.cache())); } + + @Override + public String toNativeString() { + return "sparkContext.<readFrom(<source>)>()"; + } }; } @@ -519,6 +579,11 @@ public final class TransformTranslator { }).map(WindowingHelpers.<KV<K, V>>windowFunction()); context.putDataset(transform, new BoundedDataset<>(rdd)); } + + @Override + public String toNativeString() { + return "sparkContext.newAPIHadoopFile(...)"; + } }; } @@ -547,6 +612,11 @@ public final class TransformTranslator { writeHadoopFile(last, conf, shardTemplateInfo, transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass()); } + + @Override + public String toNativeString() { + return "saveAsNewAPIHadoopFile(...)"; + } }; } @@ -619,6 +689,11 @@ public final class TransformTranslator { inRDD.map(new SparkAssignWindowFn<>(transform.getWindowFn())))); } } + + @Override + public String toNativeString() { + return "map(new <windowFn>())"; + } }; } @@ -632,6 +707,11 @@ public final class TransformTranslator { Coder<T> coder = context.getOutput(transform).getCoder(); context.putBoundedDatasetFromValues(transform, elems, coder); } + + @Override + public String toNativeString() { + return "sparkContext.parallelize(Arrays.asList(...))"; + } }; } @@ -649,6 +729,11 @@ public final class TransformTranslator { context.putPView(output, iterCast, coderInternal); } + + @Override + public String toNativeString() { + return "collect()"; + } }; } @@ -666,6 +751,11 @@ public final class TransformTranslator { context.putPView(output, iterCast, coderInternal); } + + @Override + public String toNativeString() { + return "collect()"; + } }; } @@ -685,6 +775,11 @@ public final class TransformTranslator { context.putPView(output, iterCast, coderInternal); } + + @Override + public String toNativeString() { + return "<createPCollectionView>"; + } }; } @@ -706,6 +801,11 @@ public final class TransformTranslator { context.putDataset(transform, new BoundedDataset<String>(output)); } + 
+ @Override + public String toNativeString() { + return "sparkContext.parallelize(rdd.getStorageLevel().description())"; + } }; } @@ -732,6 +832,11 @@ public final class TransformTranslator { context.putDataset(transform, new BoundedDataset<>(reshuffled)); } + + @Override + public String toNativeString() { + return "repartition(...)"; + } }; } http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index ccf84b2..8a05fbb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -96,7 +96,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; /** * Supports translation between a Beam transform, and Spark's operations on DStreams. 
*/ -final class StreamingTransformTranslator { +public final class StreamingTransformTranslator { private StreamingTransformTranslator() { } @@ -110,6 +110,11 @@ final class StreamingTransformTranslator { ((UnboundedDataset<T>) (context).borrowDataset(transform)).getDStream(); dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum()); } + + @Override + public String toNativeString() { + return ".print(...)"; + } }; } @@ -124,6 +129,11 @@ final class StreamingTransformTranslator { context.getRuntimeContext(), transform.getSource())); } + + @Override + public String toNativeString() { + return "streamingContext.<readFrom(<source>)>()"; + } }; } @@ -168,6 +178,11 @@ final class StreamingTransformTranslator { ImmutableMap.of(unboundedDataset.getStreamSources().get(0), times)); context.putDataset(transform, unboundedDataset); } + + @Override + public String toNativeString() { + return "streamingContext.queueStream(...)"; + } }; } @@ -208,6 +223,11 @@ final class StreamingTransformTranslator { context.getStreamingContext().union(dStreams.remove(0), dStreams); context.putDataset(transform, new UnboundedDataset<>(unifiedStreams, streamingSources)); } + + @Override + public String toNativeString() { + return "streamingContext.union(...)"; + } }; } @@ -235,6 +255,11 @@ final class StreamingTransformTranslator { context.putDataset(transform, new UnboundedDataset<>(outputStream, unboundedDataset.getStreamSources())); } + + @Override + public String toNativeString() { + return "map(new <windowFn>())"; + } }; } @@ -283,6 +308,11 @@ final class StreamingTransformTranslator { context.putDataset(transform, new UnboundedDataset<>(outStream, streamSources)); } + + @Override + public String toNativeString() { + return "groupByKey()"; + } }; } @@ -329,6 +359,11 @@ final class StreamingTransformTranslator { context.putDataset(transform, new UnboundedDataset<>(outStream, unboundedDataset.getStreamSources())); } + + @Override + public String toNativeString() { + return 
"map(new <fn>())"; + } }; } @@ -375,6 +410,11 @@ final class StreamingTransformTranslator { context.putDataset(transform, new UnboundedDataset<>(outStream, unboundedDataset.getStreamSources())); } + + @Override + public String toNativeString() { + return "mapPartitions(new <fn>())"; + } }; } @@ -431,6 +471,11 @@ final class StreamingTransformTranslator { new UnboundedDataset<>(values, unboundedDataset.getStreamSources())); } } + + @Override + public String toNativeString() { + return "mapPartitions(new <fn>())"; + } }; } @@ -465,6 +510,10 @@ final class StreamingTransformTranslator { context.putDataset(transform, new UnboundedDataset<>(reshuffledStream, streamSources)); } + + @Override public String toNativeString() { + return "repartition(...)"; + } }; } @@ -491,7 +540,7 @@ final class StreamingTransformTranslator { private final SparkPipelineTranslator batchTranslator; - Translator(SparkPipelineTranslator batchTranslator) { + public Translator(SparkPipelineTranslator batchTranslator) { this.batchTranslator = batchTranslator; } http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java new file mode 100644 index 0000000..905b30e --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import static org.junit.Assert.assertThat; + +import java.util.Collections; +import org.apache.beam.runners.spark.examples.WordCount; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Distinct; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; + + +/** + * Test {@link 
SparkRunnerDebugger} with different pipelines. + */ +public class SparkRunnerDebuggerTest { + + @Rule + public final PipelineRule batchPipelineRule = PipelineRule.batch(); + + @Rule + public final PipelineRule streamingPipelineRule = PipelineRule.streaming(); + + @Test + public void debugBatchPipeline() { + TestSparkPipelineOptions options = batchPipelineRule.getOptions(); + options.setRunner(SparkRunnerDebugger.class); + + Pipeline pipeline = Pipeline.create(options); + + PCollection<String> lines = pipeline + .apply(Create.of(Collections.<String>emptyList()).withCoder(StringUtf8Coder.of())); + + PCollection<KV<String, Long>> wordCounts = lines + .apply(new WordCount.CountWords()); + + wordCounts + .apply(GroupByKey.<String, Long>create()) + .apply(Combine.<String, Long, Long>groupedValues(Sum.ofLongs())); + + PCollection<KV<String, Long>> wordCountsPlusOne = wordCounts + .apply(MapElements.via(new PlusOne())); + + PCollectionList.of(wordCounts).and(wordCountsPlusOne) + .apply(Flatten.<KV<String, Long>>pCollections()); + + wordCounts + .apply(MapElements.via(new WordCount.FormatAsTextFn())) + .apply(TextIO.Write.to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt")); + + final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n" + + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n" + + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n" + + "_.combineByKey(..., new org.apache.beam.sdk.transforms" + + ".Combine$CombineFn$KeyIgnoringCombineFn(), ...)\n" + + "_.groupByKey()\n" + + "_.map(new org.apache.beam.sdk.transforms.Combine$CombineFn$KeyIgnoringCombineFn())\n" + + "_.mapPartitions(new org.apache.beam.runners.spark" + + ".SparkRunnerDebuggerTest$PlusOne())\n" + + "sparkContext.union(...)\n" + + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n" + + "_.<org.apache.beam.sdk.io.TextIO$Write$Bound>"; + + 
SparkRunnerDebugger.DebugSparkPipelineResult result = + (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run(); + + assertThat("Debug pipeline did not equal expected", result.getDebugString(), + Matchers.equalTo(expectedPipeline)); + } + + @Test + public void debugStreamingPipeline() { + TestSparkPipelineOptions options = streamingPipelineRule.getOptions(); + options.setRunner(SparkRunnerDebugger.class); + + Pipeline pipeline = Pipeline.create(options); + + KafkaIO.Read<String, String> read = KafkaIO.<String, String>read() + .withBootstrapServers("mykafka:9092") + .withTopics(Collections.singletonList("my_input_topic")) + .withKeyCoder(StringUtf8Coder.of()) + .withValueCoder(StringUtf8Coder.of()); + + KafkaIO.Write<String, String> write = KafkaIO.<String, String>write() + .withBootstrapServers("myotherkafka:9092") + .withTopic("my_output_topic") + .withKeyCoder(StringUtf8Coder.of()) + .withValueCoder(StringUtf8Coder.of()); + + KvCoder<String, String> stringKvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); + + pipeline + .apply(read.withoutMetadata()).setCoder(stringKvCoder) + .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(5)))) + .apply(ParDo.of(new SparkRunnerDebuggerTest.FormatKVFn())) + .apply(Distinct.<String>create()) + .apply(WithKeys.of(new SparkRunnerDebuggerTest.ArbitraryKeyFunction())) + .apply(write); + + final String expectedPipeline = "KafkaUtils.createDirectStream(...)\n" + + "_.map(new org.apache.beam.sdk.transforms.windowing.FixedWindows())\n" + + "_.mapPartitions(new org.apache.beam.runners.spark." 
+ + "SparkRunnerDebuggerTest$FormatKVFn())\n" + + "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$2())\n" + + "_.groupByKey()\n" + + "_.map(new org.apache.beam.sdk.transforms.Combine$CombineFn$KeyIgnoringCombineFn())\n" + + "_.mapPartitions(new org.apache.beam.sdk.transforms.Keys$1())\n" + + "_.mapPartitions(new org.apache.beam.sdk.transforms.WithKeys$2())\n" + + "_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>"; + + SparkRunnerDebugger.DebugSparkPipelineResult result = + (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run(); + + assertThat("Debug pipeline did not equal expected", + result.getDebugString(), + Matchers.equalTo(expectedPipeline)); + } + + private static class FormatKVFn extends DoFn<KV<String, String>, String> { + @SuppressWarnings("unused") + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().getKey() + "," + c.element().getValue()); + } + } + + private static class ArbitraryKeyFunction implements SerializableFunction<String, String> { + @Override + public String apply(String input) { + return "someKey"; + } + } + + private static class PlusOne extends SimpleFunction<KV<String, Long>, KV<String, Long>> { + @Override + public KV<String, Long> apply(KV<String, Long> input) { + return KV.of(input.getKey(), input.getValue() + 1); + } + } +}