Add Window.Bound translator
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/85d54ab2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/85d54ab2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/85d54ab2 Branch: refs/heads/gearpump-runner Commit: 85d54ab20f21297da25059ed7b4c8ed02e93bb74 Parents: 46d3563 Author: manuzhang <owenzhang1...@gmail.com> Authored: Fri Dec 16 16:49:06 2016 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Fri Dec 16 16:49:06 2016 +0800 ---------------------------------------------------------------------- .../gearpump/GearpumpPipelineTranslator.java | 3 + .../translators/WindowBoundTranslator.java | 97 ++++++++++++++++++++ 2 files changed, 100 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/85d54ab2/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java index 84dfeec..20624ed 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java @@ -29,6 +29,7 @@ import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator; import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator; import org.apache.beam.runners.gearpump.translators.TransformTranslator; import org.apache.beam.runners.gearpump.translators.TranslationContext; +import org.apache.beam.runners.gearpump.translators.WindowBoundTranslator; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.runners.TransformHierarchy; @@ -37,6 +38,7 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PValue; import org.apache.gearpump.util.Graph; @@ -71,6 +73,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default registerTransformTranslator(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator()); registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator()); + registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator()); registerTransformTranslator(Create.Values.class, new CreateValuesTranslator()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/85d54ab2/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java new file mode 100644 index 0000000..11f30fc --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators; + +import com.google.common.collect.Iterables; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; +import org.joda.time.Instant; + +/** + * {@link Window.Bound} is translated to Gearpump flatMap function. + */ +@SuppressWarnings("unchecked") +public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> { + + @Override + public void translate(Window.Bound<T> transform, TranslationContext context) { + PCollection<T> input = context.getInput(transform); + JavaStream<WindowedValue<T>> inputStream = context.getInputStream(input); + WindowingStrategy<?, ?> outputStrategy = + transform.getOutputStrategyInternal(input.getWindowingStrategy()); + WindowFn<T, BoundedWindow> windowFn = + (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn(); + JavaStream<WindowedValue<T>> outputStream = + inputStream.flatMap(new AssignWindows(windowFn), "assign_windows"); + context.setOutputStream(context.getOutput(transform), outputStream); + } + + + private static class AssignWindows<T> implements + FlatMapFunction<WindowedValue<T>, WindowedValue<T>> { + + private final WindowFn<T, BoundedWindow> fn; + + AssignWindows(WindowFn<T, BoundedWindow> fn) { + this.fn = fn; + } + + @Override + public Iterator<WindowedValue<T>> apply(final WindowedValue<T> value) { + List<WindowedValue<T>> ret = new LinkedList<>(); + try { + Collection<BoundedWindow> windows = fn.assignWindows(fn.new AssignContext() { + @Override + public T element() { + return value.getValue(); + } + + @Override + public Instant timestamp() { + return value.getTimestamp(); + } + + @Override + public BoundedWindow window() { + return Iterables.getOnlyElement(value.getWindows()); + } + }); + for (BoundedWindow window: windows) { + ret.add(WindowedValue.of( + value.getValue(), value.getTimestamp(), window, value.getPane())); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return ret.iterator(); + } + } +}