Update the Google Cloud Dataflow SDK dependency from 0.4.150602 to 0.4.150710.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c01421ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c01421ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c01421ce Branch: refs/heads/master Commit: c01421ce3375712bae0c0b88b5f2141c67284ade Parents: 3cae69b Author: Tom White <t...@cloudera.com> Authored: Mon Jul 13 21:08:57 2015 +0100 Committer: Tom White <t...@cloudera.com> Committed: Thu Mar 10 11:15:15 2016 +0000 ---------------------------------------------------------------------- runners/spark/pom.xml | 2 +- .../com/cloudera/dataflow/spark/SparkProcessContext.java | 8 ++++---- .../com/cloudera/dataflow/spark/TransformTranslator.java | 8 ++++---- .../test/java/com/cloudera/dataflow/spark/NumShardsTest.java | 4 +++- .../java/com/cloudera/dataflow/spark/SerializationTest.java | 4 ++-- 5 files changed, 14 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 0382108..3bce8c0 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -23,7 +23,7 @@ License. 
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.7</java.version> <spark.version>1.3.1</spark.version> - <google-cloud-dataflow-version>0.4.150602</google-cloud-dataflow-version> + <google-cloud-dataflow-version>0.4.150710</google-cloud-dataflow-version> </properties> <build> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java index ee51c35..d0e9d6a 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java @@ -148,14 +148,14 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext { } @Override - public <T> void store(CodedTupleTag<T> tag, T value, Instant timestamp) - throws IOException { + public <T> void writeToTagList(CodedTupleTag<T> tag, T value) throws IOException { throw new UnsupportedOperationException( - "WindowingInternals#store() is not yet supported."); + "WindowingInternals#writeToTagList() is not yet supported."); } @Override - public <T> void writeToTagList(CodedTupleTag<T> tag, T value) throws IOException { + public <T> void writeToTagList(CodedTupleTag<T> tag, T value, Instant timestamp) + throws IOException { throw new UnsupportedOperationException( "WindowingInternals#writeToTagList() is not yet supported."); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java 
b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java index e1af3cf..f137218 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java @@ -520,10 +520,10 @@ public final class TransformTranslator { }; } - private static <T> TransformEvaluator<Create<T>> create() { - return new TransformEvaluator<Create<T>>() { + private static <T> TransformEvaluator<Create.Values<T>> create() { + return new TransformEvaluator<Create.Values<T>>() { @Override - public void evaluate(Create<T> transform, EvaluationContext context) { + public void evaluate(Create.Values<T> transform, EvaluationContext context) { Iterable<T> elems = transform.getElements(); // Use a coder to convert the objects in the PCollection to byte arrays, so they // can be transferred over the network. @@ -624,7 +624,7 @@ public final class TransformTranslator { EVALUATORS.put(Combine.Globally.class, combineGlobally()); EVALUATORS.put(Combine.PerKey.class, combinePerKey()); EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl()); - EVALUATORS.put(Create.class, create()); + EVALUATORS.put(Create.Values.class, create()); EVALUATORS.put(View.AsSingleton.class, viewAsSingleton()); EVALUATORS.put(View.AsIterable.class, viewAsIter()); EVALUATORS.put(View.CreatePCollectionView.class, createPCollView()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java index 8985e66..9572b0f 100644 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java +++ 
b/runners/spark/src/test/java/com/cloudera/dataflow/spark/NumShardsTest.java @@ -20,6 +20,7 @@ import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.io.TextIO; import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.base.Charsets; import com.google.common.collect.Sets; @@ -62,7 +63,8 @@ public class NumShardsTest { options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); - PCollection<String> output = inputWords.apply(new WordCount.CountWords()); + PCollection<String> output = inputWords.apply(new WordCount.CountWords()) + .apply(ParDo.of(new WordCount.FormatAsTextFn())); output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt")); EvaluationResult res = SparkPipelineRunner.create().run(p); res.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c01421ce/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java index 40591e5..bd1a4e8 100644 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java +++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/SerializationTest.java @@ -112,8 +112,8 @@ public class SerializationTest { SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); - PCollection<StringHolder> inputWords = p.apply(Create.of(WORDS)).setCoder - 
(StringHolderUtf8Coder.of()); + PCollection<StringHolder> inputWords = + p.apply(Create.of(WORDS).withCoder(StringHolderUtf8Coder.of())); PCollection<StringHolder> output = inputWords.apply(new CountWords()); DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);