fix group by window
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e63d42d1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e63d42d1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e63d42d1 Branch: refs/heads/gearpump-runner Commit: e63d42d1113728badc66285e7ce7a8ce204a82d9 Parents: ea633d2 Author: manuzhang <owenzhang1...@gmail.com> Authored: Sat Jan 7 23:07:23 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Sat Jan 14 13:35:31 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/gearpump/GearpumpRunner.java | 3 ++- .../translators/GroupByKeyTranslator.java | 4 +-- .../translators/TranslationContext.java | 1 - .../translators/WindowBoundTranslator.java | 27 ++++++++++++++++++-- .../gearpump/translators/io/GearpumpSource.java | 4 +-- 5 files changed, 30 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java index 9c44da3..01fdb3b 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java @@ -102,8 +102,9 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> { options.getSerializers()); ClientContext clientContext = getClientContext(options, config); options.setClientContext(clientContext); + UserConfig userConfig = UserConfig.empty(); JavaStreamApp streamApp = new JavaStreamApp( - appName, clientContext, UserConfig.empty()); + appName, clientContext, userConfig); TranslationContext translationContext = new TranslationContext(streamApp, options); GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext); translator.translate(pipeline); http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index 989957f..8e3ffe3 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; -import org.apache.gearpump.streaming.dsl.window.api.Accumulating$; +import org.apache.gearpump.streaming.dsl.window.api.Discarding$; import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$; import org.apache.gearpump.streaming.dsl.window.api.Window; import org.apache.gearpump.streaming.dsl.window.api.WindowFn; @@ -60,7 +60,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe int parallelism = context.getPipelineOptions().getParallelism(); JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream .window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()), - EventTimeTrigger$.MODULE$, Accumulating$.MODULE$), "assign_window") + EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window") .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window") .map(new ValueToIterable<K, V>(), "map_value_to_iterable") .reduce(new MergeValue<K, V>(), "merge_value"); http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java index 63fb619..b2cff8a 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java @@ -50,7 +50,6 @@ public class TranslationContext { public TranslationContext(JavaStreamApp streamApp, GearpumpPipelineOptions pipelineOptions) { this.streamApp = streamApp; this.pipelineOptions = pipelineOptions; - } public void setCurrentTransform(TransformHierarchy.Node treeNode) { http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java index 11f30fc..32dd5de 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java @@ -31,8 +31,12 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.UserConfig; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.javaapi.Task; import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; +import org.apache.gearpump.streaming.task.TaskContext; import org.joda.time.Instant; /** @@ -50,11 +54,13 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn(); JavaStream<WindowedValue<T>> outputStream = - inputStream.flatMap(new AssignWindows(windowFn), "assign_windows"); + inputStream + .flatMap(new AssignWindows(windowFn), "assign_windows") + .process(AssignTimestampTask.class, 1, UserConfig.empty(), "assign_timestamp"); + context.setOutputStream(context.getOutput(transform), outputStream); } - private static class AssignWindows<T> implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> { @@ -94,4 +100,21 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou return ret.iterator(); } } + + /** + * Assign WindowedValue timestamp to Gearpump message. + * @param <T> element type of WindowedValue + */ + public static class AssignTimestampTask<T> extends Task { + + public AssignTimestampTask(TaskContext taskContext, UserConfig userConfig) { + super(taskContext, userConfig); + } + + @Override + public void onNext(Message message) { + final WindowedValue<T> value = (WindowedValue<T>) message.msg(); + context.output(Message.apply(value, value.getTimestamp().getMillis())); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java index b266590..6e5b2de 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java @@ -28,8 +28,6 @@ import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.gearpump.Message; @@ -79,7 +77,7 @@ public abstract class GearpumpSource<T> implements DataSource { org.joda.time.Instant timestamp = reader.getCurrentTimestamp(); available = reader.advance(); message = Message.apply( - WindowedValue.of(data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), + WindowedValue.valueInGlobalWindow(data), timestamp.getMillis()); } } catch (Exception e) {