Update gearpump-runner against master changes
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12b9719e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12b9719e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12b9719e Branch: refs/heads/master Commit: 12b9719e992d6fbac57efb4dc8ce7eff5e977862 Parents: 9a59ea3 Author: manuzhang <owenzhang1...@gmail.com> Authored: Thu May 4 12:14:00 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Thu May 4 12:14:00 2017 +0800 ---------------------------------------------------------------------- .../gearpump/GearpumpPipelineResult.java | 11 ---- .../translators/GroupByKeyTranslator.java | 28 ++++----- .../translators/functions/DoFnFunction.java | 2 - .../translators/utils/DoFnRunnerFactory.java | 6 +- .../utils/NoOpAggregatorFactory.java | 63 -------------------- .../translators/utils/NoOpStepContext.java | 2 +- .../translators/GroupByKeyTranslatorTest.java | 24 ++++---- 7 files changed, 27 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java index d833cd6..dd7fa23 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java @@ -20,12 +20,9 @@ package org.apache.beam.runners.gearpump; import java.io.IOException; import java.util.List; -import org.apache.beam.sdk.AggregatorRetrievalException; -import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.gearpump.cluster.ApplicationStatus; import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData; @@ -85,14 +82,6 @@ public class GearpumpPipelineResult implements PipelineResult { } @Override - public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) - throws AggregatorRetrievalException { - throw new AggregatorRetrievalException( - "PipelineResult getAggregatorValues not supported in Gearpump pipeline", - new UnsupportedOperationException()); - } - - @Override public MetricResults metrics() { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index 54c8737..521f665 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.WindowedValue; @@ -66,8 +66,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe JavaStream<WindowedValue<KV<K, V>>> inputStream = context.getInputStream(input); int parallelism = context.getPipelineOptions().getParallelism(); - OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>) - input.getWindowingStrategy().getOutputTimeFn(); + TimestampCombiner timestampCombiner = input.getWindowingStrategy().getTimestampCombiner(); WindowFn<KV<K, V>, BoundedWindow> windowFn = (WindowFn<KV<K, V>, BoundedWindow>) input.getWindowingStrategy().getWindowFn(); JavaStream<WindowedValue<KV<K, List<V>>>> outputStream = inputStream @@ -75,9 +74,8 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe new GearpumpWindowFn(windowFn.isNonMerging()), EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window") .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window") - .map(new KeyedByTimestamp<K, V>((OutputTimeFn<? super BoundedWindow>) - input.getWindowingStrategy().getOutputTimeFn()), "keyed_by_timestamp") - .fold(new Merge<>(windowFn, outputTimeFn), "merge") + .map(new KeyedByTimestamp<K, V>(timestampCombiner), "keyed_by_timestamp") + .fold(new Merge<>(windowFn, timestampCombiner), "merge") .map(new Values<K, V>(), "values"); context.setOutputStream(context.getOutput(), outputStream); @@ -148,17 +146,17 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe extends MapFunction<WindowedValue<KV<K, V>>, KV<Instant, WindowedValue<KV<K, V>>>> { - private final OutputTimeFn<? super BoundedWindow> outputTimeFn; + private final TimestampCombiner timestampCombiner; - public KeyedByTimestamp(OutputTimeFn<? super BoundedWindow> outputTimeFn) { - this.outputTimeFn = outputTimeFn; + public KeyedByTimestamp(TimestampCombiner timestampCombiner) { + this.timestampCombiner = timestampCombiner; } @Override public KV<org.joda.time.Instant, WindowedValue<KV<K, V>>> map( WindowedValue<KV<K, V>> wv) { - Instant timestamp = outputTimeFn.assignOutputTime(wv.getTimestamp(), - Iterables.getOnlyElement(wv.getWindows())); + Instant timestamp = timestampCombiner.assign( + Iterables.getOnlyElement(wv.getWindows()), wv.getTimestamp()); return KV.of(timestamp, wv); } } @@ -171,12 +169,12 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe KV<Instant, WindowedValue<KV<K, List<V>>>>> { private final WindowFn<KV<K, V>, BoundedWindow> windowFn; - private final OutputTimeFn<? super BoundedWindow> outputTimeFn; + private final TimestampCombiner timestampCombiner; Merge(WindowFn<KV<K, V>, BoundedWindow> windowFn, - OutputTimeFn<? super BoundedWindow> outputTimeFn) { + TimestampCombiner timestampCombiner) { this.windowFn = windowFn; - this.outputTimeFn = outputTimeFn; + this.timestampCombiner = timestampCombiner; } @Override @@ -229,7 +227,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe mergedWindows.addAll(wv1.getWindows()); } - Instant timestamp = outputTimeFn.combine(t1, t2); + Instant timestamp = timestampCombiner.combine(t1, t2); return KV.of(timestamp, WindowedValue.of(wv1.getValue(), timestamp, mergedWindows, wv1.getPane())); http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java index 3473f53..dfd6296 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java @@ -38,7 +38,6 @@ import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory; -import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory; import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext; import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils.RawUnionValue; @@ -92,7 +91,6 @@ public class DoFnFunction<InputT, OutputT> extends mainOutput, sideOutputs, new NoOpStepContext(), - new NoOpAggregatorFactory(), windowingStrategy ); this.sideInputs = sideInputs; http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java index 70b4271..8d55d6f 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.Collection; import java.util.List; -import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; @@ -50,7 +49,6 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { private final TupleTag<OutputT> mainOutputTag; private final List<TupleTag<?>> sideOutputTags; private final ExecutionContext.StepContext stepContext; - private final AggregatorFactory aggregatorFactory; private final WindowingStrategy<?, ?> windowingStrategy; public DoFnRunnerFactory( @@ -61,7 +59,6 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, ExecutionContext.StepContext stepContext, - AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) { this.fn = doFn; this.options = pipelineOptions; @@ -70,7 +67,6 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { this.mainOutputTag = mainOutputTag; this.sideOutputTags = sideOutputTags; this.stepContext = stepContext; - this.aggregatorFactory = aggregatorFactory; this.windowingStrategy = windowingStrategy; } @@ -78,7 +74,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { ReadyCheckingSideInputReader sideInputReader) { DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner( options, fn, sideInputReader, outputManager, mainOutputTag, - sideOutputTags, stepContext, aggregatorFactory, windowingStrategy); + sideOutputTags, stepContext, windowingStrategy); return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader); } http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java deleted file mode 100644 index 3436930..0000000 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.gearpump.translators.utils; - -import java.io.Serializable; - -import org.apache.beam.runners.core.AggregatorFactory; -import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; - -/** - * no-op aggregator factory. - */ -public class NoOpAggregatorFactory implements AggregatorFactory, Serializable { - - @Override - public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, - ExecutionContext.StepContext stepContext, - String aggregatorName, - Combine.CombineFn<InputT, AccumT, OutputT> combine) { - return new NoOpAggregator<>(); - } - - private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>, - java.io.Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void addValue(InputT value) { - } - - @Override - public String getName() { - // TODO Auto-generated method stub - return null; - } - - @Override - public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() { - // TODO Auto-generated method stub - return null; - } - - }; -} http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java index 4e0a74c..64fd615 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java @@ -61,7 +61,7 @@ public class NoOpStepContext implements ExecutionContext.StepContext, Serializab } @Override - public StateInternals<?> stateInternals() { + public StateInternals stateInternals() { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/beam/blob/12b9719e/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java index 4e66ba9..86b60aa 100644 --- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java @@ -33,10 +33,9 @@ import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator.Gearpum import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -83,15 +82,15 @@ public class GroupByKeyTranslatorTest { } @Parameterized.Parameters(name = "{index}: {0}") - public static Iterable<OutputTimeFn<BoundedWindow>> data() { + public static Iterable<TimestampCombiner> data() { return ImmutableList.of( - OutputTimeFns.outputAtEarliestInputTimestamp(), - OutputTimeFns.outputAtLatestInputTimestamp(), - OutputTimeFns.outputAtEndOfWindow()); + TimestampCombiner.EARLIEST, + TimestampCombiner.LATEST, + TimestampCombiner.END_OF_WINDOW); } @Parameterized.Parameter(0) - public OutputTimeFn<? super BoundedWindow> outputTimeFn; + public TimestampCombiner timestampCombiner; @Test @SuppressWarnings({"rawtypes", "unchecked"}) @@ -99,15 +98,15 @@ public class GroupByKeyTranslatorTest { BoundedWindow window = new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10)); GroupByKeyTranslator.KeyedByTimestamp keyedByTimestamp = - new GroupByKeyTranslator.KeyedByTimestamp(outputTimeFn); + new GroupByKeyTranslator.KeyedByTimestamp(timestampCombiner); WindowedValue<KV<String, String>> value = WindowedValue.of( KV.of("key", "val"), org.joda.time.Instant.now(), window, PaneInfo.NO_FIRING); KV<org.joda.time.Instant, WindowedValue<KV<String, String>>> result = keyedByTimestamp.map(value); org.joda.time.Instant time = - outputTimeFn.assignOutputTime( - value.getTimestamp(), Iterables.getOnlyElement(value.getWindows())); + timestampCombiner.assign(Iterables.getOnlyElement(value.getWindows()), + value.getTimestamp()); assertThat(result, equalTo(KV.of(time, value))); } @@ -115,7 +114,8 @@ public class GroupByKeyTranslatorTest { @SuppressWarnings({"rawtypes", "unchecked"}) public void testMerge() { WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10)); - GroupByKeyTranslator.Merge merge = new GroupByKeyTranslator.Merge(slidingWindows, outputTimeFn); + GroupByKeyTranslator.Merge merge = new GroupByKeyTranslator.Merge(slidingWindows, + timestampCombiner); org.joda.time.Instant key1 = new org.joda.time.Instant(5); WindowedValue<KV<String, String>> value1 = WindowedValue.of( @@ -140,7 +140,7 @@ public class GroupByKeyTranslatorTest { KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result2 = merge.fold(result1, KV.of(key2, value2)); - assertThat(result2.getKey(), equalTo(outputTimeFn.combine(key1, key2))); + assertThat(result2.getKey(), equalTo(timestampCombiner.combine(key1, key2))); Collection<? extends BoundedWindow> resultWindows = result2.getValue().getWindows(); assertThat(resultWindows.size(), equalTo(1)); IntervalWindow expectedWindow =