Update the Spark runner to Google Cloud Dataflow SDK 0.4.150727 (from 0.4.150710).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89945bf6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89945bf6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89945bf6 Branch: refs/heads/master Commit: 89945bf676affe2cd52fed91551cb1037fc2faae Parents: b83d666 Author: Tom White <t...@cloudera.com> Authored: Wed Aug 5 18:10:59 2015 +0100 Committer: Tom White <t...@cloudera.com> Committed: Thu Mar 10 11:15:15 2016 +0000 ---------------------------------------------------------------------- runners/spark/pom.xml | 2 +- .../dataflow/spark/SparkProcessContext.java | 53 ++++++-------------- .../dataflow/spark/TransformTranslator.java | 8 +-- .../com/cloudera/dataflow/spark/TfIdfTest.java | 2 +- .../dataflow/spark/TransformTranslatorTest.java | 5 ++ 5 files changed, 24 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index de9efb9..74b0fed 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -23,7 +23,7 @@ License. 
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.7</java.version> <spark.version>1.3.1</spark.version> - <google-cloud-dataflow-version>0.4.150710</google-cloud-dataflow-version> + <google-cloud-dataflow-version>0.4.150727</google-cloud-dataflow-version> </properties> <build> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java index bda838c..259f90c 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/SparkProcessContext.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.Map; import com.google.cloud.dataflow.sdk.coders.Coder; @@ -29,10 +28,11 @@ import com.google.cloud.dataflow.sdk.transforms.Combine; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; -import com.google.cloud.dataflow.sdk.util.TimerManager; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.TimerInternals; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.WindowingInternals; -import com.google.cloud.dataflow.sdk.values.CodedTupleTag; +import com.google.cloud.dataflow.sdk.util.state.StateInternals; import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.cloud.dataflow.sdk.values.TupleTag; import 
com.google.common.collect.AbstractIterator; @@ -126,6 +126,11 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext { } @Override + public PaneInfo pane() { + return PaneInfo.DEFAULT; + } + + @Override public WindowingInternals<I, O> windowingInternals() { return new WindowingInternals<I, O>() { @@ -136,53 +141,25 @@ abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext { @Override public void outputWindowedValue(O output, Instant timestamp, Collection<? - extends BoundedWindow> windows) { + extends BoundedWindow> windows, PaneInfo paneInfo) { output(output); } @Override - public KeyedState keyedState() { + public StateInternals stateInternals() { throw new UnsupportedOperationException( - "WindowingInternals#keyedState() is not yet supported."); - + "WindowingInternals#stateInternals() is not yet supported."); } @Override - public <T> void writeToTagList(CodedTupleTag<T> tag, T value) throws IOException { + public TimerInternals timerInternals() { throw new UnsupportedOperationException( - "WindowingInternals#writeToTagList() is not yet supported."); + "WindowingInternals#timerInternals() is not yet supported."); } @Override - public <T> void writeToTagList(CodedTupleTag<T> tag, T value, Instant timestamp) - throws IOException { - throw new UnsupportedOperationException( - "WindowingInternals#writeToTagList() is not yet supported."); - } - - @Override - public <T> void deleteTagList(CodedTupleTag<T> tag) { - throw new UnsupportedOperationException( - "WindowingInternals#deleteTagList() is not yet supported."); - } - - @Override - public <T> Iterable<T> readTagList(CodedTupleTag<T> tag) throws IOException { - throw new UnsupportedOperationException( - "WindowingInternals#readTagList() is not yet supported."); - } - - @Override - public <T> Map<CodedTupleTag<T>, Iterable<T>> readTagList(List<CodedTupleTag<T>> tags) - throws IOException { - throw new UnsupportedOperationException( - "WindowingInternals#readTagList() 
is not yet supported."); - } - - @Override - public TimerManager getTimerManager() { - throw new UnsupportedOperationException( - "WindowingInternals#getTimerManager() is not yet supported."); + public PaneInfo pane() { + return PaneInfo.DEFAULT; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java index 2c61a42..ee300fd 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java @@ -35,7 +35,6 @@ import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.View; -import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.KV; @@ -511,11 +510,8 @@ public final class TransformTranslator { return new TransformEvaluator<Window.Bound<T>>() { @Override public void evaluate(Window.Bound<T> transform, EvaluationContext context) { - if (transform.getWindowingStrategy().getWindowFn() instanceof GlobalWindows) { - context.setOutputRDD(transform, context.getInputRDD(transform)); - } else { - throw new UnsupportedOperationException("Non-global windowing not supported"); - } + // TODO: detect and support non-global windows + context.setOutputRDD(transform, context.getInputRDD(transform)); } }; } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java index 680d8b7..35ab26e 100644 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java +++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TfIdfTest.java @@ -15,7 +15,7 @@ package com.cloudera.dataflow.spark; -import com.google.cloud.dataflow.examples.TfIdf; +import com.google.cloud.dataflow.examples.complete.TfIdf; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89945bf6/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java index 0251808..540bdd9 100644 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java +++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/TransformTranslatorTest.java @@ -23,6 +23,7 @@ import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.base.Charsets; +import java.util.Collections; import org.apache.commons.io.FileUtils; import org.junit.Assert; import org.junit.Before; @@ -76,6 +77,10 @@ public class TransformTranslatorTest { List<String> sparkOutput = 
Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8); + // sort output to get a stable result (PCollections are not ordered) + Collections.sort(directOutput); + Collections.sort(sparkOutput); + Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray()); }