Repository: incubator-beam
Updated Branches:
  refs/heads/gearpump-runner 88de0cb23 -> b6e7bb659


Upgrade Gearpump version


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46d3563e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46d3563e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46d3563e

Branch: refs/heads/gearpump-runner
Commit: 46d3563ec37e7f5b39ae564ac73c7ca2b0185bb5
Parents: 88de0cb
Author: manuzhang <owenzhang1...@gmail.com>
Authored: Fri Dec 16 16:48:16 2016 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Fri Dec 16 16:48:16 2016 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        |  39 +------
 .../translators/GroupByKeyTranslator.java       | 109 ++++++++++++++-----
 .../gearpump/translators/io/GearpumpSource.java |  28 +++--
 .../gearpump/translators/io/ValuesSource.java   |  21 ++--
 .../translators/utils/TranslatorUtils.java      |  35 ++++++
 5 files changed, 148 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46d3563e/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 04bd724..9320561 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -43,7 +43,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <gearpump.version>0.8.1</gearpump.version>
+    <gearpump.version>0.8.3-SNAPSHOT</gearpump.version>
   </properties>
 
   <profiles>
@@ -121,29 +121,12 @@
       <artifactId>gearpump-core_2.11</artifactId>
       <version>${gearpump.version}</version>
       <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>com.google.code.findbugs</groupId>
-          <artifactId>jsr305</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.gearpump</groupId>
-      <artifactId>gearpump-daemon_2.11</artifactId>
-      <version>${gearpump.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.gearpump</groupId>
-          <artifactId>gearpump-experimental-cgroup_2.11</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.typesafe</groupId>
       <artifactId>config</artifactId>
-      <scope>provided</scope>
       <version>1.3.0</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
@@ -170,10 +153,6 @@
       <artifactId>beam-runners-core-java</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-    <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>
@@ -198,20 +177,6 @@
       <artifactId>jsr305</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.gearpump</groupId>
-      <artifactId>gearpump-shaded-metrics-graphite_2.11</artifactId>
-      <version>${gearpump.version}</version>
-      <classifier>assembly</classifier>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.gearpump</groupId>
-      <artifactId>gearpump-shaded-guava_2.11</artifactId>
-      <version>${gearpump.version}</version>
-      <classifier>assembly</classifier>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46d3563e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 43e3336..d64f1bf 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -19,23 +19,33 @@
 package org.apache.beam.runners.gearpump.translators;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.dsl.window.api.Accumulating$;
+import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
+import org.apache.gearpump.streaming.dsl.window.api.Window;
+import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
+import org.apache.gearpump.streaming.dsl.window.impl.Bucket;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
 import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
-
+import scala.collection.JavaConversions;
 
 
 /**
@@ -44,56 +54,97 @@ import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
 public class GroupByKeyTranslator<K, V> implements 
TransformTranslator<GroupByKey<K, V>> {
   @Override
   public void translate(GroupByKey<K, V> transform, TranslationContext 
context) {
+    PCollection<KV<K, V>> input = context.getInput(transform);
     JavaStream<WindowedValue<KV<K, V>>> inputStream =
-        context.getInputStream(context.getInput(transform));
+        context.getInputStream(input);
     int parallelism = context.getPipelineOptions().getParallelism();
     JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
-        .flatMap(new KeyedByKeyAndWindow<K, V>(), "keyed_by_Key_and_Window")
-        .groupBy(new GroupByKeyAndWindow<K, V>(), parallelism, 
"group_by_Key_and_Window")
-        .map(new ExtractKeyValue<K, V>(), "extract_Key_and_Value")
+        .window(Window.apply(new 
GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
+            EventTimeTrigger$.MODULE$, Accumulating$.MODULE$), "assign_window")
+        .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
+        .map(new ValueToIterable<K, V>(), "map_value_to_iterable")
         .reduce(new MergeValue<K, V>(), "merge_value");
 
     context.setOutputStream(context.getOutput(transform), outputStream);
   }
 
-  private static class KeyedByKeyAndWindow<K, V> implements
-      FlatMapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<KV<K, 
BoundedWindow>, V>>> {
+  private static class GearpumpWindowFn<T, W extends BoundedWindow> implements 
WindowFn,
+      Serializable {
+
+    private org.apache.beam.sdk.transforms.windowing.WindowFn<T, W> windowFn;
+
+    GearpumpWindowFn(org.apache.beam.sdk.transforms.windowing.WindowFn<T, W> 
windowFn) {
+      this.windowFn = windowFn;
+    }
 
     @Override
-    public Iterator<WindowedValue<KV<KV<K, BoundedWindow>, V>>> 
apply(WindowedValue<KV<K, V>> wv) {
-      List<WindowedValue<KV<KV<K, BoundedWindow>, V>>> ret = new 
ArrayList<>(wv.getWindows().size
-          ());
-      for (BoundedWindow window : wv.getWindows()) {
-        KV<K, BoundedWindow> keyWin = KV.of(wv.getValue().getKey(), window);
-        ret.add(WindowedValue.of(KV.of(keyWin, wv.getValue().getValue()),
-            wv.getTimestamp(), window, wv.getPane()));
+    public scala.collection.immutable.List<Bucket> apply(final Instant 
timestamp) {
+      try {
+        Collection<W> windows = windowFn.assignWindows(windowFn.new 
AssignContext() {
+          @Override
+          public T element() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public org.joda.time.Instant timestamp() {
+            return TranslatorUtils.java8TimeToJodaTime(timestamp);
+          }
+
+          @Override
+          public W window() {
+            throw new UnsupportedOperationException();
+          }
+        });
+
+        List<Bucket> buckets = new LinkedList<>();
+        for (BoundedWindow window : windows) {
+          buckets.add(getBucket(window));
+        }
+        return JavaConversions.asScalaBuffer(buckets).toList();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private Bucket getBucket(BoundedWindow window) {
+      if (window instanceof IntervalWindow) {
+        IntervalWindow intervalWindow = (IntervalWindow) window;
+        Instant start = 
TranslatorUtils.jodaTimeToJava8Time(intervalWindow.start());
+        Instant end = 
TranslatorUtils.jodaTimeToJava8Time(intervalWindow.end());
+        return new Bucket(start, end);
+      } else if (window instanceof GlobalWindow) {
+        Instant end = 
TranslatorUtils.jodaTimeToJava8Time(window.maxTimestamp());
+        return new Bucket(Instant.MIN, end);
+      } else {
+        throw new RuntimeException("unknown window " + 
window.getClass().getName());
       }
-      return ret.iterator();
     }
   }
 
-  private static class GroupByKeyAndWindow<K, V> implements
-      GroupByFunction<WindowedValue<KV<KV<K, BoundedWindow>, V>>, KV<K, 
BoundedWindow>> {
+  private static class GroupByFn<K, V> implements
+      GroupByFunction<WindowedValue<KV<K, V>>, K> {
 
     @Override
-    public KV<K, BoundedWindow> apply(WindowedValue<KV<KV<K, BoundedWindow>, 
V>> wv) {
+    public K apply(WindowedValue<KV<K, V>> wv) {
       return wv.getValue().getKey();
     }
   }
 
-  private static class ExtractKeyValue<K, V> implements
-      MapFunction<WindowedValue<KV<KV<K, BoundedWindow>, V>>,
-          WindowedValue<KV<K, Iterable<V>>>> {
+  private static class ValueToIterable<K, V>
+      implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, 
Iterable<V>>>> {
+
+
     @Override
-    public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<KV<K, 
BoundedWindow>, V>> wv) {
-      return WindowedValue.of(KV.of(wv.getValue().getKey().getKey(),
-              (Iterable<V>) 
Collections.singletonList(wv.getValue().getValue())),
-          wv.getTimestamp(), wv.getWindows(), wv.getPane());
+    public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) 
{
+      Iterable<V> values = Lists.newArrayList(wv.getValue().getValue());
+      return wv.withValue(KV.of(wv.getValue().getKey(), values));
     }
   }
 
   private static class MergeValue<K, V> implements
       ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
+
     @Override
     public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, 
Iterable<V>>> wv1,
         WindowedValue<KV<K, Iterable<V>>> wv2) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46d3563e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 8f2beb2..b266590 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -22,8 +22,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.io.IOException;
+import java.time.Instant;
 
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -33,19 +36,17 @@ import org.apache.gearpump.Message;
 import org.apache.gearpump.streaming.source.DataSource;
 import org.apache.gearpump.streaming.task.TaskContext;
 
-import org.joda.time.Instant;
-
 /**
  * common methods for {@link BoundedSourceWrapper} and {@link 
UnboundedSourceWrapper}.
  */
 public abstract class GearpumpSource<T> implements DataSource {
 
-  protected final byte[] serializedOptions;
+  private final byte[] serializedOptions;
 
-  protected Source.Reader<T> reader;
-  protected boolean available = false;
+  private Source.Reader<T> reader;
+  private boolean available = false;
 
-  public GearpumpSource(PipelineOptions options) {
+  GearpumpSource(PipelineOptions options) {
     try {
       this.serializedOptions = new ObjectMapper().writeValueAsBytes(options);
     } catch (JsonProcessingException e) {
@@ -56,7 +57,7 @@ public abstract class GearpumpSource<T> implements DataSource {
   protected abstract Source.Reader<T> createReader(PipelineOptions options) 
throws IOException;
 
   @Override
-  public void open(TaskContext context, long startTime) {
+  public void open(TaskContext context, Instant startTime) {
     try {
       PipelineOptions options = new ObjectMapper()
           .readValue(serializedOptions, PipelineOptions.class);
@@ -68,13 +69,14 @@ public abstract class GearpumpSource<T> implements DataSource {
       close();
     }
   }
+
   @Override
   public Message read() {
     Message message = null;
     try {
       if (available) {
         T data = reader.getCurrent();
-        Instant timestamp = reader.getCurrentTimestamp();
+        org.joda.time.Instant timestamp = reader.getCurrentTimestamp();
         available = reader.advance();
         message = Message.apply(
             WindowedValue.of(data, timestamp, GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING),
@@ -99,4 +101,14 @@ public abstract class GearpumpSource<T> implements DataSource {
     }
   }
 
+  @Override
+  public Instant getWatermark() {
+    if (reader instanceof UnboundedSource.UnboundedReader) {
+      return TranslatorUtils.jodaTimeToJava8Time(
+          ((UnboundedSource.UnboundedReader) reader).getWatermark());
+    } else {
+      return Instant.now();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46d3563e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index 3b67f09..f5a5eb4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.joda.time.Instant;
 
 /**
@@ -48,23 +49,19 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
   }
 
   private byte[] encode(Iterable<T> values, IterableCoder<T> coder) {
-    ByteArrayOutputStream stream = new ByteArrayOutputStream();
-    try {
+    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
       coder.encode(values, stream, Coder.Context.OUTER);
+      return stream.toByteArray();
     } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
-    return stream.toByteArray();
   }
 
   private Iterable<T> decode(byte[] bytes) throws IOException{
-    ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
-    try {
+    try (ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes)) {
       return iterableCoder.decode(inputStream, Coder.Context.OUTER);
     } catch (IOException ex) {
       throw new RuntimeException(ex);
-    } finally {
-      inputStream.close();
     }
   }
 
@@ -78,7 +75,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
   public UnboundedReader<T> createReader(PipelineOptions options,
       @Nullable CheckpointMark checkpointMark) {
     try {
-      return new ValuesReader<>(decode(values), iterableCoder, this);
+      return new ValuesReader<>(decode(values), this);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -105,7 +102,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
     private transient Iterator<T> iterator;
     private T current;
 
-    public ValuesReader(Iterable<T> values, IterableCoder<T> coder,
+    ValuesReader(Iterable<T> values,
         UnboundedSource<T, CheckpointMark> source) {
       this.values = values;
       this.source = source;
@@ -147,7 +144,11 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
 
     @Override
     public Instant getWatermark() {
-      return Instant.now();
+      if (iterator.hasNext()) {
+        return Instant.now();
+      } else {
+        return BoundedWindow.TIMESTAMP_MAX_VALUE;
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46d3563e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
new file mode 100644
index 0000000..9b72275
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.utils;
+
+import java.time.Instant;
+
+/**
+ * Utility methods for translators.
+ */
+public class TranslatorUtils {
+
+  public static Instant jodaTimeToJava8Time(org.joda.time.Instant time) {
+    return Instant.ofEpochMilli(time.getMillis());
+  }
+
+  public static org.joda.time.Instant java8TimeToJodaTime(Instant time) {
+    return new org.joda.time.Instant(time.toEpochMilli());
+  }
+}

Reply via email to