Repository: beam
Updated Branches:
  refs/heads/master 2c2424cb4 -> c12d432d8


[BEAM-797] A PipelineVisitor that creates a Spark-native pipeline.

[BEAM-797] Remove unnecessary temp dir from test


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/94bef14e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/94bef14e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/94bef14e

Branch: refs/heads/master
Commit: 94bef14e50c66bd1c87bf33937f76d16ba798ee0
Parents: 2c2424c
Author: Aviem Zur <aviem...@gmail.com>
Authored: Sat Mar 4 23:16:24 2017 +0200
Committer: Amit Sela <amitsel...@gmail.com>
Committed: Fri Mar 10 15:14:25 2017 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           |   5 +
 .../spark/SparkNativePipelineVisitor.java       | 202 +++++++++++++++++++
 .../apache/beam/runners/spark/SparkRunner.java  |  16 +-
 .../beam/runners/spark/SparkRunnerDebugger.java | 121 +++++++++++
 .../spark/translation/TransformEvaluator.java   |   1 +
 .../spark/translation/TransformTranslator.java  | 105 ++++++++++
 .../streaming/StreamingTransformTranslator.java |  53 ++++-
 .../runners/spark/SparkRunnerDebuggerTest.java  | 180 +++++++++++++++++
 8 files changed, 673 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
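
Note: the SparkRunnerDebugger added here translates a pipeline into a textual description of the corresponding Spark operations without executing it. A minimal usage sketch, following the Javadoc example included in this commit (the applied transforms are placeholders):

    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunnerDebugger.class);
    Pipeline pipeline = Pipeline.create(options);
    // ... apply transforms to the pipeline here ...
    SparkRunnerDebugger.DebugSparkPipelineResult result =
        (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
    String sparkPipeline = result.getDebugString();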


http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index ebd987d..a330820 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -196,6 +196,11 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.6</version>
+    </dependency>
+    <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
       <version>2.4</version>
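
The commons-lang dependency added above is used by the new SparkNativePipelineVisitor, which calls WordUtils.capitalize to turn a field name into a getter name when resolving user functions reflectively. A small illustration of that call (the field name is just an example):

    import org.apache.commons.lang.WordUtils;

    String fnFieldName = "fn";
    // Capitalizes the first letter: "fn" -> "Fn", yielding the getter name "getFn".
    String getterName = "get" + WordUtils.capitalize(fnFieldName);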

http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
new file mode 100644
index 0000000..056da97
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.translation.TransformEvaluator;
+import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.commons.lang.WordUtils;
+
+
+/**
+ * Pipeline visitor for translating a Beam pipeline into equivalent Spark operations.
+ * Used for debugging purposes using {@link SparkRunnerDebugger}.
+ */
+public class SparkNativePipelineVisitor extends SparkRunner.Evaluator {
+  private final List<NativeTransform> transforms;
+  private final List<String> knownCompositesPackages =
+      Lists.newArrayList(
+          "org.apache.beam.sdk.transforms",
+          "org.apache.beam.runners.spark.examples");
+
+  SparkNativePipelineVisitor(SparkPipelineTranslator translator, EvaluationContext ctxt) {
+    super(translator, ctxt);
+    this.transforms = new ArrayList<>();
+    MetricsAccumulator.init(ctxt.getSparkContext(), Optional.<Checkpoint.CheckpointDir>absent());
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    CompositeBehavior compositeBehavior = super.enterCompositeTransform(node);
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null) {
+      @SuppressWarnings("unchecked")
+      final Class<PTransform<?, ?>> transformClass = (Class<PTransform<?, ?>>) transform.getClass();
+      if (compositeBehavior == CompositeBehavior.ENTER_TRANSFORM
+          && !knownComposite(transformClass)
+          && shouldDebug(node)) {
+        transforms.add(new NativeTransform(node, null, transform, true));
+      }
+    }
+    return compositeBehavior;
+  }
+
+  private boolean knownComposite(Class<PTransform<?, ?>> transform) {
+    String transformPackage = transform.getPackage().getName();
+    for (String knownCompositePackage : knownCompositesPackages) {
+      if (transformPackage.startsWith(knownCompositePackage)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean shouldDebug(final TransformHierarchy.Node node) {
+    return node == null || !Iterables.any(transforms, new Predicate<NativeTransform>() {
+      @Override
+      public boolean apply(NativeTransform debugTransform) {
+        return debugTransform.getNode().equals(node) && debugTransform.isComposite();
+      }
+    }) && shouldDebug(node.getEnclosingNode());
+  }
+
+  @Override
+  <TransformT extends PTransform<? super PInput, POutput>> void
+  doVisitTransform(TransformHierarchy.Node node) {
+    super.doVisitTransform(node);
+    @SuppressWarnings("unchecked")
+    TransformT transform = (TransformT) node.getTransform();
+    @SuppressWarnings("unchecked")
+    Class<TransformT> transformClass = (Class<TransformT>) transform.getClass();
+    @SuppressWarnings("unchecked")
+    TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass);
+    if (shouldDebug(node)) {
+      transforms.add(new NativeTransform(node, evaluator, transform, false));
+    }
+  }
+
+  String getDebugString() {
+    return Joiner.on("\n").join(transforms);
+  }
+
+  private static class NativeTransform {
+    private final TransformHierarchy.Node node;
+    private final TransformEvaluator<?> transformEvaluator;
+    private final PTransform<?, ?> transform;
+    private final boolean composite;
+
+    NativeTransform(
+        TransformHierarchy.Node node,
+        TransformEvaluator<?> transformEvaluator,
+        PTransform<?, ?> transform,
+        boolean composite) {
+      this.node = node;
+      this.transformEvaluator = transformEvaluator;
+      this.transform = transform;
+      this.composite = composite;
+    }
+
+    TransformHierarchy.Node getNode() {
+      return node;
+    }
+
+    boolean isComposite() {
+      return composite;
+    }
+
+    @Override
+    public String toString() {
+      try {
+        Class<? extends PTransform> transformClass = transform.getClass();
+        if (node.getFullName().equals("KafkaIO.Read")) {
+          return "KafkaUtils.createDirectStream(...)";
+        }
+        if (composite) {
+          return "_.<" + transformClass.getName() + ">";
+        }
+        String transformString = transformEvaluator.toNativeString();
+        if (transformString.contains("<fn>")) {
+          transformString = replaceFnString(transformClass, transformString, "fn");
+        } else if (transformString.contains("<windowFn>")) {
+          transformString = replaceFnString(transformClass, transformString, "windowFn");
+        } else if (transformString.contains("<source>")) {
+          String sourceName = "...";
+          if (transform instanceof Read.Bounded) {
+            sourceName = ((Read.Bounded<?>) transform).getSource().getClass().getName();
+          } else if (transform instanceof Read.Unbounded) {
+            sourceName = ((Read.Unbounded<?>) transform).getSource().getClass().getName();
+          }
+          transformString = transformString.replace("<source>", sourceName);
+        }
+        if (transformString.startsWith("sparkContext")
+            || transformString.startsWith("streamingContext")) {
+          return transformString;
+        }
+        return "_." + transformString;
+      } catch (
+          NoSuchMethodException
+              | InvocationTargetException
+              | IllegalAccessException
+              | NoSuchFieldException e) {
+        return "<FailedTranslation>";
+      }
+    }
+
+    private String replaceFnString(
+        Class<? extends PTransform> transformClass,
+        String transformString,
+        String fnFieldName)
+        throws IllegalAccessException, InvocationTargetException, NoSuchMethodException,
+        NoSuchFieldException {
+      Object fn =
+          transformClass.getMethod("get" + WordUtils.capitalize(fnFieldName)).invoke(transform);
+      Class<?> fnClass = fn.getClass();
+      String doFnName;
+      Class<?> enclosingClass = fnClass.getEnclosingClass();
+      if (enclosingClass != null && enclosingClass.equals(MapElements.class)) {
+        Field parent = fnClass.getDeclaredField("this$0");
+        parent.setAccessible(true);
+        Field fnField = enclosingClass.getDeclaredField(fnFieldName);
+        fnField.setAccessible(true);
+        doFnName = fnField.get(parent.get(fn)).getClass().getName();
+      } else {
+        doFnName = fnClass.getName();
+      }
+      transformString = transformString.replace("<" + fnFieldName + ">", doFnName);
+      return transformString;
+    }
+  }
+}
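
For reference, each visited transform contributes one line to the debug string: primitive transforms are rendered via toNativeString() with a leading "_." (unless the string already starts with "sparkContext" or "streamingContext"), and unrecognized composites are rendered as "_.<fully.qualified.ClassName>". A hypothetical word-count style pipeline would therefore print roughly as follows (class names are illustrative only):

    sparkContext.parallelize(Arrays.asList(...))
    _.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())
    _.groupByKey()
    _.map(new org.apache.beam.sdk.transforms.Combine$CombineFn$KeyIgnoringCombineFn())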

http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 3f002da..a706f00 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -297,11 +297,12 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
   /**
    * Evaluator on the pipeline.
    */
+  @SuppressWarnings("WeakerAccess")
   public static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
     private static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
 
-    private final EvaluationContext ctxt;
-    private final SparkPipelineTranslator translator;
+    protected final EvaluationContext ctxt;
+    protected final SparkPipelineTranslator translator;
 
      public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) {
       this.translator = translator;
@@ -324,7 +325,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       return CompositeBehavior.ENTER_TRANSFORM;
     }
 
-    private boolean shouldDefer(TransformHierarchy.Node node) {
+    protected boolean shouldDefer(TransformHierarchy.Node node) {
      // if the input is not a PCollection, or it is but with non merging windows, don't defer.
       if (node.getInputs().size() != 1) {
         return false;
@@ -361,7 +362,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     }
 
     <TransformT extends PTransform<? super PInput, POutput>> void
-        doVisitTransform(TransformHierarchy.Node node) {
+    doVisitTransform(TransformHierarchy.Node node) {
       @SuppressWarnings("unchecked")
       TransformT transform = (TransformT) node.getTransform();
       @SuppressWarnings("unchecked")
@@ -379,8 +380,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     * Determine if this Node belongs to a Bounded branch of the pipeline, or Unbounded, and
      * translate with the proper translator.
      */
-    private <TransformT extends PTransform<? super PInput, POutput>>
-        TransformEvaluator<TransformT> translate(
+    protected <TransformT extends PTransform<? super PInput, POutput>>
+    TransformEvaluator<TransformT> translate(
            TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) {
       //--- determine if node is bounded/unbounded.
      // usually, the input determines if the PCollection to apply the next transformation to
@@ -400,7 +401,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
               : translator.translateUnbounded(transformClass);
     }
 
-    private PCollection.IsBounded isBoundedCollection(Collection<TaggedPValue> pValues) {
+    protected PCollection.IsBounded isBoundedCollection(Collection<TaggedPValue> pValues) {
       // anything that is not a PCollection, is BOUNDED.
       // For PCollections:
       // BOUNDED behaves as the Identity Element, BOUNDED + BOUNDED = BOUNDED
@@ -417,4 +418,3 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
new file mode 100644
index 0000000..395acff
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Pipeline runner which translates a Beam pipeline into equivalent Spark operations, without
+ * running them. Used for debugging purposes.
+ *
+ * <p>Example:</p>
+ *
+ * <pre>{@code
+ * SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ * options.setRunner(SparkRunnerDebugger.class);
+ * Pipeline pipeline = Pipeline.create(options);
+ * SparkRunnerDebugger.DebugSparkPipelineResult result =
+ *     (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
+ * String sparkPipeline = result.getDebugString();
+ * }</pre>
+ */
+public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerDebugger.class);
+
+  private SparkRunnerDebugger() {}
+
+  @SuppressWarnings("unused")
+  public static SparkRunnerDebugger fromOptions(PipelineOptions options) {
+    return new SparkRunnerDebugger();
+  }
+
+  @Override
+  public SparkPipelineResult run(Pipeline pipeline) {
+    SparkPipelineResult result;
+
+    SparkPipelineOptions options = (SparkPipelineOptions) pipeline.getOptions();
+
+    JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
+    JavaStreamingContext jssc =
+        new JavaStreamingContext(jsc, new org.apache.spark.streaming.Duration(1000));
+    TransformTranslator.Translator translator = new TransformTranslator.Translator();
+    SparkNativePipelineVisitor visitor;
+    if (options.isStreaming()
+        || options instanceof TestSparkPipelineOptions
+        && ((TestSparkPipelineOptions) options).isForceStreaming()) {
+      SparkPipelineTranslator streamingTranslator =
+          new StreamingTransformTranslator.Translator(translator);
+      EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
+      visitor = new SparkNativePipelineVisitor(streamingTranslator, ctxt);
+    } else {
+      EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
+      visitor = new SparkNativePipelineVisitor(translator, ctxt);
+    }
+    pipeline.traverseTopologically(visitor);
+    jsc.stop();
+    String debugString = visitor.getDebugString();
+    LOG.info("Translated Native Spark pipeline:\n" + debugString);
+    return new DebugSparkPipelineResult(debugString);
+  }
+
+  /**
+   * PipelineResult of running a {@link Pipeline} using {@link SparkRunnerDebugger}
+   * Use {@link #getDebugString} to get a {@link String} representation of the {@link Pipeline}
+   * translated into Spark native operations.
+   */
+  public static class DebugSparkPipelineResult extends SparkPipelineResult {
+    private final String debugString;
+
+    DebugSparkPipelineResult(String debugString) {
+      super(null, null);
+      this.debugString = debugString;
+    }
+
+    /**
+     * Returns Beam pipeline translated into Spark native operations.
+     */
+    String getDebugString() {
+      return debugString;
+    }
+
+    @Override protected void stop() {
+      // Empty implementation
+    }
+
+    @Override protected State awaitTermination(Duration duration)
+        throws TimeoutException, ExecutionException, InterruptedException {
+      return State.DONE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
index fbfa84d..585b933 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
@@ -26,4 +26,5 @@ import org.apache.beam.sdk.transforms.PTransform;
  */
 public interface TransformEvaluator<TransformT extends PTransform<?, ?>> extends Serializable {
   void evaluate(TransformT transform, EvaluationContext context);
+  String toNativeString();
 }
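
With this change every TransformEvaluator must also describe the Spark operation it maps to. A minimal sketch of an evaluator implementing the extended interface (MyTransform is a hypothetical PTransform type, and the bodies are placeholders):

    TransformEvaluator<MyTransform> evaluator = new TransformEvaluator<MyTransform>() {
      @Override
      public void evaluate(MyTransform transform, EvaluationContext context) {
        // ... translate the transform into RDD operations and register the result ...
      }

      @Override
      public String toNativeString() {
        // Human-readable Spark-native form surfaced by SparkRunnerDebugger.
        return "map(new <fn>())";
      }
    };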

http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 725d157..44b4039 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -121,6 +121,11 @@ public final class TransformTranslator {
         }
         context.putDataset(transform, new BoundedDataset<>(unionRDD));
       }
+
+      @Override
+      public String toNativeString() {
+        return "sparkContext.union(...)";
+      }
     };
   }
 
@@ -162,6 +167,11 @@ public final class TransformTranslator {
 
         context.putDataset(transform, new BoundedDataset<>(groupedAlsoByWindow));
       }
+
+      @Override
+      public String toNativeString() {
+        return "groupByKey()";
+      }
     };
   }
 
@@ -201,6 +211,11 @@ public final class TransformTranslator {
                        });
                context.putDataset(transform, new BoundedDataset<>(outRDD));
             }
+
+            @Override
+            public String toNativeString() {
+              return "map(new <fn>())";
+            }
           };
   }
 
@@ -267,6 +282,11 @@ public final class TransformTranslator {
             }
             context.putDataset(transform, new BoundedDataset<>(outRdd));
           }
+
+          @Override
+          public String toNativeString() {
+            return "aggregate(..., new <fn>(), ...)";
+          }
         };
   }
 
@@ -321,6 +341,11 @@ public final class TransformTranslator {
 
         context.putDataset(transform, new BoundedDataset<>(outRdd));
       }
+
+      @Override
+      public String toNativeString() {
+        return "combineByKey(..., new <fn>(), ...)";
+      }
     };
   }
 
@@ -348,6 +373,11 @@ public final class TransformTranslator {
            new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(aggAccum, metricsAccum,
                stepName, doFn, context.getRuntimeContext(), sideInputs, windowingStrategy))));
       }
+
+      @Override
+      public String toNativeString() {
+        return "mapPartitions(new <fn>())";
+      }
     };
   }
 
@@ -388,6 +418,11 @@ public final class TransformTranslator {
           context.putDataset(e.getValue(), new BoundedDataset<>(values));
         }
       }
+
+      @Override
+      public String toNativeString() {
+        return "mapPartitions(new <fn>())";
+      }
     };
   }
 
@@ -401,6 +436,11 @@ public final class TransformTranslator {
             .map(WindowingHelpers.<String>windowFunction());
         context.putDataset(transform, new BoundedDataset<>(rdd));
       }
+
+      @Override
+      public String toNativeString() {
+        return "sparkContext.textFile(...)";
+      }
     };
   }
 
@@ -426,6 +466,11 @@ public final class TransformTranslator {
         writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class,
             NullWritable.class, TemplatedTextOutputFormat.class);
       }
+
+      @Override
+      public String toNativeString() {
+        return "saveAsNewAPIHadoopFile(...)";
+      }
     };
   }
 
@@ -450,6 +495,11 @@ public final class TransformTranslator {
             }).map(WindowingHelpers.<T>windowFunction());
         context.putDataset(transform, new BoundedDataset<>(rdd));
       }
+
+      @Override
+      public String toNativeString() {
+        return "sparkContext.newAPIHadoopFile(...)";
+      }
     };
   }
 
@@ -481,6 +531,11 @@ public final class TransformTranslator {
         writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo,
             AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class);
       }
+
+      @Override
+      public String toNativeString() {
+        return "mapToPair(<objectToAvroKeyFn>).saveAsNewAPIHadoopFile(...)";
+      }
     };
   }
 
@@ -496,6 +551,11 @@ public final class TransformTranslator {
         // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
         context.putDataset(transform, new BoundedDataset<>(input.cache()));
       }
+
+      @Override
+      public String toNativeString() {
+        return "sparkContext.<readFrom(<source>)>()";
+      }
     };
   }
 
@@ -519,6 +579,11 @@ public final class TransformTranslator {
         }).map(WindowingHelpers.<KV<K, V>>windowFunction());
         context.putDataset(transform, new BoundedDataset<>(rdd));
       }
+
+      @Override
+      public String toNativeString() {
+        return "sparkContext.newAPIHadoopFile(...)";
+      }
     };
   }
 
@@ -547,6 +612,11 @@ public final class TransformTranslator {
         writeHadoopFile(last, conf, shardTemplateInfo,
             transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass());
       }
+
+      @Override
+      public String toNativeString() {
+        return "saveAsNewAPIHadoopFile(...)";
+      }
     };
   }
 
@@ -619,6 +689,11 @@ public final class TransformTranslator {
               inRDD.map(new SparkAssignWindowFn<>(transform.getWindowFn()))));
         }
       }
+
+      @Override
+      public String toNativeString() {
+        return "map(new <windowFn>())";
+      }
     };
   }
 
@@ -632,6 +707,11 @@ public final class TransformTranslator {
         Coder<T> coder = context.getOutput(transform).getCoder();
         context.putBoundedDatasetFromValues(transform, elems, coder);
       }
+
+      @Override
+      public String toNativeString() {
+        return "sparkContext.parallelize(Arrays.asList(...))";
+      }
     };
   }
 
@@ -649,6 +729,11 @@ public final class TransformTranslator {
 
         context.putPView(output, iterCast, coderInternal);
       }
+
+      @Override
+      public String toNativeString() {
+        return "collect()";
+      }
     };
   }
 
@@ -666,6 +751,11 @@ public final class TransformTranslator {
 
         context.putPView(output, iterCast, coderInternal);
       }
+
+      @Override
+      public String toNativeString() {
+        return "collect()";
+      }
     };
   }
 
@@ -685,6 +775,11 @@ public final class TransformTranslator {
 
         context.putPView(output, iterCast, coderInternal);
       }
+
+      @Override
+      public String toNativeString() {
+        return "<createPCollectionView>";
+      }
     };
   }
 
@@ -706,6 +801,11 @@ public final class TransformTranslator {
 
         context.putDataset(transform, new BoundedDataset<String>(output));
       }
+
+      @Override
+      public String toNativeString() {
+        return "sparkContext.parallelize(rdd.getStorageLevel().description())";
+      }
     };
   }
 
@@ -732,6 +832,11 @@ public final class TransformTranslator {
 
         context.putDataset(transform, new BoundedDataset<>(reshuffled));
       }
+
+      @Override
+      public String toNativeString() {
+        return "repartition(...)";
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index ccf84b2..8a05fbb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -96,7 +96,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 /**
  * Supports translation between a Beam transform, and Spark's operations on DStreams.
  */
-final class StreamingTransformTranslator {
+public final class StreamingTransformTranslator {
 
   private StreamingTransformTranslator() {
   }
@@ -110,6 +110,11 @@ final class StreamingTransformTranslator {
            ((UnboundedDataset<T>) (context).borrowDataset(transform)).getDStream();
        dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum());
       }
+
+      @Override
+      public String toNativeString() {
+        return ".print(...)";
+      }
     };
   }
 
@@ -124,6 +129,11 @@ final class StreamingTransformTranslator {
                 context.getRuntimeContext(),
                 transform.getSource()));
       }
+
+      @Override
+      public String toNativeString() {
+        return "streamingContext.<readFrom(<source>)>()";
+      }
     };
   }
 
@@ -168,6 +178,11 @@ final class StreamingTransformTranslator {
            ImmutableMap.of(unboundedDataset.getStreamSources().get(0), times));
         context.putDataset(transform, unboundedDataset);
       }
+
+      @Override
+      public String toNativeString() {
+        return "streamingContext.queueStream(...)";
+      }
     };
   }
 
@@ -208,6 +223,11 @@ final class StreamingTransformTranslator {
             context.getStreamingContext().union(dStreams.remove(0), dStreams);
        context.putDataset(transform, new UnboundedDataset<>(unifiedStreams, streamingSources));
       }
+
+      @Override
+      public String toNativeString() {
+        return "streamingContext.union(...)";
+      }
     };
   }
 
@@ -235,6 +255,11 @@ final class StreamingTransformTranslator {
         context.putDataset(transform,
            new UnboundedDataset<>(outputStream, unboundedDataset.getStreamSources()));
       }
+
+      @Override
+      public String toNativeString() {
+        return "map(new <windowFn>())";
+      }
     };
   }
 
@@ -283,6 +308,11 @@ final class StreamingTransformTranslator {
 
        context.putDataset(transform, new UnboundedDataset<>(outStream, streamSources));
       }
+
+      @Override
+      public String toNativeString() {
+        return "groupByKey()";
+      }
     };
   }
 
@@ -329,6 +359,11 @@ final class StreamingTransformTranslator {
         context.putDataset(transform,
            new UnboundedDataset<>(outStream, unboundedDataset.getStreamSources()));
       }
+
+      @Override
+      public String toNativeString() {
+        return "map(new <fn>())";
+      }
     };
   }
 
@@ -375,6 +410,11 @@ final class StreamingTransformTranslator {
         context.putDataset(transform,
            new UnboundedDataset<>(outStream, unboundedDataset.getStreamSources()));
       }
+
+      @Override
+      public String toNativeString() {
+        return "mapPartitions(new <fn>())";
+      }
     };
   }
 
@@ -431,6 +471,11 @@ final class StreamingTransformTranslator {
              new UnboundedDataset<>(values, unboundedDataset.getStreamSources()));
         }
       }
+
+      @Override
+      public String toNativeString() {
+        return "mapPartitions(new <fn>())";
+      }
     };
   }
 
@@ -465,6 +510,10 @@ final class StreamingTransformTranslator {
 
        context.putDataset(transform, new UnboundedDataset<>(reshuffledStream, streamSources));
       }
+
+      @Override public String toNativeString() {
+        return "repartition(...)";
+      }
     };
   }
 
@@ -491,7 +540,7 @@ final class StreamingTransformTranslator {
 
     private final SparkPipelineTranslator batchTranslator;
 
-    Translator(SparkPipelineTranslator batchTranslator) {
+    public Translator(SparkPipelineTranslator batchTranslator) {
       this.batchTranslator = batchTranslator;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/94bef14e/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
new file mode 100644
index 0000000..905b30e
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import static org.junit.Assert.assertThat;
+
+import java.util.Collections;
+import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+
+
+/**
+ * Test {@link SparkRunnerDebugger} with different pipelines.
+ */
+public class SparkRunnerDebuggerTest {
+
+  @Rule
+  public final PipelineRule batchPipelineRule = PipelineRule.batch();
+
+  @Rule
+  public final PipelineRule streamingPipelineRule = PipelineRule.streaming();
+
+  @Test
+  public void debugBatchPipeline() {
+    TestSparkPipelineOptions options = batchPipelineRule.getOptions();
+    options.setRunner(SparkRunnerDebugger.class);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    PCollection<String> lines = pipeline
+        .apply(Create.of(Collections.<String>emptyList()).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Long>> wordCounts = lines
+        .apply(new WordCount.CountWords());
+
+    wordCounts
+        .apply(GroupByKey.<String, Long>create())
+        .apply(Combine.<String, Long, Long>groupedValues(Sum.ofLongs()));
+
+    PCollection<KV<String, Long>> wordCountsPlusOne = wordCounts
+        .apply(MapElements.via(new PlusOne()));
+
+    PCollectionList.of(wordCounts).and(wordCountsPlusOne)
+        .apply(Flatten.<KV<String, Long>>pCollections());
+
+    wordCounts
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
+        .apply(TextIO.Write.to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
+
+    final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
+        + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
+        + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
+        + "_.combineByKey(..., new org.apache.beam.sdk.transforms"
+        + ".Combine$CombineFn$KeyIgnoringCombineFn(), ...)\n"
+        + "_.groupByKey()\n"
+        + "_.map(new org.apache.beam.sdk.transforms.Combine$CombineFn$KeyIgnoringCombineFn())\n"
+        + "_.mapPartitions(new org.apache.beam.runners.spark"
+        + ".SparkRunnerDebuggerTest$PlusOne())\n"
+        + "sparkContext.union(...)\n"
+        + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
+        + "_.<org.apache.beam.sdk.io.TextIO$Write$Bound>";
+
+    SparkRunnerDebugger.DebugSparkPipelineResult result =
+        (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
+
+    assertThat("Debug pipeline did not equal expected", result.getDebugString(),
+        Matchers.equalTo(expectedPipeline));
+  }
+
+  @Test
+  public void debugStreamingPipeline() {
+    TestSparkPipelineOptions options = streamingPipelineRule.getOptions();
+    options.setRunner(SparkRunnerDebugger.class);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
+        .withBootstrapServers("mykafka:9092")
+        .withTopics(Collections.singletonList("my_input_topic"))
+        .withKeyCoder(StringUtf8Coder.of())
+        .withValueCoder(StringUtf8Coder.of());
+
+    KafkaIO.Write<String, String> write = KafkaIO.<String, String>write()
+        .withBootstrapServers("myotherkafka:9092")
+        .withTopic("my_output_topic")
+        .withKeyCoder(StringUtf8Coder.of())
+        .withValueCoder(StringUtf8Coder.of());
+
+    KvCoder<String, String> stringKvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
+
+    pipeline
+        .apply(read.withoutMetadata()).setCoder(stringKvCoder)
+        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(5))))
+        .apply(ParDo.of(new SparkRunnerDebuggerTest.FormatKVFn()))
+        .apply(Distinct.<String>create())
+        .apply(WithKeys.of(new SparkRunnerDebuggerTest.ArbitraryKeyFunction()))
+        .apply(write);
+
+    final String expectedPipeline = "KafkaUtils.createDirectStream(...)\n"
+        + "_.map(new 
org.apache.beam.sdk.transforms.windowing.FixedWindows())\n"
+        + "_.mapPartitions(new org.apache.beam.runners.spark."
+        + "SparkRunnerDebuggerTest$FormatKVFn())\n"
+        + "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$2())\n"
+        + "_.groupByKey()\n"
+        + "_.map(new 
org.apache.beam.sdk.transforms.Combine$CombineFn$KeyIgnoringCombineFn())\n"
+        + "_.mapPartitions(new org.apache.beam.sdk.transforms.Keys$1())\n"
+        + "_.mapPartitions(new org.apache.beam.sdk.transforms.WithKeys$2())\n"
+        + "_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>";
+
+    SparkRunnerDebugger.DebugSparkPipelineResult result =
+        (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
+
+    assertThat("Debug pipeline did not equal expected",
+        result.getDebugString(),
+        Matchers.equalTo(expectedPipeline));
+  }
+
+  private static class FormatKVFn extends DoFn<KV<String, String>, String> {
+    @SuppressWarnings("unused")
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getKey() + "," + c.element().getValue());
+    }
+  }
+
+  private static class ArbitraryKeyFunction implements SerializableFunction<String, String> {
+    @Override
+    public String apply(String input) {
+      return "someKey";
+    }
+  }
+
+  private static class PlusOne extends SimpleFunction<KV<String, Long>, KV<String, Long>> {
+    @Override
+    public KV<String, Long> apply(KV<String, Long> input) {
+      return KV.of(input.getKey(), input.getValue() + 1);
+    }
+  }
+}
