Repository: beam Updated Branches: refs/heads/master c9abd15e5 -> 89236e3b5
[BEAM-79] Support merging windows in GearpumpRunner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7af64720 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7af64720 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7af64720 Branch: refs/heads/master Commit: 7af6472082cbc7f3853e87831ed4bdc72978a3a3 Parents: 4001aeb Author: manuzhang <owenzhang1...@gmail.com> Authored: Tue Feb 7 22:14:18 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Wed Feb 15 14:59:42 2017 +0800 ---------------------------------------------------------------------- runners/gearpump/pom.xml | 5 - .../gearpump/GearpumpPipelineResult.java | 8 +- .../beam/runners/gearpump/GearpumpRunner.java | 24 +--- .../translators/GroupByKeyTranslator.java | 133 +++++++++++-------- .../translators/WindowBoundTranslator.java | 53 +------- .../gearpump/translators/io/GearpumpSource.java | 6 +- .../translators/utils/DoFnRunnerFactory.java | 1 + .../translators/utils/TranslatorUtils.java | 20 +++ .../translators/utils/TranslatorUtilsTest.java | 75 +++++++++++ 9 files changed, 186 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/pom.xml ---------------------------------------------------------------------- diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index 7c6fa76..6f91c50 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -93,11 +93,6 @@ org.apache.beam.sdk.transforms.ViewTest, org.apache.beam.sdk.transforms.join.CoGroupByKeyTest </exclude> - <!-- merging windows is not supported in Gearpump --> - <exclude> - org.apache.beam.sdk.transforms.windowing.WindowingTest, - org.apache.beam.sdk.util.ReshuffleTest - </exclude> </excludes> <systemPropertyVariables> <beamTestPipelineOptions> http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java index 9e53517..a3740b7 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.gearpump.cluster.MasterToAppMaster; +import org.apache.gearpump.cluster.ApplicationStatus; import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData; import org.apache.gearpump.cluster.client.ClientContext; import org.joda.time.Duration; @@ -105,7 +105,7 @@ public class GearpumpPipelineResult implements PipelineResult { } private State getGearpumpState() { - String status = null; + ApplicationStatus status = null; List<AppMasterData> apps = JavaConverters.<AppMasterData>seqAsJavaListConverter( (Seq<AppMasterData>) client.listApps().appMasters()).asJava(); @@ -114,9 +114,9 @@ public class GearpumpPipelineResult implements PipelineResult { status = app.status(); } } - if (null == status || status.equals(MasterToAppMaster.AppMasterNonExist())) { + if (null == status || status instanceof ApplicationStatus.NONEXIST$) { return State.UNKNOWN; - } else if (status.equals(MasterToAppMaster.AppMasterActive())) { + } else if (status instanceof ApplicationStatus.ACTIVE$) { return State.RUNNING; } else { return State.STOPPED; http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java index 01fdb3b..9ca1eb2 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java @@ -29,13 +29,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.IdentityWindowFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -74,11 +69,7 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> { public <OutputT extends POutput, InputT extends PInput> OutputT apply( PTransform<InputT, OutputT> transform, InputT input) { - if (Window.Bound.class.equals(transform.getClass()) - && isNullOrIdentityWindowFn(((Window.Bound) transform).getWindowFn())) { - return (OutputT) super.apply( - ParDo.of(new IdentityFn()), input); - } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass()) + if (Flatten.FlattenPCollectionList.class.equals(transform.getClass()) && ((PCollectionList<?>) input).size() == 0) { return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of()); } else if (Create.Values.class.equals(transform.getClass())) { @@ -108,7 +99,7 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> { TranslationContext translationContext = new TranslationContext(streamApp, options); GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext); translator.translate(pipeline); - int appId = streamApp.submit(); + int appId = streamApp.submit().appId(); return new GearpumpPipelineResult(clientContext, appId); } @@ -140,15 +131,4 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> { return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers)); } - private static class IdentityFn<T> extends DoFn<T, T> { - - @ProcessElement - public void process(ProcessContext c) { - c.output(c.element()); - } - } - - private boolean isNullOrIdentityWindowFn(WindowFn windowFn) { - return windowFn == null || windowFn.getClass().equals(IdentityWindowFn.class); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index ac8e218..69a1d11 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -23,9 +23,8 @@ import com.google.common.collect.Lists; import java.io.Serializable; import java.nio.ByteBuffer; -import java.time.Instant; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; @@ -34,9 +33,8 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -48,15 +46,14 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction; import org.apache.gearpump.streaming.dsl.window.api.Discarding$; import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$; -import org.apache.gearpump.streaming.dsl.window.api.Window; -import org.apache.gearpump.streaming.dsl.window.api.WindowFn; -import org.apache.gearpump.streaming.dsl.window.impl.Bucket; -import scala.collection.JavaConversions; - +import org.apache.gearpump.streaming.dsl.window.api.WindowFunction; +import org.apache.gearpump.streaming.dsl.window.api.Windows; +import org.apache.gearpump.streaming.dsl.window.impl.Window; /** * {@link GroupByKey} is translated to Gearpump groupBy function. */ +@SuppressWarnings({"rawtypes", "unchecked"}) public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> { @Override public void translate(GroupByKey<K, V> transform, TranslationContext context) { @@ -67,69 +64,51 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe int parallelism = context.getPipelineOptions().getParallelism(); OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>) input.getWindowingStrategy().getOutputTimeFn(); + WindowFn<KV<K, V>, BoundedWindow> windowFn = (WindowFn<KV<K, V>, BoundedWindow>) + input.getWindowingStrategy().getWindowFn(); JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream - .window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()), + .window(Windows.apply( + new GearpumpWindowFn(windowFn.isNonMerging()), EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window") .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window") .map(new ValueToIterable<K, V>(), "map_value_to_iterable") - .map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp") - .reduce(new Merge<K, V>(outputTimeFn), "merge") + .map(new KeyedByTimestamp<K, V>((OutputTimeFn<? super BoundedWindow>) + input.getWindowingStrategy().getOutputTimeFn()), "keyed_by_timestamp") + .reduce(new Merge<>(windowFn, outputTimeFn), "merge") .map(new Values<K, V>(), "values"); context.setOutputStream(context.getOutput(transform), outputStream); } - private static class GearpumpWindowFn<T, W extends BoundedWindow> implements WindowFn, - Serializable { + private static class GearpumpWindowFn<T, W extends BoundedWindow> + implements WindowFunction<WindowedValue<T>>, Serializable { - private org.apache.beam.sdk.transforms.windowing.WindowFn<T, W> windowFn; + private final boolean isNonMerging; - GearpumpWindowFn(org.apache.beam.sdk.transforms.windowing.WindowFn<T, W> windowFn) { - this.windowFn = windowFn; + public GearpumpWindowFn(boolean isNonMerging) { + this.isNonMerging = isNonMerging; } @Override - public scala.collection.immutable.List<Bucket> apply(final Instant timestamp) { + public Window[] apply(Context<WindowedValue<T>> context) { try { - Collection<W> windows = windowFn.assignWindows(windowFn.new AssignContext() { - @Override - public T element() { - throw new UnsupportedOperationException(); - } - - @Override - public org.joda.time.Instant timestamp() { - return TranslatorUtils.java8TimeToJodaTime(timestamp); - } - - @Override - public W window() { - throw new UnsupportedOperationException(); - } - }); - - List<Bucket> buckets = new LinkedList<>(); - for (BoundedWindow window : windows) { - buckets.add(getBucket(window)); - } - return JavaConversions.asScalaBuffer(buckets).toList(); + return toGearpumpWindows(context.element().getWindows().toArray(new BoundedWindow[0])); } catch (Exception e) { throw new RuntimeException(e); } } - private Bucket getBucket(BoundedWindow window) { - if (window instanceof IntervalWindow) { - IntervalWindow intervalWindow = (IntervalWindow) window; - Instant start = TranslatorUtils.jodaTimeToJava8Time(intervalWindow.start()); - Instant end = TranslatorUtils.jodaTimeToJava8Time(intervalWindow.end()); - return new Bucket(start, end); - } else if (window instanceof GlobalWindow) { - Instant end = TranslatorUtils.jodaTimeToJava8Time(window.maxTimestamp()); - return new Bucket(Instant.MIN, end); - } else { - throw new RuntimeException("unknown window " + window.getClass().getName()); + @Override + public boolean isNonMerging() { + return isNonMerging; + } + + private Window[] toGearpumpWindows(BoundedWindow[] windows) { + Window[] gwins = new Window[windows.length]; + for (int i = 0; i < windows.length; i++) { + gwins[i] = TranslatorUtils.boundedWindowToGearpumpWindow(windows[i]); } + return gwins; } } @@ -166,19 +145,30 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe extends MapFunction<WindowedValue<KV<K, Iterable<V>>>, KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> { + private final OutputTimeFn<? super BoundedWindow> outputTimeFn; + + public KeyedByTimestamp(OutputTimeFn<? super BoundedWindow> outputTimeFn) { + this.outputTimeFn = outputTimeFn; + } + @Override public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply( WindowedValue<KV<K, Iterable<V>>> wv) { - return KV.of(wv.getTimestamp(), wv); + org.joda.time.Instant timestamp = outputTimeFn.assignOutputTime(wv.getTimestamp(), + Iterables.getOnlyElement(wv.getWindows())); + return KV.of(timestamp, wv); } } private static class Merge<K, V> extends ReduceFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> { + private final WindowFn<KV<K, V>, BoundedWindow> windowFn; private final OutputTimeFn<? super BoundedWindow> outputTimeFn; - Merge(OutputTimeFn<? super BoundedWindow> outputTimeFn) { + Merge(WindowFn<KV<K, V>, BoundedWindow> windowFn, + OutputTimeFn<? super BoundedWindow> outputTimeFn) { + this.windowFn = windowFn; this.outputTimeFn = outputTimeFn; } @@ -189,13 +179,40 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe org.joda.time.Instant t1 = kv1.getKey(); org.joda.time.Instant t2 = kv2.getKey(); - WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue(); - WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue(); + final WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue(); + final WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue(); + + final List<BoundedWindow> mergedWindows = new ArrayList<>(); + if (!windowFn.isNonMerging()) { + try { + windowFn.mergeWindows(windowFn.new MergeContext() { + + @Override + public Collection<BoundedWindow> windows() { + ArrayList<BoundedWindow> windows = new ArrayList<>(); + windows.addAll(wv1.getWindows()); + windows.addAll(wv2.getWindows()); + return windows; + } + + @Override + public void merge(Collection<BoundedWindow> toBeMerged, + BoundedWindow mergeResult) throws Exception { + mergedWindows.add(mergeResult); + } + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + mergedWindows.addAll(wv1.getWindows()); + } - return KV.of(outputTimeFn.combine(t1, t2), + org.joda.time.Instant timestamp = outputTimeFn.combine(t1, t2); + return KV.of(timestamp, WindowedValue.of(KV.of(wv1.getValue().getKey(), - Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())), - wv1.getTimestamp(), wv1.getWindows(), wv1.getPane())); + Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())), timestamp, + mergedWindows, wv1.getPane())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java index 9bf1936..c0de2df 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java @@ -21,23 +21,15 @@ package org.apache.beam.runners.gearpump.translators; import com.google.common.collect.Iterables; import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; -import org.apache.gearpump.Message; -import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; -import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; -import org.apache.gearpump.streaming.javaapi.Task; -import org.apache.gearpump.streaming.task.TaskContext; import org.joda.time.Instant; /** @@ -52,34 +44,25 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou JavaStream<WindowedValue<T>> inputStream = context.getInputStream(input); WindowingStrategy<?, ?> outputStrategy = transform.getOutputStrategyInternal(input.getWindowingStrategy()); - WindowFn<T, BoundedWindow> windowFn = - (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn(); - OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>) - outputStrategy.getOutputTimeFn(); + WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn(); JavaStream<WindowedValue<T>> outputStream = inputStream - .flatMap(new AssignWindows(windowFn, outputTimeFn), "assign_windows") - .process(AssignTimestampTask.class, 1, UserConfig.empty(), "assign_timestamp"); + .map(new AssignWindows(windowFn), "assign_windows"); context.setOutputStream(context.getOutput(transform), outputStream); } private static class AssignWindows<T> extends - FlatMapFunction<WindowedValue<T>, WindowedValue<T>> { + MapFunction<WindowedValue<T>, WindowedValue<T>> { private final WindowFn<T, BoundedWindow> windowFn; - private final OutputTimeFn<? super BoundedWindow> outputTimeFn; - AssignWindows( - WindowFn<T, BoundedWindow> windowFn, - OutputTimeFn<? super BoundedWindow> outputTimeFn) { + AssignWindows(WindowFn<T, BoundedWindow> windowFn) { this.windowFn = windowFn; - this.outputTimeFn = outputTimeFn; } @Override - public Iterator<WindowedValue<T>> apply(final WindowedValue<T> value) { - List<WindowedValue<T>> ret = new LinkedList<>(); + public WindowedValue<T> apply(final WindowedValue<T> value) { try { Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() { @Override @@ -97,32 +80,10 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou return Iterables.getOnlyElement(value.getWindows()); } }); - for (BoundedWindow window: windows) { - Instant timestamp = outputTimeFn.assignOutputTime(value.getTimestamp(), window); - ret.add(WindowedValue.of( - value.getValue(), timestamp, window, value.getPane())); - } + return WindowedValue.of(value.getValue(), value.getTimestamp(), windows, value.getPane()); } catch (Exception e) { throw new RuntimeException(e); } - return ret.iterator(); - } - } - - /** - * Assign WindowedValue timestamp to Gearpump message. - * @param <T> element type of WindowedValue - */ - public static class AssignTimestampTask<T> extends Task { - - public AssignTimestampTask(TaskContext taskContext, UserConfig userConfig) { - super(taskContext, userConfig); - } - - @Override - public void onNext(Message message) { - final WindowedValue<T> value = (WindowedValue<T>) message.msg(); - context.output(Message.apply(value, value.getTimestamp().getMillis())); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java index 6e5b2de..3d0d7c8 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java @@ -62,9 +62,8 @@ public abstract class GearpumpSource<T> implements DataSource { this.reader = createReader(options); this.available = reader.start(); } catch (Exception e) { - throw new RuntimeException(e); - } finally { close(); + throw new RuntimeException(e); } } @@ -81,9 +80,8 @@ public abstract class GearpumpSource<T> implements DataSource { timestamp.getMillis()); } } catch (Exception e) { - throw new RuntimeException(e); - } finally { close(); + throw new RuntimeException(e); } return message; } http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java index 7e1402f..aaefb88 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.values.TupleTag; */ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { + private static final long serialVersionUID = 1083167395296383469L; private final DoFn<InputT, OutputT> fn; private final transient PipelineOptions options; private final SideInputReader sideInputReader; http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java index 9b72275..656fc6a 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java @@ -20,6 +20,12 @@ package org.apache.beam.runners.gearpump.translators.utils; import java.time.Instant; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.gearpump.streaming.dsl.window.impl.Window; + + /** * Utility methods for translators. */ @@ -32,4 +38,18 @@ public class TranslatorUtils { public static org.joda.time.Instant java8TimeToJodaTime(Instant time) { return new org.joda.time.Instant(time.toEpochMilli()); } + + public static Window boundedWindowToGearpumpWindow(BoundedWindow window) { + Instant end = TranslatorUtils.jodaTimeToJava8Time(window.maxTimestamp().plus(1L)); + if (window instanceof IntervalWindow) { + IntervalWindow intervalWindow = (IntervalWindow) window; + Instant start = TranslatorUtils.jodaTimeToJava8Time(intervalWindow.start()); + return new Window(start, end); + } else if (window instanceof GlobalWindow) { + return new Window(TranslatorUtils.jodaTimeToJava8Time(BoundedWindow.TIMESTAMP_MIN_VALUE), + end); + } else { + throw new RuntimeException("unknown window " + window.getClass().getName()); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java new file mode 100644 index 0000000..10976e8 --- /dev/null +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.utils; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.Lists; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.gearpump.streaming.dsl.window.impl.Window; +import org.junit.Test; + + +/** + * Tests for {@link TranslatorUtils}. + */ +public class TranslatorUtilsTest { + + private static final List<KV<org.joda.time.Instant, Instant>> TEST_VALUES = Lists.newArrayList( + KV.of(new org.joda.time.Instant(0), Instant.EPOCH), + KV.of(new org.joda.time.Instant(42), Instant.ofEpochMilli(42)), + KV.of(new org.joda.time.Instant(Long.MIN_VALUE), Instant.ofEpochMilli(Long.MIN_VALUE)), + KV.of(new org.joda.time.Instant(Long.MAX_VALUE), Instant.ofEpochMilli(Long.MAX_VALUE))); + + @Test + public void testJodaTimeAndJava8TimeConversion() { + for (KV<org.joda.time.Instant, Instant> kv: TEST_VALUES) { + assertThat(TranslatorUtils.jodaTimeToJava8Time(kv.getKey()), + equalTo(kv.getValue())); + assertThat(TranslatorUtils.java8TimeToJodaTime(kv.getValue()), + equalTo(kv.getKey())); + } + } + + @Test + public void testBoundedWindowToGearpumpWindow() { + assertThat(TranslatorUtils.boundedWindowToGearpumpWindow( + new IntervalWindow(new org.joda.time.Instant(0), + new org.joda.time.Instant(Long.MAX_VALUE))), + equalTo(Window.apply(Instant.EPOCH, Instant.ofEpochMilli(Long.MAX_VALUE)))); + assertThat(TranslatorUtils.boundedWindowToGearpumpWindow( + new IntervalWindow(new org.joda.time.Instant(Long.MIN_VALUE), + new org.joda.time.Instant(Long.MAX_VALUE))), + equalTo(Window.apply(Instant.ofEpochMilli(Long.MIN_VALUE), + Instant.ofEpochMilli(Long.MAX_VALUE)))); + BoundedWindow globalWindow = GlobalWindow.INSTANCE; + assertThat(TranslatorUtils.boundedWindowToGearpumpWindow(globalWindow), + equalTo(Window.apply(Instant.ofEpochMilli(Long.MIN_VALUE / 1000), + Instant.ofEpochMilli(Long.MAX_VALUE / 1000).minus(Duration.ofDays(1)).plusMillis(1)))); + } +}