[BEAM-79] Fix gearpump-runner merge conflicts and test failure
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3eab6a64 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3eab6a64 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3eab6a64 Branch: refs/heads/gearpump-runner Commit: 3eab6a647e4761725680c8bc40589dfa5569d75b Parents: 3f91798 Author: manuzhang <owenzhang1...@gmail.com> Authored: Tue Mar 14 08:09:46 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Wed Mar 15 15:21:29 2017 +0800 ---------------------------------------------------------------------- runners/gearpump/pom.xml | 51 ++- .../gearpump/GearpumpPipelineResult.java | 21 +- .../gearpump/GearpumpPipelineTranslator.java | 388 ++++++++++++++++++- .../beam/runners/gearpump/GearpumpRunner.java | 376 +----------------- .../runners/gearpump/TestGearpumpRunner.java | 38 +- .../gearpump/examples/StreamingWordCount.java | 98 ----- .../gearpump/examples/UnboundedTextSource.java | 139 ------- .../runners/gearpump/examples/package-info.java | 22 -- ...CreateGearpumpPCollectionViewTranslator.java | 14 +- .../CreatePCollectionViewTranslator.java | 6 +- .../translators/CreateValuesTranslator.java | 51 --- .../FlattenPCollectionTranslator.java | 84 ---- .../FlattenPCollectionsTranslator.java | 83 ++++ .../translators/GroupByKeyTranslator.java | 4 +- .../translators/ParDoBoundMultiTranslator.java | 32 +- .../translators/ParDoBoundTranslator.java | 7 +- .../translators/ReadBoundedTranslator.java | 4 +- .../translators/ReadUnboundedTranslator.java | 4 +- .../translators/TransformTranslator.java | 2 +- .../translators/TranslationContext.java | 29 +- .../translators/WindowAssignTranslator.java | 100 +++++ .../translators/WindowBoundTranslator.java | 100 ----- .../translators/functions/DoFnFunction.java | 12 +- .../translators/io/UnboundedSourceWrapper.java | 1 + .../translators/utils/DoFnRunnerFactory.java | 4 +- .../utils/NoOpAggregatorFactory.java | 2 +- 
.../translators/utils/NoOpStepContext.java | 6 +- .../translators/utils/TranslatorUtils.java | 2 - .../translators/utils/TranslatorUtilsTest.java | 1 - 29 files changed, 703 insertions(+), 978 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/pom.xml ---------------------------------------------------------------------- diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index 3efb1f6..9a6a432 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-parent</artifactId> - <version>0.5.0-incubating-SNAPSHOT</version> + <version>0.7.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -65,10 +65,12 @@ <configuration> <groups>org.apache.beam.sdk.testing.RunnableOnService</groups> <excludedGroups> + org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, - org.apache.beam.sdk.testing.UsesMetrics + org.apache.beam.sdk.testing.UsesAttemptedMetrics, + org.apache.beam.sdk.testing.UsesCommittedMetrics </excludedGroups> <parallel>none</parallel> <failIfNoTests>true</failIfNoTests> @@ -136,6 +138,16 @@ <artifactId>beam-runners-core-java</artifactId> </dependency> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-construction-java</artifactId> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> @@ -182,6 +194,11 @@ </exclusions> </dependency> <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + <scope>test</scope> + 
</dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <scope>test</scope> @@ -210,8 +227,36 @@ <!-- Java compiler --> <plugin> - <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + <testSource>1.8</testSource> + <testTarget>1.8</testTarget> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <executions> + <execution> + <id>enforce</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <enforceBytecodeVersion> + <maxJdkVersion>1.8</maxJdkVersion> + </enforceBytecodeVersion> + <requireJavaVersion> + <version>[1.8,)</version> + </requireJavaVersion> + </rules> + </configuration> + </execution> + </executions> </plugin> <!-- uber jar --> http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java index 8f90898..d833cd6 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java @@ -43,6 +43,7 @@ public class GearpumpPipelineResult implements PipelineResult { private final ClientContext client; private final RunningApplication app; + private boolean finished = false; public GearpumpPipelineResult(ClientContext client, RunningApplication app) { this.client = client; @@ -51,13 +52,22 @@ public class GearpumpPipelineResult implements PipelineResult { @Override public State getState() { - return getGearpumpState(); + if 
(!finished) { + return getGearpumpState(); + } else { + return State.DONE; + } } @Override public State cancel() throws IOException { - app.shutDown(); - return State.CANCELLED; + if (!finished) { + app.shutDown(); + finished = true; + return State.CANCELLED; + } else { + return State.DONE; + } } @Override @@ -67,7 +77,10 @@ public class GearpumpPipelineResult implements PipelineResult { @Override public State waitUntilFinish() { - app.waitUntilFinish(); + if (!finished) { + app.waitUntilFinish(); + finished = true; + } return State.DONE; } http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java index 4cc060c..1a36343 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java @@ -18,13 +18,19 @@ package org.apache.beam.runners.gearpump; +import com.google.common.collect.ImmutableMap; + +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.construction.PTransformMatchers; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator; import org.apache.beam.runners.gearpump.translators.CreatePCollectionViewTranslator; -import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator; -import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator; +import 
org.apache.beam.runners.gearpump.translators.FlattenPCollectionsTranslator; import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator; import org.apache.beam.runners.gearpump.translators.ParDoBoundMultiTranslator; import org.apache.beam.runners.gearpump.translators.ParDoBoundTranslator; @@ -32,17 +38,29 @@ import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator; import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator; import org.apache.beam.runners.gearpump.translators.TransformTranslator; import org.apache.beam.runners.gearpump.translators.TranslationContext; -import org.apache.beam.runners.gearpump.translators.WindowBoundTranslator; +import org.apache.beam.runners.gearpump.translators.WindowAssignTranslator; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.PCollectionViews; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; import org.apache.gearpump.util.Graph; @@ -74,14 +92,13 @@ public class 
GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator()); registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator()); registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator()); - registerTransformTranslator(Flatten.FlattenPCollectionList.class, - new FlattenPCollectionTranslator()); + registerTransformTranslator(Flatten.PCollections.class, + new FlattenPCollectionsTranslator()); registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator()); - registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator()); - registerTransformTranslator(Create.Values.class, new CreateValuesTranslator()); + registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator()); registerTransformTranslator(View.CreatePCollectionView.class, new CreatePCollectionViewTranslator()); - registerTransformTranslator(GearpumpRunner.CreateGearpumpPCollectionView.class, + registerTransformTranslator(CreateGearpumpPCollectionView.class, new CreateGearpumpPCollectionViewTranslator<>()); } @@ -90,6 +107,27 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default } public void translate(Pipeline pipeline) { + Map<PTransformMatcher, PTransformOverrideFactory> overrides = + ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder() + .put(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), + new ReflectiveOneToOneOverrideFactory( + StreamingCombineGloballyAsSingletonView.class)) + .put(PTransformMatchers.classEqualTo(View.AsMap.class), + new ReflectiveOneToOneOverrideFactory(StreamingViewAsMap.class)) + .put(PTransformMatchers.classEqualTo(View.AsMultimap.class), + new ReflectiveOneToOneOverrideFactory(StreamingViewAsMultimap.class)) + .put(PTransformMatchers.classEqualTo(View.AsSingleton.class), + new ReflectiveOneToOneOverrideFactory(StreamingViewAsSingleton.class)) + 
.put(PTransformMatchers.classEqualTo(View.AsList.class), + new ReflectiveOneToOneOverrideFactory(StreamingViewAsList.class)) + .put(PTransformMatchers.classEqualTo(View.AsIterable.class), + new ReflectiveOneToOneOverrideFactory(StreamingViewAsIterable.class)) + .build(); + + for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : + overrides.entrySet()) { + pipeline.replace(override.getKey(), override.getValue()); + } pipeline.traverseTopologically(this); } @@ -145,5 +183,337 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default return transformTranslators.get(transformClass); } + // The following codes are forked from DataflowRunner for View translator + private static class ReflectiveOneToOneOverrideFactory< + InputT extends PValue, + OutputT extends PValue, + TransformT extends PTransform<InputT, OutputT>> + extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> { + private final Class<PTransform<InputT, OutputT>> replacement; + + private ReflectiveOneToOneOverrideFactory( + Class<PTransform<InputT, OutputT>> replacement) { + this.replacement = replacement; + } + + @Override + public PTransform<InputT, OutputT> getReplacementTransform(TransformT transform) { + return InstanceBuilder.ofType(replacement) + .withArg((Class<PTransform<InputT, OutputT>>) transform.getClass(), transform) + .build(); + } + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} + * for the Gearpump runner. 
+ */ + private static class StreamingViewAsMap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { + + private static final long serialVersionUID = 4791080760092950304L; + + public StreamingViewAsMap(View.AsMap<K, V> transform) {} + + @Override + public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, V>> view = + PCollectionViews.mapView( + input.getPipeline(), + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + // throw new RuntimeException(e); + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, V>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsMap"; + } + } + + /** + * Specialized expansion for {@link + * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the + * Gearpump runner. 
+ */ + private static class StreamingViewAsMultimap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { + + private static final long serialVersionUID = 5854899081751333352L; + + public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {} + + @Override + public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, Iterable<V>>> view = + PCollectionViews.multimapView( + input.getPipeline(), + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + // throw new RuntimeException(e); + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsMultimap"; + } + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the + * Gearpump runner. 
+ */ + private static class StreamingViewAsIterable<T> + extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { + + private static final long serialVersionUID = -3399860618995613421L; + + public StreamingViewAsIterable(View.AsIterable<T> transform) {} + + @Override + public PCollectionView<Iterable<T>> expand(PCollection<T> input) { + PCollectionView<Iterable<T>> view = + PCollectionViews.iterableView( + input.getPipeline(), + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateGearpumpPCollectionView.<T, Iterable<T>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsIterable"; + } + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the + * Gearpump runner. + */ + private static class StreamingViewAsList<T> + extends PTransform<PCollection<T>, PCollectionView<List<T>>> { + + private static final long serialVersionUID = -5018631473886330629L; + + public StreamingViewAsList(View.AsList<T> transform) {} + + @Override + public PCollectionView<List<T>> expand(PCollection<T> input) { + PCollectionView<List<T>> view = + PCollectionViews.listView( + input.getPipeline(), + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateGearpumpPCollectionView.<T, List<T>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsList"; + } + } + private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT> + extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { + + private static final long serialVersionUID = 9064900748869035738L; + private final Combine.GloballyAsSingletonView<InputT, OutputT> transform; + + public StreamingCombineGloballyAsSingletonView( + Combine.GloballyAsSingletonView<InputT, OutputT> transform) { + 
this.transform = transform; + } + + @Override + public PCollectionView<OutputT> expand(PCollection<InputT> input) { + PCollection<OutputT> combined = + input.apply(Combine.globally(transform.getCombineFn()) + .withoutDefaults() + .withFanout(transform.getFanout())); + + PCollectionView<OutputT> view = PCollectionViews.singletonView( + combined.getPipeline(), + combined.getWindowingStrategy(), + transform.getInsertDefault(), + transform.getInsertDefault() + ? transform.getCombineFn().defaultValue() : null, + combined.getCoder()); + return combined + .apply(ParDo.of(new WrapAsList<OutputT>())) + .apply(CreateGearpumpPCollectionView.<OutputT, OutputT>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingCombineGloballyAsSingletonView"; + } + } + + private static class StreamingViewAsSingleton<T> + extends PTransform<PCollection<T>, PCollectionView<T>> { + + private static final long serialVersionUID = 5870455965625071546L; + private final View.AsSingleton<T> transform; + + public StreamingViewAsSingleton(View.AsSingleton<T> transform) { + this.transform = transform; + } + + @Override + public PCollectionView<T> expand(PCollection<T> input) { + Combine.Globally<T, T> combine = Combine.globally( + new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); + if (!transform.hasDefaultValue()) { + combine = combine.withoutDefaults(); + } + return input.apply(combine.asSingletonView()); + } + + @Override + protected String getKindString() { + return "StreamingViewAsSingleton"; + } + + private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { + private boolean hasDefaultValue; + private T defaultValue; + + SingletonCombine(boolean hasDefaultValue, T defaultValue) { + this.hasDefaultValue = hasDefaultValue; + this.defaultValue = defaultValue; + } + + @Override + public T apply(T left, T right) { + throw new IllegalArgumentException("PCollection with more than one element " + + "accessed as a singleton 
view. Consider using Combine.globally().asSingleton() to " + + "combine the PCollection into a single value"); + } + + @Override + public T identity() { + if (hasDefaultValue) { + return defaultValue; + } else { + throw new IllegalArgumentException( + "Empty PCollection accessed as a singleton view. " + + "Consider setting withDefault to provide a default value"); + } + } + } + } + + private static class WrapAsList<T> extends DoFn<T, List<T>> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(Collections.singletonList(c.element())); + } + } + /** + * Creates a primitive {@link PCollectionView}. + * + * <p>For internal use only by runner implementors. + * + * @param <ElemT> The type of the elements of the input PCollection + * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input + */ + public static class CreateGearpumpPCollectionView<ElemT, ViewT> + extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { + private static final long serialVersionUID = -2637073020800540542L; + private PCollectionView<ViewT> view; + + private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) { + this.view = view; + } + + public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of( + PCollectionView<ViewT> view) { + return new CreateGearpumpPCollectionView<>(view); + } + + public PCollectionView<ViewT> getView() { + return view; + } + + @Override + public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) { + return view; + } + } + + /** + * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. + * + * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap}, + * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}. + * They require the input {@link PCollection} fits in memory. + * For a large {@link PCollection} this is expected to crash! + * + * @param <T> the type of elements to concatenate. 
+ */ + private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { + @Override + public List<T> createAccumulator() { + return new ArrayList<>(); + } + + @Override + public List<T> addInput(List<T> accumulator, T input) { + accumulator.add(input); + return accumulator; + } + + @Override + public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { + List<T> result = createAccumulator(); + for (List<T> accumulator : accumulators) { + result.addAll(accumulator); + } + return result; + } + + @Override + public List<T> extractOutput(List<T> accumulator) { + return accumulator; + } + + @Override + public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + + @Override + public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java index 72f2126..897467a 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java @@ -17,40 +17,18 @@ */ package org.apache.beam.runners.gearpump; -import com.google.common.collect.ImmutableMap; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.beam.runners.gearpump.translators.TranslationContext; + import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; 
-import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.util.InstanceBuilder; -import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; import org.apache.gearpump.cluster.ClusterConfig; import org.apache.gearpump.cluster.UserConfig; @@ -72,21 +50,8 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> { private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers"; private static final String DEFAULT_APPNAME = "beam_gearpump_app"; - /** Custom transforms implementations. 
*/ - private final Map<Class<?>, Class<?>> overrides; - public GearpumpRunner(GearpumpPipelineOptions options) { this.options = options; - - ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.builder(); - builder.put(Combine.GloballyAsSingletonView.class, - StreamingCombineGloballyAsSingletonView.class); - builder.put(View.AsMap.class, StreamingViewAsMap.class); - builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class); - builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class); - builder.put(View.AsList.class, StreamingViewAsList.class); - builder.put(View.AsIterable.class, StreamingViewAsIterable.class); - overrides = builder.build(); } public static GearpumpRunner fromOptions(PipelineOptions options) { @@ -95,31 +60,6 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> { return new GearpumpRunner(pipelineOptions); } - - public <OutputT extends POutput, InputT extends PInput> OutputT apply( - PTransform<InputT, OutputT> transform, InputT input) { - if (overrides.containsKey(transform.getClass())) { - - Class<PTransform<InputT, OutputT>> transformClass = - (Class<PTransform<InputT, OutputT>>) transform.getClass(); - - Class<PTransform<InputT, OutputT>> customTransformClass = - (Class<PTransform<InputT, OutputT>>) overrides.get(transform.getClass()); - - PTransform<InputT, OutputT> customTransform = - InstanceBuilder.ofType(customTransformClass) - .withArg(transformClass, transform) - .build(); - - return Pipeline.applyTransform(input, customTransform); - } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass()) - && ((PCollectionList<?>) input).size() == 0) { - return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of()); - } else { - return super.apply(transform, input); - } - } - @Override public GearpumpPipelineResult run(Pipeline pipeline) { String appName = options.getApplicationName(); @@ -170,318 +110,4 @@ public class GearpumpRunner extends 
PipelineRunner<GearpumpPipelineResult> { - // The following codes are forked from DataflowRunner for View translator - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} - * for the Gearpump runner. - */ - private static class StreamingViewAsMap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { - - private static final long serialVersionUID = 4791080760092950304L; - - public StreamingViewAsMap(View.AsMap<K, V> transform) {} - - @Override - public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, V>> view = - PCollectionViews.mapView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - // throw new RuntimeException(e); - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, V>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMap"; - } - } - - /** - * Specialized expansion for {@link - * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the - * Gearpump runner. 
- */ - private static class StreamingViewAsMultimap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { - - private static final long serialVersionUID = 5854899081751333352L; - - public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {} - - @Override - public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, Iterable<V>>> view = - PCollectionViews.multimapView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - // throw new RuntimeException(e); - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMultimap"; - } - } - - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the - * Gearpump runner. 
- */ - private static class StreamingViewAsIterable<T> - extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { - - private static final long serialVersionUID = -3399860618995613421L; - - public StreamingViewAsIterable(View.AsIterable<T> transform) {} - - @Override - public PCollectionView<Iterable<T>> expand(PCollection<T> input) { - PCollectionView<Iterable<T>> view = - PCollectionViews.iterableView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateGearpumpPCollectionView.<T, Iterable<T>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsIterable"; - } - } - - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the - * Gearpump runner. - */ - private static class StreamingViewAsList<T> - extends PTransform<PCollection<T>, PCollectionView<List<T>>> { - - private static final long serialVersionUID = -5018631473886330629L; - - public StreamingViewAsList(View.AsList<T> transform) {} - - @Override - public PCollectionView<List<T>> expand(PCollection<T> input) { - PCollectionView<List<T>> view = - PCollectionViews.listView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateGearpumpPCollectionView.<T, List<T>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsList"; - } - } - private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT> - extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { - - private static final long serialVersionUID = 9064900748869035738L; - private final Combine.GloballyAsSingletonView<InputT, OutputT> transform; - - public StreamingCombineGloballyAsSingletonView( - Combine.GloballyAsSingletonView<InputT, OutputT> transform) { - 
this.transform = transform; - } - - @Override - public PCollectionView<OutputT> expand(PCollection<InputT> input) { - PCollection<OutputT> combined = - input.apply(Combine.globally(transform.getCombineFn()) - .withoutDefaults() - .withFanout(transform.getFanout())); - - PCollectionView<OutputT> view = PCollectionViews.singletonView( - combined.getPipeline(), - combined.getWindowingStrategy(), - transform.getInsertDefault(), - transform.getInsertDefault() - ? transform.getCombineFn().defaultValue() : null, - combined.getCoder()); - return combined - .apply(ParDo.of(new WrapAsList<OutputT>())) - .apply(CreateGearpumpPCollectionView.<OutputT, OutputT>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingCombineGloballyAsSingletonView"; - } - } - - private static class StreamingViewAsSingleton<T> - extends PTransform<PCollection<T>, PCollectionView<T>> { - - private static final long serialVersionUID = 5870455965625071546L; - private final View.AsSingleton<T> transform; - - public StreamingViewAsSingleton(View.AsSingleton<T> transform) { - this.transform = transform; - } - - @Override - public PCollectionView<T> expand(PCollection<T> input) { - Combine.Globally<T, T> combine = Combine.globally( - new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); - if (!transform.hasDefaultValue()) { - combine = combine.withoutDefaults(); - } - return input.apply(combine.asSingletonView()); - } - - @Override - protected String getKindString() { - return "StreamingViewAsSingleton"; - } - - private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { - private boolean hasDefaultValue; - private T defaultValue; - - SingletonCombine(boolean hasDefaultValue, T defaultValue) { - this.hasDefaultValue = hasDefaultValue; - this.defaultValue = defaultValue; - } - - @Override - public T apply(T left, T right) { - throw new IllegalArgumentException("PCollection with more than one element " - + "accessed as a singleton 
view. Consider using Combine.globally().asSingleton() to " - + "combine the PCollection into a single value"); - } - - @Override - public T identity() { - if (hasDefaultValue) { - return defaultValue; - } else { - throw new IllegalArgumentException( - "Empty PCollection accessed as a singleton view. " - + "Consider setting withDefault to provide a default value"); - } - } - } - } - - private static class WrapAsList<T> extends DoFn<T, List<T>> { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(Collections.singletonList(c.element())); - } - } - - /** - * Creates a primitive {@link PCollectionView}. - * - * <p>For internal use only by runner implementors. - * - * @param <ElemT> The type of the elements of the input PCollection - * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input - */ - public static class CreateGearpumpPCollectionView<ElemT, ViewT> - extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { - private static final long serialVersionUID = -2637073020800540542L; - private PCollectionView<ViewT> view; - - private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) { - this.view = view; - } - - public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of( - PCollectionView<ViewT> view) { - return new CreateGearpumpPCollectionView<>(view); - } - - public PCollectionView<ViewT> getView() { - return view; - } - - @Override - public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) { - return view; - } - } - - /** - * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. - * - * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap}, - * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}. - * They require the input {@link PCollection} fits in memory. - * For a large {@link PCollection} this is expected to crash! - * - * @param <T> the type of elements to concatenate. 
- */ - private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { - @Override - public List<T> createAccumulator() { - return new ArrayList<>(); - } - - @Override - public List<T> addInput(List<T> accumulator, T input) { - accumulator.add(input); - return accumulator; - } - - @Override - public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { - List<T> result = createAccumulator(); - for (List<T> accumulator : accumulators) { - result.addAll(accumulator); - } - return result; - } - - @Override - public List<T> extractOutput(List<T> accumulator) { - return accumulator; - } - - @Override - public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - - @Override - public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java index c96bcb1..ea7dd26 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java @@ -24,9 +24,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; import org.apache.gearpump.cluster.ClusterConfig; import 
org.apache.gearpump.cluster.embedded.EmbeddedCluster; @@ -58,36 +55,9 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> { @Override public GearpumpPipelineResult run(Pipeline pipeline) { - try { - GearpumpPipelineResult result = delegate.run(pipeline); - result.waitUntilFinish(); - cluster.stop(); - return result; - } catch (Throwable e) { - // copied from TestFlinkRunner to pull out AssertionError - // which is wrapped in UserCodeException - Throwable cause = e; - Throwable oldCause; - do { - if (cause.getCause() == null) { - break; - } - - oldCause = cause; - cause = cause.getCause(); - - } while (!oldCause.equals(cause)); - if (cause instanceof AssertionError) { - throw (AssertionError) cause; - } else { - throw e; - } - } - } - - @Override - public <OutputT extends POutput, InputT extends PInput> - OutputT apply(PTransform<InputT, OutputT> transform, InputT input) { - return delegate.apply(transform, input); + GearpumpPipelineResult result = delegate.run(pipeline); + result.waitUntilFinish(); + cluster.stop(); + return result; } } http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java deleted file mode 100644 index b2d762a..0000000 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.gearpump.examples; - -import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; -import org.apache.beam.runners.gearpump.GearpumpRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.apache.gearpump.cluster.client.ClientContext; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * streaming word count example on Gearpump runner. - */ -public class StreamingWordCount { - - static class ExtractWordsFn extends DoFn<String, String> { - - @ProcessElement - public void process(ProcessContext c) { - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. 
- for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { - private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class); - - @ProcessElement - public void process(ProcessContext c) { - String row = c.element().getKey() - + " - " + c.element().getValue() - + " @ " + c.timestamp().toString(); - LOG.debug("output {}", row); - c.output(row); - } - } - - - public static void main(String[] args) { - GearpumpPipelineOptions options = PipelineOptionsFactory - .fromArgs(args).as(GearpumpPipelineOptions.class); - options.setRunner(GearpumpRunner.class); - options.setApplicationName("StreamingWordCount"); - options.setParallelism(1); - - Pipeline p = Pipeline.create(options); - - PCollection<KV<String, Long>> wordCounts = - p.apply(Read.from(new UnboundedTextSource())) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))) - .apply(Count.<String>perElement()); - - wordCounts.apply(ParDo.of(new FormatAsStringFn())); - - p.run(); - - ClientContext clientContext = options.getClientContext(); - clientContext.close(); - - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java deleted file mode 100644 index b014432..0000000 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.gearpump.examples; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Collections; -import java.util.List; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; - -import org.joda.time.Instant; - - -/** - * unbounded source that reads from text. - */ -public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> { - - @Override - public List<? 
extends UnboundedSource<String, CheckpointMark>> generateInitialSplits( - int desiredNumSplits, PipelineOptions options) throws Exception { - return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this); - } - - @Override - public UnboundedReader<String> createReader(PipelineOptions options, - @Nullable CheckpointMark checkpointMark) { - return new UnboundedTextReader(this); - } - - @Nullable - @Override - public Coder<CheckpointMark> getCheckpointMarkCoder() { - return null; - } - - @Override - public void validate() { - } - - @Override - public Coder<String> getDefaultOutputCoder() { - return StringUtf8Coder.of(); - } - - /** - * reads from text. - */ - public static class UnboundedTextReader extends UnboundedReader<String> implements Serializable { - - private static final long serialVersionUID = 7526472295622776147L; - - private final UnboundedTextSource source; - - private final String[] texts = new String[]{"foo foo foo bar bar", "foo foo bar bar bar"}; - private long index = 0; - - private String currentRecord; - - private Instant currentTimestamp; - - public UnboundedTextReader(UnboundedTextSource source) { - this.source = source; - } - - @Override - public boolean start() throws IOException { - currentRecord = texts[0]; - currentTimestamp = new Instant(0); - return true; - } - - @Override - public boolean advance() throws IOException { - index++; - currentRecord = texts[(int) index % (texts.length)]; - currentTimestamp = new Instant(index * 1000); - - return true; - } - - @Override - public byte[] getCurrentRecordId() throws NoSuchElementException { - return new byte[0]; - } - - @Override - public String getCurrent() throws NoSuchElementException { - return this.currentRecord; - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return currentTimestamp; - } - - @Override - public void close() throws IOException { - } - - @Override - public Instant getWatermark() { - return currentTimestamp; - } - - 
@Override - public CheckpointMark getCheckpointMark() { - return null; - } - - @Override - public UnboundedSource<String, ?> getCurrentSource() { - return this.source; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java deleted file mode 100644 index a62a6c0..0000000 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Examples showcase Beam application over Gearpump runner. 
- */ -package org.apache.beam.runners.gearpump.examples; http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java index d05c89d..c7f24a8 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java @@ -20,25 +20,27 @@ package org.apache.beam.runners.gearpump.translators; import java.util.List; -import org.apache.beam.runners.gearpump.GearpumpRunner; +import org.apache.beam.runners.gearpump.GearpumpPipelineTranslator; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; - /** * CreateGearpumpPCollectionView bridges input stream to down stream * transforms. 
*/ public class CreateGearpumpPCollectionViewTranslator<ElemT, ViewT> implements - TransformTranslator<GearpumpRunner.CreateGearpumpPCollectionView<ElemT, ViewT>> { + TransformTranslator<GearpumpPipelineTranslator.CreateGearpumpPCollectionView<ElemT, ViewT>> { + + private static final long serialVersionUID = -3955521308055056034L; @Override - public void translate(GearpumpRunner.CreateGearpumpPCollectionView<ElemT, ViewT> transform, + public void translate( + GearpumpPipelineTranslator.CreateGearpumpPCollectionView<ElemT, ViewT> transform, TranslationContext context) { JavaStream<WindowedValue<List<ElemT>>> inputStream = - context.getInputStream(context.getInput(transform)); - PCollectionView<ViewT> view = transform.getView(); + context.getInputStream(context.getInput()); + PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput(); context.setOutputStream(view, inputStream); } } http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java index e9e2e5d..da55d70 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java @@ -32,12 +32,14 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; public class CreatePCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<View.CreatePCollectionView<ElemT, ViewT>> { + private static final long serialVersionUID = -2394386873317515748L; + @Override public void 
translate(View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) { JavaStream<WindowedValue<List<ElemT>>> inputStream = - context.getInputStream(context.getInput(transform)); - PCollectionView<ViewT> view = transform.getView(); + context.getInputStream(context.getInput()); + PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput(); context.setOutputStream(view, inputStream); } } http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java deleted file mode 100644 index e5dc6dd..0000000 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.gearpump.translators; - -import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper; -import org.apache.beam.runners.gearpump.translators.io.ValuesSource; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.WindowedValue; - -import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; - -/** - * Wraps elements from Create.Values into an {@link UnboundedSource}. - * mainly used for test - */ -public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> { - - private static final long serialVersionUID = 5411841848199229738L; - - @Override - public void translate(Create.Values<T> transform, TranslationContext context) { - try { - UnboundedSourceWrapper<T, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>( - new ValuesSource<>(transform.getElements(), - transform.getDefaultOutputCoder(context.getInput(transform))), - context.getPipelineOptions()); - JavaStream<WindowedValue<T>> sourceStream = context.getSourceStream(unboundedSourceWrapper); - context.setOutputStream(context.getOutput(transform), sourceStream); - } catch (CannotProvideCoderException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java deleted file mode 100644 index 27e54b8..0000000 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java +++ /dev/null @@ 
-1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.gearpump.translators; - -import com.google.common.collect.Lists; - -import java.util.HashSet; -import java.util.Set; - -import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper; -import org.apache.beam.runners.gearpump.translators.io.ValuesSource; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.values.PCollection; - -import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; -import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; - - - -/** - * Flatten.FlattenPCollectionList is translated to Gearpump merge function. 
- * Note only two-way merge is working now - */ -public class FlattenPCollectionTranslator<T> implements - TransformTranslator<Flatten.FlattenPCollectionList<T>> { - - private static final long serialVersionUID = -5552148802472944759L; - - @Override - public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) { - JavaStream<T> merged = null; - Set<PCollection<T>> unique = new HashSet<>(); - for (PCollection<T> collection : context.getInput(transform).getAll()) { - unique.add(collection); - JavaStream<T> inputStream = context.getInputStream(collection); - if (null == merged) { - merged = inputStream; - } else { - // duplicate edges are not allowed in Gearpump graph - // so we route through a dummy node - if (unique.contains(collection)) { - inputStream = inputStream.map(new DummyFunction<T>(), "dummy"); - } - - merged = merged.merge(inputStream, transform.getName()); - } - } - - if (null == merged) { - UnboundedSourceWrapper<String, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>( - new ValuesSource<>(Lists.newArrayList("dummy"), - StringUtf8Coder.of()), context.getPipelineOptions()); - merged = context.getSourceStream(unboundedSourceWrapper); - } - context.setOutputStream(context.getOutput(transform), merged); - } - - private static class DummyFunction<T> extends MapFunction<T, T> { - - private static final long serialVersionUID = 5454396869997290471L; - - @Override - public T map(T t) { - return t; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java new file mode 100644 index 0000000..3a465cb 
--- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import com.google.common.collect.Lists; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper; +import org.apache.beam.runners.gearpump.translators.io.ValuesSource; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.PCollection; + +import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; + +/** + * Flatten.FlattenPCollectionList is translated to Gearpump merge function. 
+ */ +public class FlattenPCollectionsTranslator<T> implements + TransformTranslator<Flatten.PCollections<T>> { + + private static final long serialVersionUID = -5552148802472944759L; + + @Override + public void translate(Flatten.PCollections<T> transform, TranslationContext context) { + JavaStream<T> merged = null; + Set<PCollection<T>> unique = new HashSet<>(); + for (TaggedPValue input: context.getInputs()) { + PCollection<T> collection = (PCollection<T>) input.getValue(); + unique.add(collection); + JavaStream<T> inputStream = context.getInputStream(collection); + if (null == merged) { + merged = inputStream; + } else { + // duplicate edges are not allowed in Gearpump graph + // so we route through a dummy node + if (unique.contains(collection)) { + inputStream = inputStream.map(new DummyFunction<T>(), "dummy"); + } + + merged = merged.merge(inputStream, transform.getName()); + } + } + + if (null == merged) { + UnboundedSourceWrapper<String, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>( + new ValuesSource<>(Lists.newArrayList("dummy"), + StringUtf8Coder.of()), context.getPipelineOptions()); + merged = context.getSourceStream(unboundedSourceWrapper); + } + context.setOutputStream(context.getOutput(), merged); + } + + private static class DummyFunction<T> extends MapFunction<T, T> { + + private static final long serialVersionUID = 5454396869997290471L; + + @Override + public T map(T t) { + return t; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index df8bfe9..5dfd3e9 100644 --- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -61,7 +61,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe @Override public void translate(GroupByKey<K, V> transform, TranslationContext context) { - PCollection<KV<K, V>> input = context.getInput(transform); + PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) context.getInput(); Coder<K> inputKeyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder(); JavaStream<WindowedValue<KV<K, V>>> inputStream = context.getInputStream(input); @@ -80,7 +80,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe .fold(new Merge<>(windowFn, outputTimeFn), "merge") .map(new Values<K, V>(), "values"); - context.setOutputStream(context.getOutput(transform), outputStream); + context.setOutputStream(context.getOutput(), outputStream); } private static class GearpumpWindowFn<T, W extends BoundedWindow> http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java index 8c57019..e88cb73 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java @@ -18,17 +18,11 @@ package org.apache.beam.runners.gearpump.translators; - -import com.google.common.base.Predicate; -import com.google.common.collect.Lists; -import 
com.google.common.collect.Sets; - +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import javax.annotation.Nullable; - import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction; import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; import org.apache.beam.sdk.transforms.DoFn; @@ -36,6 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction; @@ -54,21 +49,21 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements @Override public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { - PCollection<InputT> inputT = (PCollection<InputT>) context.getInput(transform); + PCollection<InputT> inputT = (PCollection<InputT>) context.getInput(); JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(inputT); Collection<PCollectionView<?>> sideInputs = transform.getSideInputs(); Map<String, PCollectionView<?>> tagsToSideInputs = TranslatorUtils.getTagsToSideInputs(sideInputs); - Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll(); + List<TaggedPValue> outputs = context.getOutputs(); final TupleTag<OutputT> mainOutput = transform.getMainOutputTag(); - List<TupleTag<?>> sideOutputs = Lists.newLinkedList(Sets.filter(outputs.keySet(), - new Predicate<TupleTag<?>>() { - @Override - public boolean apply(@Nullable TupleTag<?> tupleTag) { - return tupleTag != null && !tupleTag.getId().equals(mainOutput.getId()); - } - })); + List<TupleTag<?>> sideOutputs = new ArrayList<>(outputs.size() - 1); + for (TaggedPValue output: outputs) { + TupleTag<?> tag = output.getTag(); + if (tag != null && 
!tag.getId().equals(mainOutput.getId())) { + sideOutputs.add(tag); + } + } JavaStream<TranslatorUtils.RawUnionValue> unionStream = TranslatorUtils.withSideInputStream( context, inputStream, tagsToSideInputs); @@ -83,10 +78,9 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements tagsToSideInputs, mainOutput, sideOutputs), transform.getName()); - for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) { - output.getValue().getCoder(); + for (TaggedPValue output: outputs) { JavaStream<WindowedValue<OutputT>> taggedStream = outputStream - .filter(new FilterByOutputTag(output.getKey().getId()), + .filter(new FilterByOutputTag(output.getTag().getId()), "filter_by_output_tag") .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue"); context.setOutputStream(output.getValue(), taggedStream); http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java index efae938..dc32b8c 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; - /** * {@link ParDo.Bound} is translated to Gearpump flatMap function * with {@link DoFn} wrapped in {@link DoFnFunction}. 
@@ -50,14 +49,14 @@ public class ParDoBoundTranslator<InputT, OutputT> implements @Override public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getFn(); - PCollection<OutputT> output = context.getOutput(transform); + PCollection<OutputT> output = (PCollection<OutputT>) context.getOutput(); WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy(); Collection<PCollectionView<?>> sideInputs = transform.getSideInputs(); Map<String, PCollectionView<?>> tagsToSideInputs = TranslatorUtils.getTagsToSideInputs(sideInputs); JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream( - context.getInput(transform)); + context.getInput()); JavaStream<TranslatorUtils.RawUnionValue> unionStream = TranslatorUtils.withSideInputStream(context, inputStream, tagsToSideInputs); @@ -71,6 +70,6 @@ public class ParDoBoundTranslator<InputT, OutputT> implements .flatMap(doFnFunction, transform.getName()) .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue"); - context.setOutputStream(context.getOutput(transform), outputStream); + context.setOutputStream(context.getOutput(), outputStream); } } http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java index 478d58f..8f71a8e 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java @@ -31,6 +31,8 @@ import org.apache.gearpump.streaming.source.DataSource; */ public 
class ReadBoundedTranslator <T> implements TransformTranslator<Read.Bounded<T>> { + private static final long serialVersionUID = -3899020490896998330L; + @Override public void translate(Read.Bounded<T> transform, TranslationContext context) { BoundedSource<T> boundedSource = transform.getSource(); @@ -38,7 +40,7 @@ public class ReadBoundedTranslator <T> implements TransformTranslator<Read.Bound context.getPipelineOptions()); JavaStream<WindowedValue<T>> sourceStream = context.getSourceStream(sourceWrapper); - context.setOutputStream(context.getOutput(transform), sourceStream); + context.setOutputStream(context.getOutput(), sourceStream); } } http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java index 7e12a9c..0462c57 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java @@ -33,6 +33,8 @@ import org.apache.gearpump.streaming.source.DataSource; public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> { + private static final long serialVersionUID = 3529494817859948619L; + @Override public void translate(Read.Unbounded<T> transform, TranslationContext context) { UnboundedSource<T, ?> unboundedSource = transform.getSource(); @@ -40,7 +42,7 @@ public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbo unboundedSource, context.getPipelineOptions()); JavaStream<WindowedValue<T>> sourceStream = context.getSourceStream(unboundedSourceWrapper); - 
context.setOutputStream(context.getOutput(transform), sourceStream); + context.setOutputStream(context.getOutput(), sourceStream); } } http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java index c8587d3..c7becad 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java @@ -23,7 +23,7 @@ import java.io.Serializable; import org.apache.beam.sdk.transforms.PTransform; /** - * translates {@link PTransform} to Gearpump functions. + * Translates {@link PTransform} to Gearpump functions. 
*/ public interface TransformTranslator<T extends PTransform> extends Serializable { void translate(T transform, TranslationContext context); http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java index b2cff8a..e88bb74 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java @@ -20,17 +20,18 @@ package org.apache.beam.runners.gearpump.translators; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.collect.Iterables; + import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; import org.apache.gearpump.cluster.UserConfig; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; @@ -70,18 +71,26 @@ public class TranslationContext { } } - public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) { - return (InputT) getCurrentTransform(transform).getInput(); + public List<TaggedPValue> getInputs() { + return getCurrentTransform().getInputs(); + } + + public PValue getInput() { + return 
Iterables.getOnlyElement(getInputs()).getValue(); + } + + public List<TaggedPValue> getOutputs() { + return getCurrentTransform().getOutputs(); } - public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) { - return (OutputT) getCurrentTransform(transform).getOutput(); + public PValue getOutput() { + return Iterables.getOnlyElement(getOutputs()).getValue(); } - private AppliedPTransform<?, ?, ?> getCurrentTransform(PTransform<?, ?> transform) { + private AppliedPTransform<?, ?, ?> getCurrentTransform() { checkArgument( - currentTransform != null && currentTransform.getTransform() == transform, - "can only be called with current transform"); + currentTransform != null, + "current transform not set"); return currentTransform; } http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java new file mode 100644 index 0000000..fe6015a --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import com.google.common.collect.Iterables; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; +import org.joda.time.Instant; + +/** + * {@link Window.Assign} is translated to Gearpump flatMap function. 
+ */ +@SuppressWarnings("unchecked") +public class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> { + + private static final long serialVersionUID = -964887482120489061L; + + @Override + public void translate(Window.Assign<T> transform, TranslationContext context) { + PCollection<T> input = (PCollection<T>) context.getInput(); + PCollection<T> output = (PCollection<T>) context.getOutput(); + JavaStream<WindowedValue<T>> inputStream = context.getInputStream(input); + WindowingStrategy<?, ?> outputStrategy = output.getWindowingStrategy(); + WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn(); + JavaStream<WindowedValue<T>> outputStream = + inputStream + .flatMap(new AssignWindows(windowFn), "assign_windows"); + + context.setOutputStream(output, outputStream); + } + + private static class AssignWindows<T> extends + FlatMapFunction<WindowedValue<T>, WindowedValue<T>> { + + private static final long serialVersionUID = 7284565861938681360L; + private final WindowFn<T, BoundedWindow> windowFn; + + AssignWindows(WindowFn<T, BoundedWindow> windowFn) { + this.windowFn = windowFn; + } + + @Override + public Iterator<WindowedValue<T>> flatMap(final WindowedValue<T> value) { + try { + Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() { + @Override + public T element() { + return value.getValue(); + } + + @Override + public Instant timestamp() { + return value.getTimestamp(); + } + + @Override + public BoundedWindow window() { + return Iterables.getOnlyElement(value.getWindows()); + } + }); + List<WindowedValue<T>> values = new ArrayList<>(windows.size()); + for (BoundedWindow win: windows) { + values.add( + WindowedValue.of(value.getValue(), value.getTimestamp(), win, value.getPane())); + } + return values.iterator(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } +}