Update gearpump-runner against master changes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44d21ac6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44d21ac6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44d21ac6 Branch: refs/heads/master Commit: 44d21ac662e263c09caf2dd3b93b1c325bdfea15 Parents: 46c41fc Author: manuzhang <owenzhang1...@gmail.com> Authored: Thu Apr 20 20:59:47 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Fri Apr 21 23:04:18 2017 +0800 ---------------------------------------------------------------------- runners/gearpump/pom.xml | 11 +-- .../gearpump/GearpumpPipelineTranslator.java | 92 +++++++++++--------- .../FlattenPCollectionsTranslator.java | 6 +- .../translators/ParDoMultiOutputTranslator.java | 11 ++- .../translators/TranslationContext.java | 11 ++- .../translators/WindowAssignTranslator.java | 2 +- .../translators/functions/DoFnFunction.java | 2 +- .../gearpump/translators/io/GearpumpSource.java | 3 +- .../gearpump/translators/io/ValuesSource.java | 3 +- .../translators/utils/DoFnRunnerFactory.java | 3 +- .../translators/utils/NoOpStepContext.java | 3 +- .../FlattenPCollectionsTranslatorTest.java | 48 ++++++---- .../translators/GroupByKeyTranslatorTest.java | 3 +- sdks/java/pom.xml | 2 +- 14 files changed, 113 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/pom.xml ---------------------------------------------------------------------- diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index a691801..dcfa390 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -43,13 +43,13 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> - <gearpump.version>0.8.3-SNAPSHOT</gearpump.version> + <gearpump.version>0.8.3</gearpump.version> </properties> <profiles> <profile> - <id>local-runnable-on-service-tests</id> - <activation><activeByDefault>true</activeByDefault></activation> + <id>local-validates-runner-tests</id> + <activation><activeByDefault>false</activeByDefault></activation> <build> <plugins> <plugin> @@ -63,14 +63,15 @@ <goal>test</goal> </goals> <configuration> - <groups>org.apache.beam.sdk.testing.RunnableOnService</groups> + <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups> <excludedGroups> org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, org.apache.beam.sdk.testing.UsesAttemptedMetrics, - org.apache.beam.sdk.testing.UsesCommittedMetrics + org.apache.beam.sdk.testing.UsesCommittedMetrics, + org.apache.beam.sdk.testing.UsesTestStream </excludedGroups> <parallel>none</parallel> <failIfNoTests>true</failIfNoTests> http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java index 1a36343..f5f5e70 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.gearpump; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Collections; @@ -27,13 +27,14 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.construction.PTransformMatchers; +import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator; import org.apache.beam.runners.gearpump.translators.CreatePCollectionViewTranslator; import org.apache.beam.runners.gearpump.translators.FlattenPCollectionsTranslator; import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator; -import org.apache.beam.runners.gearpump.translators.ParDoBoundMultiTranslator; -import org.apache.beam.runners.gearpump.translators.ParDoBoundTranslator; +import org.apache.beam.runners.gearpump.translators.ParDoMultiOutputTranslator; +import org.apache.beam.runners.gearpump.translators.ParDoSingleOutputTranslator; import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator; import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator; import org.apache.beam.runners.gearpump.translators.TransformTranslator; @@ -45,9 +46,9 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.runners.PTransformMatcher; -import org.apache.beam.sdk.runners.PTransformOverrideFactory; +import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -72,7 +73,7 @@ import org.slf4j.LoggerFactory; * into Gearpump {@link Graph}. */ @SuppressWarnings({"rawtypes", "unchecked"}) -public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { +public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor { private static final Logger LOG = LoggerFactory.getLogger( GearpumpPipelineTranslator.class); @@ -88,13 +89,13 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default static { // register TransformTranslators - registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator()); + registerTransformTranslator(ParDo.SingleOutput.class, new ParDoSingleOutputTranslator()); registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator()); registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator()); registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator()); registerTransformTranslator(Flatten.PCollections.class, new FlattenPCollectionsTranslator()); - registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator()); + registerTransformTranslator(ParDo.MultiOutput.class, new ParDoMultiOutputTranslator()); registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator()); registerTransformTranslator(View.CreatePCollectionView.class, new CreatePCollectionViewTranslator()); @@ -107,27 +108,30 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default } public void translate(Pipeline pipeline) { - Map<PTransformMatcher, PTransformOverrideFactory> overrides = - ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder() - .put(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), + List<PTransformOverride> overrides = + ImmutableList.<PTransformOverride>builder() + .add(PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsMap.class), + new ReflectiveOneToOneOverrideFactory(StreamingViewAsMap.class))) + .add(PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsMultimap.class), + new ReflectiveOneToOneOverrideFactory(StreamingViewAsMultimap.class))) + .add(PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsSingleton.class), + new ReflectiveOneToOneOverrideFactory(StreamingViewAsSingleton.class))) + .add(PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsList.class), + new ReflectiveOneToOneOverrideFactory(StreamingViewAsList.class))) + .add(PTransformOverride.of( + PTransformMatchers.classEqualTo(View.AsIterable.class), + new ReflectiveOneToOneOverrideFactory(StreamingViewAsIterable.class))) + .add(PTransformOverride.of( + PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), new ReflectiveOneToOneOverrideFactory( - StreamingCombineGloballyAsSingletonView.class)) - .put(PTransformMatchers.classEqualTo(View.AsMap.class), - new ReflectiveOneToOneOverrideFactory(StreamingViewAsMap.class)) - .put(PTransformMatchers.classEqualTo(View.AsMultimap.class), - new ReflectiveOneToOneOverrideFactory(StreamingViewAsMultimap.class)) - .put(PTransformMatchers.classEqualTo(View.AsSingleton.class), - new ReflectiveOneToOneOverrideFactory(StreamingViewAsSingleton.class)) - .put(PTransformMatchers.classEqualTo(View.AsList.class), - new ReflectiveOneToOneOverrideFactory(StreamingViewAsList.class)) - .put(PTransformMatchers.classEqualTo(View.AsIterable.class), - new ReflectiveOneToOneOverrideFactory(StreamingViewAsIterable.class)) + StreamingCombineGloballyAsSingletonView.class))) .build(); - for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : - overrides.entrySet()) { - pipeline.replace(override.getKey(), override.getValue()); - } + pipeline.replaceAll(overrides); pipeline.traverseTopologically(this); } @@ -185,22 +189,27 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default // The following codes are forked from DataflowRunner for View translator private static class ReflectiveOneToOneOverrideFactory< - InputT extends PValue, - OutputT extends PValue, - TransformT extends PTransform<InputT, OutputT>> - extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> { - private final Class<PTransform<InputT, OutputT>> replacement; + InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>> + extends SingleInputOutputOverrideFactory< + PCollection<InputT>, PCollection<OutputT>, TransformT> { + private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement; private ReflectiveOneToOneOverrideFactory( - Class<PTransform<InputT, OutputT>> replacement) { + Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement) { this.replacement = replacement; } @Override - public PTransform<InputT, OutputT> getReplacementTransform(TransformT transform) { - return InstanceBuilder.ofType(replacement) - .withArg((Class<PTransform<InputT, OutputT>>) transform.getClass(), transform) - .build(); + public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform( + AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) { + return PTransformReplacement.of( + PTransformReplacements.getSingletonMainInput(transform), + InstanceBuilder.ofType(replacement) + .withArg( + (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>) + transform.getTransform().getClass(), + transform.getTransform()) + .build()); } } @@ -220,7 +229,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { PCollectionView<Map<K, V>> view = PCollectionViews.mapView( - input.getPipeline(), + input, input.getWindowingStrategy(), input.getCoder()); @@ -259,7 +268,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView( - input.getPipeline(), + input, input.getWindowingStrategy(), input.getCoder()); @@ -298,7 +307,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default public PCollectionView<Iterable<T>> expand(PCollection<T> input) { PCollectionView<Iterable<T>> view = PCollectionViews.iterableView( - input.getPipeline(), + input, input.getWindowingStrategy(), input.getCoder()); @@ -328,7 +337,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default public PCollectionView<List<T>> expand(PCollection<T> input) { PCollectionView<List<T>> view = PCollectionViews.listView( - input.getPipeline(), + input, input.getWindowingStrategy(), input.getCoder()); @@ -341,6 +350,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default return "StreamingViewAsList"; } } + private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT> extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { @@ -360,7 +370,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default .withFanout(transform.getFanout())); PCollectionView<OutputT> view = PCollectionViews.singletonView( - combined.getPipeline(), + combined, combined.getWindowingStrategy(), transform.getInsertDefault(), transform.getInsertDefault() http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java index 56f7d1a..5ca05d8 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java @@ -29,7 +29,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.PValue; import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; @@ -45,8 +45,8 @@ public class FlattenPCollectionsTranslator<T> implements public void translate(Flatten.PCollections<T> transform, TranslationContext context) { JavaStream<T> merged = null; Set<PCollection<T>> unique = new HashSet<>(); - for (TaggedPValue input: context.getInputs()) { - PCollection<T> collection = (PCollection<T>) input.getValue(); + for (PValue input: context.getInputs().values()) { + PCollection<T> collection = (PCollection<T>) input; JavaStream<T> inputStream = context.getInputStream(collection); if (null == merged) { merged = inputStream; http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java index e78568d..d92979b 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction; @@ -55,11 +55,10 @@ public class ParDoMultiOutputTranslator<InputT, OutputT> implements Map<String, PCollectionView<?>> tagsToSideInputs = TranslatorUtils.getTagsToSideInputs(sideInputs); - List<TaggedPValue> outputs = context.getOutputs(); + Map<TupleTag<?>, PValue> outputs = context.getOutputs(); final TupleTag<OutputT> mainOutput = transform.getMainOutputTag(); List<TupleTag<?>> sideOutputs = new ArrayList<>(outputs.size() - 1); - for (TaggedPValue output: outputs) { - TupleTag<?> tag = output.getTag(); + for (TupleTag<?> tag: outputs.keySet()) { if (tag != null && !tag.getId().equals(mainOutput.getId())) { sideOutputs.add(tag); } @@ -78,9 +77,9 @@ public class ParDoMultiOutputTranslator<InputT, OutputT> implements tagsToSideInputs, mainOutput, sideOutputs), transform.getName()); - for (TaggedPValue output: outputs) { + for (Map.Entry<TupleTag<?>, PValue> output: outputs.entrySet()) { JavaStream<WindowedValue<OutputT>> taggedStream = outputStream - .filter(new FilterByOutputTag(output.getTag().getId()), + .filter(new FilterByOutputTag(output.getKey().getId()), "filter_by_output_tag") .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue"); context.setOutputStream(output.getValue(), taggedStream); http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java index e88bb74..eb6bc18 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java @@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.Iterables; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; @@ -31,7 +30,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; import org.apache.gearpump.cluster.UserConfig; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; @@ -71,20 +70,20 @@ public class TranslationContext { } } - public List<TaggedPValue> getInputs() { + public Map<TupleTag<?>, PValue> getInputs() { return getCurrentTransform().getInputs(); } public PValue getInput() { - return Iterables.getOnlyElement(getInputs()).getValue(); + return Iterables.getOnlyElement(getInputs().values()); } - public List<TaggedPValue> getOutputs() { + public Map<TupleTag<?>, PValue> getOutputs() { return getCurrentTransform().getOutputs(); } public PValue getOutput() { - return Iterables.getOnlyElement(getOutputs()).getValue(); + return Iterables.getOnlyElement(getOutputs().values()); } private AppliedPTransform<?, ?, ?> getCurrentTransform() { http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java index 2d70b63..149f80c 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java @@ -36,7 +36,7 @@ import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; import org.joda.time.Instant; /** - * {@link Window.Bound} is translated to Gearpump flatMap function. + * {@link Window.Assign} is translated to Gearpump flatMap function. */ @SuppressWarnings("unchecked") public class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> { http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java index 9941e71..3473f53 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java @@ -156,7 +156,7 @@ public class DoFnFunction<InputT, OutputT> extends for (WindowedValue<InputT> value : pushedBackValues) { for (BoundedWindow win: value.getWindows()) { BoundedWindow sideInputWindow = - sideInput.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(win); + sideInput.getWindowMappingFn().getSideInputWindow(win); if (!sideInputReader.isReady(sideInput, sideInputWindow)) { Object emptyValue = WindowedValue.of( Lists.newArrayList(), value.getTimestamp(), sideInputWindow, value.getPane()); http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java index c079603..5e79151 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java @@ -46,6 +46,7 @@ public abstract class GearpumpSource<T> implements DataSource { private Source.Reader<T> reader; private boolean available = false; + private long count = 0L; GearpumpSource(PipelineOptions options) { try { @@ -112,7 +113,7 @@ public abstract class GearpumpSource<T> implements DataSource { } } else { if (available) { - return TranslatorUtils.jodaTimeToJava8Time(reader.getCurrentTimestamp()); + return Watermark.MIN(); } else { return Watermark.MAX(); } http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java index ccd5cdf..b62da19 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java @@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; import javax.annotation.Nullable; @@ -68,7 +69,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi } @Override - public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits( + public java.util.List<? extends UnboundedSource<T, CheckpointMark>> split( int desiredNumSplits, PipelineOptions options) throws Exception { return Collections.singletonList(this); } http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java index bdfc336..70b4271 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java @@ -28,6 +28,7 @@ import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SimpleDoFnRunner; +import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -78,7 +79,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner( options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext, aggregatorFactory, windowingStrategy); - return PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader); + return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader); } } http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java index 140df2a..4e0a74c 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java @@ -49,7 +49,8 @@ public class NoOpStepContext implements ExecutionContext.StepContext, Serializab } @Override - public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) { + public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) { + } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java index fa89d4a..ac12fa4 100644 --- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java @@ -26,13 +26,15 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.Lists; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; import org.apache.gearpump.streaming.source.DataSource; @@ -55,10 +57,10 @@ public class FlattenPCollectionsTranslatorTest { @Test @SuppressWarnings({"rawtypes", "unchecked"}) public void testTranslateWithEmptyCollection() { - PValue mockOutput = mock(PValue.class); + PCollection mockOutput = mock(PCollection.class); TranslationContext translationContext = mock(TranslationContext.class); - when(translationContext.getInputs()).thenReturn(Collections.EMPTY_LIST); + when(translationContext.getInputs()).thenReturn(Collections.EMPTY_MAP); when(translationContext.getOutput()).thenReturn(mockOutput); translator.translate(transform, translationContext); @@ -71,11 +73,12 @@ public class FlattenPCollectionsTranslatorTest { JavaStream javaStream = mock(JavaStream.class); TranslationContext translationContext = mock(TranslationContext.class); - TaggedPValue mockInput = mock(TaggedPValue.class); + Map<TupleTag<?>, PValue> inputs = new HashMap<>(); + TupleTag tag = mock(TupleTag.class); PCollection mockCollection = mock(PCollection.class); - when(mockInput.getValue()).thenReturn(mockCollection); + inputs.put(tag, mockCollection); - when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput)); + when(translationContext.getInputs()).thenReturn(inputs); when(translationContext.getInputStream(mockCollection)).thenReturn(javaStream); PValue mockOutput = mock(PValue.class); @@ -93,22 +96,30 @@ public class FlattenPCollectionsTranslatorTest { JavaStream javaStream1 = mock(JavaStream.class); JavaStream javaStream2 = mock(JavaStream.class); + JavaStream mergedStream = mock(JavaStream.class); TranslationContext translationContext = mock(TranslationContext.class); - TaggedPValue mockInput1 = mock(TaggedPValue.class); + Map<TupleTag<?>, PValue> inputs = new HashMap<>(); + TupleTag tag1 = mock(TupleTag.class); PCollection mockCollection1 = mock(PCollection.class); - when(mockInput1.getValue()).thenReturn(mockCollection1); + inputs.put(tag1, mockCollection1); - TaggedPValue mockInput2 = mock(TaggedPValue.class); + TupleTag tag2 = mock(TupleTag.class); PCollection mockCollection2 = mock(PCollection.class); - when(mockInput2.getValue()).thenReturn(mockCollection2); + inputs.put(tag2, mockCollection2); - when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2)); + PCollection output = mock(PCollection.class); + + when(translationContext.getInputs()).thenReturn(inputs); when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1); when(translationContext.getInputStream(mockCollection2)).thenReturn(javaStream2); + when(javaStream1.merge(javaStream2, transformName)).thenReturn(mergedStream); + when(javaStream2.merge(javaStream1, transformName)).thenReturn(mergedStream); + + when(translationContext.getOutput()).thenReturn(output); translator.translate(transform, translationContext); - verify(javaStream1).merge(javaStream2, transformName); + verify(translationContext).setOutputStream(output, mergedStream); } @Test @@ -120,14 +131,15 @@ public class FlattenPCollectionsTranslatorTest { JavaStream javaStream1 = mock(JavaStream.class); TranslationContext translationContext = mock(TranslationContext.class); + Map<TupleTag<?>, PValue> inputs = new HashMap<>(); + TupleTag tag1 = mock(TupleTag.class); PCollection mockCollection1 = mock(PCollection.class); - TaggedPValue mockInput1 = mock(TaggedPValue.class); - when(mockInput1.getValue()).thenReturn(mockCollection1); + inputs.put(tag1, mockCollection1); - TaggedPValue mockInput2 = mock(TaggedPValue.class); - when(mockInput2.getValue()).thenReturn(mockCollection1); + TupleTag tag2 = mock(TupleTag.class); + inputs.put(tag2, mockCollection1); - when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2)); + when(translationContext.getInputs()).thenReturn(inputs); when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1); translator.translate(transform, translationContext); http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java index 9135022..4e66ba9 100644 --- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java @@ -133,7 +133,8 @@ public class GroupByKeyTranslatorTest { PaneInfo.NO_FIRING); KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result1 = - merge.fold(KV.of(null, null), KV.of(key1, value1)); + merge.fold(KV.<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>>of( + null, null), KV.of(key1, value1)); assertThat(result1.getKey(), equalTo(key1)); assertThat(result1.getValue().getValue().getValue(), equalTo(Lists.newArrayList("value1"))); http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/sdks/java/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml index 7ca6109..21b5841 100644 --- a/sdks/java/pom.xml +++ b/sdks/java/pom.xml @@ -38,7 +38,7 @@ <module>build-tools</module> --> <module>core</module> <module>io</module> - <module>maven-archetypes</module> + <!--<module>maven-archetypes</module>--> <module>extensions</module> <!-- javadoc runs directly from the root parent as the last module in the build to be able to capture runner-specific javadoc.