Repository: incubator-beam Updated Branches: refs/heads/gearpump-runner 88de0cb23 -> b6e7bb659
Upgrade Gearpump version Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46d3563e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46d3563e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46d3563e Branch: refs/heads/gearpump-runner Commit: 46d3563ec37e7f5b39ae564ac73c7ca2b0185bb5 Parents: 88de0cb Author: manuzhang <owenzhang1...@gmail.com> Authored: Fri Dec 16 16:48:16 2016 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Fri Dec 16 16:48:16 2016 +0800 ---------------------------------------------------------------------- runners/gearpump/pom.xml | 39 +------ .../translators/GroupByKeyTranslator.java | 109 ++++++++++++++----- .../gearpump/translators/io/GearpumpSource.java | 28 +++-- .../gearpump/translators/io/ValuesSource.java | 21 ++-- .../translators/utils/TranslatorUtils.java | 35 ++++++ 5 files changed, 148 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46d3563e/runners/gearpump/pom.xml ---------------------------------------------------------------------- diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index 04bd724..9320561 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -43,7 +43,7 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> - <gearpump.version>0.8.1</gearpump.version> + <gearpump.version>0.8.3-SNAPSHOT</gearpump.version> </properties> <profiles> @@ -121,29 +121,12 @@ <artifactId>gearpump-core_2.11</artifactId> <version>${gearpump.version}</version> <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.gearpump</groupId> - <artifactId>gearpump-daemon_2.11</artifactId> - <version>${gearpump.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.gearpump</groupId> - <artifactId>gearpump-experimental-cgroup_2.11</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> - <scope>provided</scope> <version>1.3.0</version> + <scope>provided</scope> </dependency> <dependency> <groupId>org.scala-lang</groupId> @@ -170,10 +153,6 @@ <artifactId>beam-runners-core-java</artifactId> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> @@ -198,20 +177,6 @@ <artifactId>jsr305</artifactId> </dependency> <dependency> - <groupId>org.apache.gearpump</groupId> - <artifactId>gearpump-shaded-metrics-graphite_2.11</artifactId> - <version>${gearpump.version}</version> - <classifier>assembly</classifier> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.gearpump</groupId> - <artifactId>gearpump-shaded-guava_2.11</artifactId> - <version>${gearpump.version}</version> - <classifier>assembly</classifier> - <scope>test</scope> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46d3563e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index 43e3336..d64f1bf 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -19,23 +19,33 @@ package org.apache.beam.runners.gearpump.translators; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; +import java.io.Serializable; +import java.time.Instant; +import java.util.Collection; +import java.util.LinkedList; import java.util.List; +import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; -import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; +import org.apache.gearpump.streaming.dsl.window.api.Accumulating$; +import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$; +import org.apache.gearpump.streaming.dsl.window.api.Window; +import org.apache.gearpump.streaming.dsl.window.api.WindowFn; +import org.apache.gearpump.streaming.dsl.window.impl.Bucket; import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction; import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction; import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction; - +import scala.collection.JavaConversions; /** @@ -44,56 +54,97 @@ import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction; public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> { @Override public void translate(GroupByKey<K, V> transform, TranslationContext context) { + PCollection<KV<K, V>> input = context.getInput(transform); JavaStream<WindowedValue<KV<K, V>>> inputStream = - context.getInputStream(context.getInput(transform)); + context.getInputStream(input); int parallelism = context.getPipelineOptions().getParallelism(); JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream - .flatMap(new KeyedByKeyAndWindow<K, V>(), "keyed_by_Key_and_Window") - .groupBy(new GroupByKeyAndWindow<K, V>(), parallelism, "group_by_Key_and_Window") - .map(new ExtractKeyValue<K, V>(), "extract_Key_and_Value") + .window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()), + EventTimeTrigger$.MODULE$, Accumulating$.MODULE$), "assign_window") + .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window") + .map(new ValueToIterable<K, V>(), "map_value_to_iterable") .reduce(new MergeValue<K, V>(), "merge_value"); context.setOutputStream(context.getOutput(transform), outputStream); } - private static class KeyedByKeyAndWindow<K, V> implements - FlatMapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<KV<K, BoundedWindow>, V>>> { + private static class GearpumpWindowFn<T, W extends BoundedWindow> implements WindowFn, + Serializable { + + private org.apache.beam.sdk.transforms.windowing.WindowFn<T, W> windowFn; + + GearpumpWindowFn(org.apache.beam.sdk.transforms.windowing.WindowFn<T, W> windowFn) { + this.windowFn = windowFn; + } @Override - public Iterator<WindowedValue<KV<KV<K, BoundedWindow>, V>>> apply(WindowedValue<KV<K, V>> wv) { - List<WindowedValue<KV<KV<K, BoundedWindow>, V>>> ret = new ArrayList<>(wv.getWindows().size - ()); - for (BoundedWindow window : wv.getWindows()) { - KV<K, BoundedWindow> keyWin = KV.of(wv.getValue().getKey(), window); - ret.add(WindowedValue.of(KV.of(keyWin, wv.getValue().getValue()), - wv.getTimestamp(), window, wv.getPane())); + public scala.collection.immutable.List<Bucket> apply(final Instant timestamp) { + try { + Collection<W> windows = windowFn.assignWindows(windowFn.new AssignContext() { + @Override + public T element() { + throw new UnsupportedOperationException(); + } + + @Override + public org.joda.time.Instant timestamp() { + return TranslatorUtils.java8TimeToJodaTime(timestamp); + } + + @Override + public W window() { + throw new UnsupportedOperationException(); + } + }); + + List<Bucket> buckets = new LinkedList<>(); + for (BoundedWindow window : windows) { + buckets.add(getBucket(window)); + } + return JavaConversions.asScalaBuffer(buckets).toList(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Bucket getBucket(BoundedWindow window) { + if (window instanceof IntervalWindow) { + IntervalWindow intervalWindow = (IntervalWindow) window; + Instant start = TranslatorUtils.jodaTimeToJava8Time(intervalWindow.start()); + Instant end = TranslatorUtils.jodaTimeToJava8Time(intervalWindow.end()); + return new Bucket(start, end); + } else if (window instanceof GlobalWindow) { + Instant end = TranslatorUtils.jodaTimeToJava8Time(window.maxTimestamp()); + return new Bucket(Instant.MIN, end); + } else { + throw new RuntimeException("unknown window " + window.getClass().getName()); } - return ret.iterator(); } } - private static class GroupByKeyAndWindow<K, V> implements - GroupByFunction<WindowedValue<KV<KV<K, BoundedWindow>, V>>, KV<K, BoundedWindow>> { + private static class GroupByFn<K, V> implements + GroupByFunction<WindowedValue<KV<K, V>>, K> { @Override - public KV<K, BoundedWindow> apply(WindowedValue<KV<KV<K, BoundedWindow>, V>> wv) { + public K apply(WindowedValue<KV<K, V>> wv) { return wv.getValue().getKey(); } } - private static class ExtractKeyValue<K, V> implements - MapFunction<WindowedValue<KV<KV<K, BoundedWindow>, V>>, - WindowedValue<KV<K, Iterable<V>>>> { + private static class ValueToIterable<K, V> + implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> { + + @Override - public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<KV<K, BoundedWindow>, V>> wv) { - return WindowedValue.of(KV.of(wv.getValue().getKey().getKey(), - (Iterable<V>) Collections.singletonList(wv.getValue().getValue())), - wv.getTimestamp(), wv.getWindows(), wv.getPane()); + public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) { + Iterable<V> values = Lists.newArrayList(wv.getValue().getValue()); + return wv.withValue(KV.of(wv.getValue().getKey(), values)); } } private static class MergeValue<K, V> implements ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> { + @Override public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, Iterable<V>>> wv1, WindowedValue<KV<K, Iterable<V>>> wv2) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46d3563e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java index 8f2beb2..b266590 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java @@ -22,8 +22,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; +import java.time.Instant; +import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -33,19 +36,17 @@ import org.apache.gearpump.Message; import org.apache.gearpump.streaming.source.DataSource; import org.apache.gearpump.streaming.task.TaskContext; -import org.joda.time.Instant; - /** * common methods for {@link BoundedSourceWrapper} and {@link UnboundedSourceWrapper}. */ public abstract class GearpumpSource<T> implements DataSource { - protected final byte[] serializedOptions; + private final byte[] serializedOptions; - protected Source.Reader<T> reader; - protected boolean available = false; + private Source.Reader<T> reader; + private boolean available = false; - public GearpumpSource(PipelineOptions options) { + GearpumpSource(PipelineOptions options) { try { this.serializedOptions = new ObjectMapper().writeValueAsBytes(options); } catch (JsonProcessingException e) { @@ -56,7 +57,7 @@ public abstract class GearpumpSource<T> implements DataSource { protected abstract Source.Reader<T> createReader(PipelineOptions options) throws IOException; @Override - public void open(TaskContext context, long startTime) { + public void open(TaskContext context, Instant startTime) { try { PipelineOptions options = new ObjectMapper() .readValue(serializedOptions, PipelineOptions.class); @@ -68,13 +69,14 @@ public abstract class GearpumpSource<T> implements DataSource { close(); } } + @Override public Message read() { Message message = null; try { if (available) { T data = reader.getCurrent(); - Instant timestamp = reader.getCurrentTimestamp(); + org.joda.time.Instant timestamp = reader.getCurrentTimestamp(); available = reader.advance(); message = Message.apply( WindowedValue.of(data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), @@ -99,4 +101,14 @@ public abstract class GearpumpSource<T> implements DataSource { } } + @Override + public Instant getWatermark() { + if (reader instanceof UnboundedSource.UnboundedReader) { + return TranslatorUtils.jodaTimeToJava8Time( + ((UnboundedSource.UnboundedReader) reader).getWatermark()); + } else { + return Instant.now(); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46d3563e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java index 3b67f09..f5a5eb4 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.joda.time.Instant; /** @@ -48,23 +49,19 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi } private byte[] encode(Iterable<T> values, IterableCoder<T> coder) { - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - try { + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { coder.encode(values, stream, Coder.Context.OUTER); + return stream.toByteArray(); } catch (IOException ex) { throw new RuntimeException(ex); } - return stream.toByteArray(); } private Iterable<T> decode(byte[] bytes) throws IOException{ - ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); - try { + try (ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes)) { return iterableCoder.decode(inputStream, Coder.Context.OUTER); } catch (IOException ex) { throw new RuntimeException(ex); - } finally { - inputStream.close(); } } @@ -78,7 +75,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi public UnboundedReader<T> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) { try { - return new ValuesReader<>(decode(values), iterableCoder, this); + return new ValuesReader<>(decode(values), this); } catch (IOException e) { throw new RuntimeException(e); } @@ -105,7 +102,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi private transient Iterator<T> iterator; private T current; - public ValuesReader(Iterable<T> values, IterableCoder<T> coder, + ValuesReader(Iterable<T> values, UnboundedSource<T, CheckpointMark> source) { this.values = values; this.source = source; @@ -147,7 +144,11 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi @Override public Instant getWatermark() { - return Instant.now(); + if (iterator.hasNext()) { + return Instant.now(); + } else { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46d3563e/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java new file mode 100644 index 0000000..9b72275 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.utils; + +import java.time.Instant; + +/** + * Utility methods for translators. + */ +public class TranslatorUtils { + + public static Instant jodaTimeToJava8Time(org.joda.time.Instant time) { + return Instant.ofEpochMilli(time.getMillis()); + } + + public static org.joda.time.Instant java8TimeToJodaTime(Instant time) { + return new org.joda.time.Instant(time.toEpochMilli()); + } +}