fix group by window

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e63d42d1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e63d42d1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e63d42d1

Branch: refs/heads/gearpump-runner
Commit: e63d42d1113728badc66285e7ce7a8ce204a82d9
Parents: ea633d2
Author: manuzhang <owenzhang1...@gmail.com>
Authored: Sat Jan 7 23:07:23 2017 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Sat Jan 14 13:35:31 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/gearpump/GearpumpRunner.java   |  3 ++-
 .../translators/GroupByKeyTranslator.java       |  4 +--
 .../translators/TranslationContext.java         |  1 -
 .../translators/WindowBoundTranslator.java      | 27 ++++++++++++++++++--
 .../gearpump/translators/io/GearpumpSource.java |  4 +--
 5 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 9c44da3..01fdb3b 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -102,8 +102,9 @@ public class GearpumpRunner extends 
PipelineRunner<GearpumpPipelineResult> {
         options.getSerializers());
     ClientContext clientContext = getClientContext(options, config);
     options.setClientContext(clientContext);
+    UserConfig userConfig = UserConfig.empty();
     JavaStreamApp streamApp = new JavaStreamApp(
-        appName, clientContext, UserConfig.empty());
+        appName, clientContext, userConfig);
     TranslationContext translationContext = new TranslationContext(streamApp, 
options);
     GearpumpPipelineTranslator translator = new 
GearpumpPipelineTranslator(translationContext);
     translator.translate(pipeline);

http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 989957f..8e3ffe3 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.KV;
 
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.window.api.Accumulating$;
+import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
 import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
 import org.apache.gearpump.streaming.dsl.window.api.Window;
 import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
@@ -60,7 +60,7 @@ public class GroupByKeyTranslator<K, V> implements 
TransformTranslator<GroupByKe
     int parallelism = context.getPipelineOptions().getParallelism();
     JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
         .window(Window.apply(new 
GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
-            EventTimeTrigger$.MODULE$, Accumulating$.MODULE$), "assign_window")
+            EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
         .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
         .map(new ValueToIterable<K, V>(), "map_value_to_iterable")
         .reduce(new MergeValue<K, V>(), "merge_value");

http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index 63fb619..b2cff8a 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -50,7 +50,6 @@ public class TranslationContext {
   public TranslationContext(JavaStreamApp streamApp, GearpumpPipelineOptions 
pipelineOptions) {
     this.streamApp = streamApp;
     this.pipelineOptions = pipelineOptions;
-
   }
 
   public void setCurrentTransform(TransformHierarchy.Node treeNode) {

http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index 11f30fc..32dd5de 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -31,8 +31,12 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.Task;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.task.TaskContext;
 import org.joda.time.Instant;
 
 /**
@@ -50,11 +54,13 @@ public class WindowBoundTranslator<T> implements  
TransformTranslator<Window.Bou
     WindowFn<T, BoundedWindow> windowFn =
         (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
     JavaStream<WindowedValue<T>> outputStream =
-        inputStream.flatMap(new AssignWindows(windowFn), "assign_windows");
+        inputStream
+            .flatMap(new AssignWindows(windowFn), "assign_windows")
+            .process(AssignTimestampTask.class, 1, UserConfig.empty(), 
"assign_timestamp");
+
     context.setOutputStream(context.getOutput(transform), outputStream);
   }
 
-
   private static class AssignWindows<T> implements
       FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
 
@@ -94,4 +100,21 @@ public class WindowBoundTranslator<T> implements  
TransformTranslator<Window.Bou
       return ret.iterator();
     }
   }
+
+  /**
+   * Assign WindowedValue timestamp to Gearpump message.
+   * @param <T> element type of WindowedValue
+   */
+  public static class AssignTimestampTask<T> extends Task {
+
+    public AssignTimestampTask(TaskContext taskContext, UserConfig userConfig) 
{
+      super(taskContext, userConfig);
+    }
+
+    @Override
+    public void onNext(Message message) {
+      final WindowedValue<T> value = (WindowedValue<T>) message.msg();
+      context.output(Message.apply(value, value.getTimestamp().getMillis()));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/e63d42d1/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index b266590..6e5b2de 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -28,8 +28,6 @@ import 
org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 
 import org.apache.gearpump.Message;
@@ -79,7 +77,7 @@ public abstract class GearpumpSource<T> implements DataSource 
{
         org.joda.time.Instant timestamp = reader.getCurrentTimestamp();
         available = reader.advance();
         message = Message.apply(
-            WindowedValue.of(data, timestamp, GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING),
+            WindowedValue.valueInGlobalWindow(data),
             timestamp.getMillis());
       }
     } catch (Exception e) {

Reply via email to