http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java new file mode 100644 index 0000000..10c8bbf --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.common.base.Preconditions; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.util.*; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import com.google.common.base.Throwables; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; +import org.joda.time.format.PeriodFormat; + +import java.util.Collection; + +/** + * An abstract class that encapsulates the common code of the the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} + * and {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} wrappers. See the {@link FlinkParDoBoundWrapper} and + * {@link FlinkParDoBoundMultiWrapper} for the actual wrappers of the aforementioned transformations. 
+ * */ +public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> { + + private final DoFn<IN, OUTDF> doFn; + private final WindowingStrategy<?, ?> windowingStrategy; + private transient PipelineOptions options; + + private DoFnProcessContext context; + + public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) { + Preconditions.checkNotNull(options); + Preconditions.checkNotNull(windowingStrategy); + Preconditions.checkNotNull(doFn); + + this.doFn = doFn; + this.options = options; + this.windowingStrategy = windowingStrategy; + } + + private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) { + if (this.context == null) { + this.context = new DoFnProcessContext(function, outCollector); + } + } + + @Override + public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception { + this.initContext(doFn, out); + + // for each window the element belongs to, create a new copy here. + Collection<? extends BoundedWindow> windows = value.getWindows(); + if (windows.size() <= 1) { + processElement(value); + } else { + for (BoundedWindow window : windows) { + processElement(WindowedValue.of( + value.getValue(), value.getTimestamp(), window, value.getPane())); + } + } + } + + private void processElement(WindowedValue<IN> value) throws Exception { + this.context.setElement(value); + this.doFn.startBundle(context); + doFn.processElement(context); + this.doFn.finishBundle(context); + } + + private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext { + + private final DoFn<IN, OUTDF> fn; + + protected final Collector<WindowedValue<OUTFL>> collector; + + private WindowedValue<IN> element; + + private DoFnProcessContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) { + function.super(); + super.setupDelegateAggregators(); + + this.fn = function; + this.collector = outCollector; + } + + public void setElement(WindowedValue<IN> value) { + this.element = value; + } + + @Override + public IN element() { + return this.element.getValue(); + } + + @Override + public Instant timestamp() { + return this.element.getTimestamp(); + } + + @Override + public BoundedWindow window() { + if (!(fn instanceof DoFn.RequiresWindowAccess)) { + throw new UnsupportedOperationException( + "window() is only available in the context of a DoFn marked as RequiresWindow."); + } + + Collection<? extends BoundedWindow> windows = this.element.getWindows(); + if (windows.size() != 1) { + throw new IllegalArgumentException("Each element is expected to belong to 1 window. 
" + + "This belongs to " + windows.size() + "."); + } + return windows.iterator().next(); + } + + @Override + public PaneInfo pane() { + return this.element.getPane(); + } + + @Override + public WindowingInternals<IN, OUTDF> windowingInternals() { + return windowingInternalsHelper(element, collector); + } + + @Override + public PipelineOptions getPipelineOptions() { + return options; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + throw new RuntimeException("sideInput() is not supported in Streaming mode."); + } + + @Override + public void output(OUTDF output) { + outputWithTimestamp(output, this.element.getTimestamp()); + } + + @Override + public void outputWithTimestamp(OUTDF output, Instant timestamp) { + outputWithTimestampHelper(element, output, timestamp, collector); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + sideOutputWithTimestamp(tag, output, this.element.getTimestamp()); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + sideOutputWithTimestampHelper(element, output, timestamp, collector, tag); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + Accumulator acc = getRuntimeContext().getAccumulator(name); + if (acc != null) { + AccumulatorHelper.compareAccumulatorTypes(name, + SerializableFnAggregatorWrapper.class, acc.getClass()); + return (Aggregator<AggInputT, AggOutputT>) acc; + } + + SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator = + new SerializableFnAggregatorWrapper<>(combiner); + getRuntimeContext().addAccumulator(name, accumulator); + return accumulator; + } + } + + protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) { + if (timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) { + throw new IllegalArgumentException(String.format( + "Cannot output with timestamp %s. Output timestamps must be no earlier than the " + + "timestamp of the current input (%s) minus the allowed skew (%s). See the " + + "DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.", + timestamp, ref.getTimestamp(), + PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod()))); + } + } + + protected <T> WindowedValue<T> makeWindowedValue( + T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + final Instant inputTimestamp = timestamp; + final WindowFn windowFn = windowingStrategy.getWindowFn(); + + if (timestamp == null) { + timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + if (windows == null) { + try { + windows = windowFn.assignWindows(windowFn.new AssignContext() { + @Override + public Object element() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input element when none was available"); + } + + @Override + public Instant timestamp() { + if (inputTimestamp == null) { + throw new UnsupportedOperationException( + "WindowFn attempted to access input timestamp when none was available"); + } + return inputTimestamp; + } + + @Override + public Collection<? 
extends BoundedWindow> windows() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input windows when none were available"); + } + }); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } + } + + return WindowedValue.of(output, timestamp, windows, pane); + } + + /////////// ABSTRACT METHODS TO BE IMPLEMENTED BY SUBCLASSES ///////////////// + + public abstract void outputWithTimestampHelper( + WindowedValue<IN> inElement, + OUTDF output, + Instant timestamp, + Collector<WindowedValue<OUTFL>> outCollector); + + public abstract <T> void sideOutputWithTimestampHelper( + WindowedValue<IN> inElement, + T output, + Instant timestamp, + Collector<WindowedValue<OUTFL>> outCollector, + TupleTag<T> tag); + + public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper( + WindowedValue<IN> inElement, + Collector<WindowedValue<OUTFL>> outCollector); + +} \ No newline at end of file
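A side note on the pattern above: the wrapper adapts a Beam DoFn to Flink's FlatMapFunction, exploding each WindowedValue into one copy per window and driving every element through a full startBundle/processElement/finishBundle cycle. The self-contained Java sketch below shows just that wrapping idea; SimpleDoFn and SimpleDoFnWrapper are illustrative stand-ins invented for this note and are not part of this commit or of the Dataflow SDK.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Illustrative stand-in for a Beam DoFn (not the SDK class).
interface SimpleDoFn<I, O> {
  void startBundle(Collector<O> out) throws Exception;
  void processElement(I element, Collector<O> out) throws Exception;
  void finishBundle(Collector<O> out) throws Exception;
}

// Mirrors FlinkAbstractParDoWrapper#flatMap: each Flink element is pushed through
// a complete (startBundle, processElement, finishBundle) cycle of the wrapped function.
class SimpleDoFnWrapper<I, O> implements FlatMapFunction<I, O> {

  private final SimpleDoFn<I, O> doFn;

  SimpleDoFnWrapper(SimpleDoFn<I, O> doFn) {
    this.doFn = doFn;
  }

  @Override
  public void flatMap(I value, Collector<O> out) throws Exception {
    doFn.startBundle(out);
    doFn.processElement(value, out);
    doFn.finishBundle(out);
  }
}

Opening and closing the bundle around every single element keeps the sketch faithful to the code above, although a production operator would typically open a bundle less often (see the TODO in FlinkGroupAlsoByWindowWrapper#processKeyedWorkItem further down).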
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java new file mode 100644 index 0000000..e115a15 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -0,0 +1,631 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.*; +import com.google.cloud.dataflow.sdk.coders.*; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.runners.PipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.*; +import com.google.cloud.dataflow.sdk.values.*; +import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.operators.*; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTaskState; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.*; + +/** + * This class is the key class implementing all the windowing/triggering logic of Apache Beam. 
+ * To provide full compatibility and support for all the windowing/triggering combinations offered by + * Beam, we opted for a strategy that uses the SDK's code for doing these operations. See the code in + * {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}. + * <p/> + * In a nutshell, when the execution arrives at this operator, we expect to have a stream <b>already + * grouped by key</b>. Each element that enters here registers a timer + * (see {@link TimerInternals#setTimer(TimerInternals.TimerData)}) in the + * {@link FlinkGroupAlsoByWindowWrapper#activeTimers}. + * This is essentially a timestamp indicating when to trigger the computation over the window this + * element belongs to. + * <p/> + * When a watermark arrives, all the registered timers are checked to see which ones are ready to + * fire (see {@link FlinkGroupAlsoByWindowWrapper#processWatermark(Watermark)}). These are deregistered from + * the {@link FlinkGroupAlsoByWindowWrapper#activeTimers} + * list, and are fed into the {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn} + * for further processing. + */ +public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> + extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>> + implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> { + + private static final long serialVersionUID = 1L; + + private transient PipelineOptions options; + + private transient CoderRegistry coderRegistry; + + private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator; + + private ProcessContext context; + + private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy; + + private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn; + + private final KvCoder<K, VIN> inputKvCoder; + + /** + * State is kept <b>per-key</b>. This data structure keeps the mapping between an active key, i.e. a + * key whose elements are currently waiting to be processed, and its associated state. + */ + private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(); + + /** + * Timers waiting to be processed. + */ + private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); + + private FlinkTimerInternals timerInternals = new FlinkTimerInternals(); + + /** + * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy. + * This method assumes that <b>elements are already grouped by key</b>. + * <p/> + * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)} + * is that this method assumes that a combiner function is provided + * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). + * A combiner helps increase the speed and, in most cases, reduce the per-window state. + * + * @param options the general job configuration options. + * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. + * @param groupedStreamByKey the input stream; it is assumed to already be grouped by key. + * @param combiner the combiner to be used. + * @param outputKvCoder the type of the output values.
+ */ + public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create( + PipelineOptions options, + PCollection input, + KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey, + Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner, + KvCoder<K, VOUT> outputKvCoder) { + Preconditions.checkNotNull(options); + + KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder(); + FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options, + input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner); + + Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of( + outputKvCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo = + new CoderTypeInformation<>(windowedOutputElemCoder); + + DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey + .transform("GroupByWindowWithCombiner", + new CoderTypeInformation<>(outputKvCoder), + windower) + .returns(outputTypeInfo); + + return groupedByKeyAndWindow; + } + + /** + * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy. + * This method assumes that <b>elements are already grouped by key</b>. + * <p/> + * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)} + * is that this method assumes no combiner function + * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). + * + * @param options the general job configuration options. + * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. + * @param groupedStreamByKey the input stream; it is assumed to already be grouped by key.
+ */ + public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable( + PipelineOptions options, + PCollection input, + KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) { + Preconditions.checkNotNull(options); + + KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder(); + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); + + FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options, + input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null); + + Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder); + KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder); + + Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of( + outputElemCoder, + input.getWindowingStrategy().getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo = + new CoderTypeInformation<>(windowedOutputElemCoder); + + DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey + .transform("GroupByWindow", + new CoderTypeInformation<>(windowedOutputElemCoder), + windower) + .returns(outputTypeInfo); + + return groupedByKeyAndWindow; + } + + public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper + createForTesting(PipelineOptions options, + CoderRegistry registry, + WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy, + KvCoder<K, VIN> inputCoder, + Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) { + Preconditions.checkNotNull(options); + + return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner); + } + + private FlinkGroupAlsoByWindowWrapper(PipelineOptions options, + CoderRegistry registry, + WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy, + KvCoder<K, VIN> inputCoder, + Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) { + Preconditions.checkNotNull(options); + + this.options = Preconditions.checkNotNull(options); + this.coderRegistry = Preconditions.checkNotNull(registry); + this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder(); + this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy(); + this.combineFn = combiner; + this.operator = createGroupAlsoByWindowOperator(); + this.chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + public void open() throws Exception { + super.open(); + this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals); + } + + /** + * Create the adequate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}, + * <b> if not already created</b>. + * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then + * a function with that combiner is created, so that elements are combined as they arrive. This is + * done for speed and (in most of the cases) for reduction of the per-window state. 
+ */ + private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() { + if (this.operator == null) { + if (this.combineFn == null) { + // Thus VOUT == Iterable<VIN> + Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder(); + + this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create( + (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder)); + } else { + Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder(); + + AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn + .withInputCoder(combineFn, coderRegistry, inputKvCoder); + + this.operator = GroupAlsoByWindowViaWindowSetDoFn.create( + (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn)); + } + } + return this.operator; + } + + private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception { + context.setElement(workItem, getStateInternalsForKey(workItem.key())); + + // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded. + operator.startBundle(context); + operator.processElement(context); + operator.finishBundle(context); + } + + @Override + public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception { + ArrayList<WindowedValue<VIN>> elements = new ArrayList<>(); + elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(), + element.getValue().getWindows(), element.getValue().getPane())); + processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements)); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + context.setCurrentInputWatermark(new Instant(mark.getTimestamp())); + + Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp()); + if (!timers.isEmpty()) { + for (K key : timers.keySet()) { + processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key))); + } + } + + /** + * This is to take into account the different semantics of the Watermark in Flink and + * in Dataflow. To understand the reasoning behind the Dataflow semantics and its + * watermark holding logic, see the documentation of + * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)} + * */ + long millis = Long.MAX_VALUE; + for (FlinkStateInternals state : perKeyStateInternals.values()) { + Instant watermarkHold = state.getWatermarkHold(); + if (watermarkHold != null && watermarkHold.getMillis() < millis) { + millis = watermarkHold.getMillis(); + } + } + + if (mark.getTimestamp() < millis) { + millis = mark.getTimestamp(); + } + + context.setCurrentOutputWatermark(new Instant(millis)); + + // Don't forget to re-emit the watermark for further operators down the line. + // This is critical for jobs with multiple aggregation steps. + // Imagine a job with a groupByKey() on key K1, followed by a map() that changes + // the key K1 to K2, and another groupByKey() on K2. In this case, if the watermark + // is not re-emitted, the second aggregation would never be triggered, and no result + // will be produced. 
+ output.emitWatermark(new Watermark(millis)); + } + + @Override + public void close() throws Exception { + super.close(); + } + + private void registerActiveTimer(K key, TimerInternals.TimerData timer) { + Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); + if (timersForKey == null) { + timersForKey = new HashSet<>(); + } + timersForKey.add(timer); + activeTimers.put(key, timersForKey); + } + + private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) { + Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); + if (timersForKey != null) { + timersForKey.remove(timer); + if (timersForKey.isEmpty()) { + activeTimers.remove(key); + } else { + activeTimers.put(key, timersForKey); + } + } + } + + /** + * Returns the list of timers that are ready to fire. These are the timers + * that are registered to be triggered at a time before the current watermark. + * We keep these timers in a Set, so that they are deduplicated, as the same + * timer can be registered multiple times. + */ + private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) { + + // we keep the timers to return in a different list and launch them later + // because we cannot prevent a trigger from registering another trigger, + // which would lead to concurrent modification exception. + Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create(); + + Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); + + Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator(); + while (timerIt.hasNext()) { + TimerInternals.TimerData timerData = timerIt.next(); + if (timerData.getTimestamp().isBefore(currentWatermark)) { + toFire.put(keyWithTimers.getKey(), timerData); + timerIt.remove(); + } + } + + if (keyWithTimers.getValue().isEmpty()) { + it.remove(); + } + } + return toFire; + } + + /** + * Gets the state associated with the specified key. + * + * @param key the key whose state we want. + * @return The {@link FlinkStateInternals} + * associated with that key. + */ + private FlinkStateInternals<K> getStateInternalsForKey(K key) { + FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key); + if (stateInternals == null) { + Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); + OutputTimeFn<? 
super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn(); + stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn); + perKeyStateInternals.put(key, stateInternals); + } + return stateInternals; + } + + private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> { + @Override + public void setTimer(TimerData timerKey) { + registerActiveTimer(context.element().key(), timerKey); + } + + @Override + public void deleteTimer(TimerData timerKey) { + unregisterActiveTimer(context.element().key(), timerKey); + } + } + + private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext { + + private final FlinkTimerInternals timerInternals; + + private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector; + + private FlinkStateInternals<K> stateInternals; + + private KeyedWorkItem<K, VIN> element; + + public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function, + TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector, + FlinkTimerInternals timerInternals) { + function.super(); + super.setupDelegateAggregators(); + + this.collector = Preconditions.checkNotNull(outCollector); + this.timerInternals = Preconditions.checkNotNull(timerInternals); + } + + public void setElement(KeyedWorkItem<K, VIN> element, + FlinkStateInternals<K> stateForKey) { + this.element = element; + this.stateInternals = stateForKey; + } + + public void setCurrentInputWatermark(Instant watermark) { + this.timerInternals.setCurrentInputWatermark(watermark); + } + + public void setCurrentOutputWatermark(Instant watermark) { + this.timerInternals.setCurrentOutputWatermark(watermark); + } + + @Override + public KeyedWorkItem<K, VIN> element() { + return this.element; + } + + @Override + public Instant timestamp() { + throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems."); + } + + @Override + public PipelineOptions getPipelineOptions() { + // TODO: PipelineOptions need to be available on the workers. + // Ideally they are captured as part of the pipeline. + // For now, construct empty options so that StateContexts.createFromComponents + // will yield a valid StateContext, which is needed to support the StateContext.window(). + if (options == null) { + options = new PipelineOptions() { + @Override + public <T extends PipelineOptions> T as(Class<T> kls) { + return null; + } + + @Override + public <T extends PipelineOptions> T cloneAs(Class<T> kls) { + return null; + } + + @Override + public Class<? extends PipelineRunner<?>> getRunner() { + return null; + } + + @Override + public void setRunner(Class<? 
extends PipelineRunner<?>> kls) { + + } + + @Override + public CheckEnabled getStableUniqueNames() { + return null; + } + + @Override + public void setStableUniqueNames(CheckEnabled enabled) { + } + }; + } + return options; + } + + @Override + public void output(KV<K, VOUT> output) { + throw new UnsupportedOperationException( + "output() is not available when processing KeyedWorkItems."); + } + + @Override + public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) { + throw new UnsupportedOperationException( + "outputWithTimestamp() is not available when processing KeyedWorkItems."); + } + + @Override + public PaneInfo pane() { + throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems."); + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException( + "window() is not available when processing KeyedWorkItems."); + } + + @Override + public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() { + return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() { + + @Override + public com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() { + return stateInternals; + } + + @Override + public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + // TODO: No need to represent timestamp twice. + collector.setAbsoluteTimestamp(timestamp.getMillis()); + collector.collect(WindowedValue.of(output, timestamp, windows, pane)); + + } + + @Override + public TimerInternals timerInternals() { + return timerInternals; + } + + @Override + public Collection<? extends BoundedWindow> windows() { + throw new UnsupportedOperationException("windows() is not available in Streaming mode."); + } + + @Override + public PaneInfo pane() { + throw new UnsupportedOperationException("pane() is not available in Streaming mode."); + } + + @Override + public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + throw new RuntimeException("writePCollectionViewData() not available in Streaming mode."); + } + + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + throw new RuntimeException("sideInput() is not available in Streaming mode."); + } + }; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + throw new RuntimeException("sideInput() is not supported in Streaming mode."); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + // ignore the side output, this can happen when a user does not register + // side outputs but then outputs using a freshly created TupleTag. 
+ throw new RuntimeException("sideOutput() is not available when grouping by window."); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + sideOutput(tag, output); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + Accumulator acc = getRuntimeContext().getAccumulator(name); + if (acc != null) { + AccumulatorHelper.compareAccumulatorTypes(name, + SerializableFnAggregatorWrapper.class, acc.getClass()); + return (Aggregator<AggInputT, AggOutputT>) acc; + } + + SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator = + new SerializableFnAggregatorWrapper<>(combiner); + getRuntimeContext().addAccumulator(name, accumulator); + return accumulator; + } + } + + ////////////// Checkpointing implementation //////////////// + + @Override + public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { + StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); + AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); + StateCheckpointWriter writer = StateCheckpointWriter.create(out); + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + + // checkpoint the timers + StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder); + + // checkpoint the state + StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder); + + // checkpoint the timerInternals + context.timerInternals.encodeTimerInternals(context, writer, + inputKvCoder, windowingStrategy.getWindowFn().windowCoder()); + + taskState.setOperatorState(out.closeAndGetHandle()); + return taskState; + } + + @Override + public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception { + super.restoreState(taskState, recoveryTimestamp); + + final ClassLoader userClassloader = getUserCodeClassloader(); + + Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + + @SuppressWarnings("unchecked") + StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState(); + DataInputView in = inputState.getState(userClassloader); + StateCheckpointReader reader = new StateCheckpointReader(in); + + // restore the timers + this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder); + + // restore the state + this.perKeyStateInternals = StateCheckpointUtils.decodeState( + reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader); + + // restore the timerInternals. 
+ this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java new file mode 100644 index 0000000..1a6a665 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; +import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.coders.VoidCoder; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.KV; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; + +/** + * This class groups the elements by key. It assumes that already the incoming stream + * is composed of <code>[Key,Value]</code> pairs. + * */ +public class FlinkGroupByKeyWrapper { + + /** + * Just an auxiliary interface to bypass the fact that java anonymous classes cannot implement + * multiple interfaces. + */ + private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> { + } + + public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) { + final Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder); + final boolean isKeyVoid = keyCoder instanceof VoidCoder; + + return inputDataStream.keyBy( + new KeySelectorWithQueryableResultType<K, V>() { + + @Override + public K getKey(WindowedValue<KV<K, V>> value) throws Exception { + return isKeyVoid ? 
(K) VoidCoderTypeSerializer.VoidValue.INSTANCE : + value.getValue().getKey(); + } + + @Override + public TypeInformation<K> getProducedType() { + return keyTypeInfo; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java new file mode 100644 index 0000000..df7f953 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingInternals; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import com.google.common.base.Preconditions; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +import java.util.Map; + +/** + * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} Beam transformation. 
+ * */ +public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> { + + private final TupleTag<?> mainTag; + private final Map<TupleTag<?>, Integer> outputLabels; + + public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) { + super(options, windowingStrategy, doFn); + this.mainTag = Preconditions.checkNotNull(mainTag); + this.outputLabels = Preconditions.checkNotNull(tagsToLabels); + } + + @Override + public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) { + checkTimestamp(inElement, timestamp); + Integer index = outputLabels.get(mainTag); + collector.collect(makeWindowedValue( + new RawUnionValue(index, output), + timestamp, + inElement.getWindows(), + inElement.getPane())); + } + + @Override + public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) { + checkTimestamp(inElement, timestamp); + Integer index = outputLabels.get(tag); + if (index != null) { + collector.collect(makeWindowedValue( + new RawUnionValue(index, output), + timestamp, + inElement.getWindows(), + inElement.getPane())); + } + } + + @Override + public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) { + throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " + + "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " + + "is not available in this class."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java new file mode 100644 index 0000000..2ed5620 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.*; +import com.google.cloud.dataflow.sdk.util.state.StateInternals; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.*; + +/** + * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} Beam transformation. + * */ +public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> { + + public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) { + super(options, windowingStrategy, doFn); + } + + @Override + public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) { + checkTimestamp(inElement, timestamp); + collector.collect(makeWindowedValue( + output, + timestamp, + inElement.getWindows(), + inElement.getPane())); + } + + @Override + public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) { + // this can happen when a user does not register side outputs but then outputs + // using a freshly created TupleTag; side outputs are not supported by ParDo.Bound(). + throw new RuntimeException("sideOutput() is not available in ParDo.Bound()."); + } + + @Override + public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) { + return new WindowingInternals<IN, OUT>() { + @Override + public StateInternals stateInternals() { + throw new NullPointerException("StateInternals are not available for ParDo.Bound()."); + } + + @Override + public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + collector.collect(makeWindowedValue(output, timestamp, windows, pane)); + } + + @Override + public TimerInternals timerInternals() { + throw new NullPointerException("TimerInternals are not available for ParDo.Bound()."); + } + + @Override + public Collection<?
extends BoundedWindow> windows() { + return inElement.getWindows(); + } + + @Override + public PaneInfo pane() { + return inElement.getPane(); + } + + @Override + public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode."); + } + + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + throw new RuntimeException("sideInput() not implemented."); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java new file mode 100644 index 0000000..f6c243f --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io; + +import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +import java.io.ByteArrayInputStream; +import java.util.List; + +/** + * This flat map function bootstraps from collection elements and turns them into WindowedValues + * (as required by the Flink runner). 
+ */ +public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN, WindowedValue<OUT>> { + + private final List<byte[]> elements; + private final Coder<OUT> coder; + + public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) { + this.elements = elements; + this.coder = coder; + } + + @Override + public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception { + + @SuppressWarnings("unchecked") + OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE; + for (byte[] element : elements) { + ByteArrayInputStream bai = new ByteArrayInputStream(element); + OUT outValue = coder.decode(bai, Coder.Context.OUTER); + + if (outValue == null) { + out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + } else { + out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + } + } + + out.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java new file mode 100644 index 0000000..2857efd --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io; + +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.common.base.Preconditions; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; + +import javax.annotation.Nullable; +import java.util.List; + +/** + * A wrapper translating Flink Sources implementing the {@link RichParallelSourceFunction} interface, into + * unbounded Beam sources (see {@link UnboundedSource}). 
+ * */ +public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> { + + private final PipelineOptions options; + private final RichParallelSourceFunction<T> flinkSource; + + public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) { + if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + options = Preconditions.checkNotNull(pipelineOptions); + flinkSource = Preconditions.checkNotNull(source); + validate(); + } + + public RichParallelSourceFunction<T> getFlinkSource() { + return this.flinkSource; + } + + @Override + public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + + @Override + public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + + @Nullable + @Override + public Coder<C> getCheckpointMarkCoder() { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + + + @Override + public void validate() { + Preconditions.checkNotNull(options); + Preconditions.checkNotNull(flinkSource); + if(!options.getRunner().equals(FlinkPipelineRunner.class)) { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } + } + + @Override + public Coder<T> getDefaultOutputCoder() { + throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java new file mode 100644 index 0000000..1389e9d --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * An example unbounded Beam source that reads input from a socket. This is used mainly for testing and debugging. + * */ +public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> { + + private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of(); + + private static final long serialVersionUID = 1L; + + private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500; + + private static final int CONNECTION_TIMEOUT_TIME = 0; + + private final String hostname; + private final int port; + private final char delimiter; + private final long maxNumRetries; + private final long delayBetweenRetries; + + public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) { + this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP); + } + + public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) { + this.hostname = hostname; + this.port = port; + this.delimiter = delimiter; + this.maxNumRetries = maxNumRetries; + this.delayBetweenRetries = delayBetweenRetries; + } + + public String getHostname() { + return this.hostname; + } + + public int getPort() { + return this.port; + } + + public char getDelimiter() { + return this.delimiter; + } + + public long getMaxNumRetries() { + return this.maxNumRetries; + } + + public long getDelayBetweenRetries() { + return this.delayBetweenRetries; + } + + @Override + public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { + return Collections.<UnboundedSource<String, C>>singletonList(this); + } + + @Override + public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) { + return new UnboundedSocketReader(this); + } + + @Nullable + @Override + public Coder getCheckpointMarkCoder() { + // Flink and Dataflow have different checkpointing mechanisms. + // In our case we do not need a coder. 
+ return null; + } + + @Override + public void validate() { + checkArgument(port > 0 && port < 65536, "port is out of range"); + checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)"); + checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive"); + } + + @Override + public Coder getDefaultOutputCoder() { + return DEFAULT_SOCKET_CODER; + } + + public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable { + + private static final long serialVersionUID = 7526472295622776147L; + private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class); + + private final UnboundedSocketSource source; + + private Socket socket; + private BufferedReader reader; + + private boolean isRunning; + + private String currentRecord; + + public UnboundedSocketReader(UnboundedSocketSource source) { + this.source = source; + } + + private void openConnection() throws IOException { + this.socket = new Socket(); + this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME); + this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); + this.isRunning = true; + } + + @Override + public boolean start() throws IOException { + int attempt = 0; + while (!isRunning) { + try { + openConnection(); + LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort()); + + return advance(); + } catch (IOException e) { + LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs..."); + + if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) { + try { + Thread.sleep(this.source.getDelayBetweenRetries()); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } else { + this.isRunning = false; + break; + } + } + } + LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort()); + return false; + } + + @Override + public boolean advance() throws IOException { + final StringBuilder buffer = new StringBuilder(); + int data; + while (isRunning && (data = reader.read()) != -1) { + // check if the string is complete + if (data != this.source.getDelimiter()) { + buffer.append((char) data); + } else { + if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') { + buffer.setLength(buffer.length() - 1); + } + this.currentRecord = buffer.toString(); + buffer.setLength(0); + return true; + } + } + return false; + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + return new byte[0]; + } + + @Override + public String getCurrent() throws NoSuchElementException { + return this.currentRecord; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return Instant.now(); + } + + @Override + public void close() throws IOException { + this.reader.close(); + this.socket.close(); + this.isRunning = false; + LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + "."); + } + + @Override + public Instant getWatermark() { + return Instant.now(); + } + + @Override + public CheckpointMark getCheckpointMark() { + return null; + } + + @Override + public UnboundedSource<String, ?> getCurrentSource() { + return this.source; + } + } +} 
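For orientation, the sketch below shows one way the socket source above could be consumed through the SDK's Read.from(...) transform. It is illustrative only and not part of this commit: the class name, host, port, delimiter and retry count are placeholders, and the runner wiring simply mirrors the FlinkPipelineRunner requirement enforced by the sources in this package.

import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class SocketSourceExample { // hypothetical example class, not in this commit
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // The streaming sources in this package are only usable with the Flink runner.
    options.setRunner(FlinkPipelineRunner.class);

    Pipeline p = Pipeline.create(options);

    // "localhost", 9999 and '\n' are placeholder values; maxNumRetries = 0 fails fast
    // if the server socket is unreachable.
    UnboundedSocketSource<UnboundedSource.CheckpointMark> source =
        new UnboundedSocketSource<>("localhost", 9999, '\n', 0);

    // getDefaultOutputCoder() is StringUtf8Coder, so the resulting collection carries Strings.
    PCollection<String> lines = p.apply(Read.from(source));

    p.run();
  }
}

Since getCheckpointMarkCoder() returns null, the source relies on Flink's own checkpointing rather than on Beam checkpoint marks, as the comment above notes.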
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java new file mode 100644 index 0000000..97084cf --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io; + +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.joda.time.Instant; + +/** + * A wrapper for Beam's unbounded sources. This class wraps the source of a {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} + * transform. + * + * <p> + * For now we support non-parallel, non-checkpointed sources.
+ * */ +public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable { + + private final String name; + private final UnboundedSource.UnboundedReader<T> reader; + + private StreamingRuntimeContext runtime = null; + private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null; + + private volatile boolean isRunning = false; + + public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) { + this.name = transform.getName(); + this.reader = transform.getSource().createReader(options, null); + } + + public String getName() { + return this.name; + } + + WindowedValue<T> makeWindowedValue(T output, Instant timestamp) { + if (timestamp == null) { + timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); + } + + @Override + public void run(SourceContext<WindowedValue<T>> ctx) throws Exception { + if (!(ctx instanceof StreamSource.ManualWatermarkContext)) { + throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " + + "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source."); + } + + context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx; + runtime = (StreamingRuntimeContext) getRuntimeContext(); + + this.isRunning = true; + boolean inputAvailable = reader.start(); + + setNextWatermarkTimer(this.runtime); + + while (isRunning) { + + while (!inputAvailable && isRunning) { + // wait a bit until we retry to pull more records + Thread.sleep(50); + inputAvailable = reader.advance(); + } + + if (inputAvailable) { + + // get it and its timestamp from the source + T item = reader.getCurrent(); + Instant timestamp = reader.getCurrentTimestamp(); + + // write it to the output collector + synchronized (ctx.getCheckpointLock()) { + context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis()); + } + + inputAvailable = reader.advance(); + } + + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void trigger(long timestamp) throws Exception { + if (this.isRunning) { + synchronized (context.getCheckpointLock()) { + long watermarkMillis = this.reader.getWatermark().getMillis(); + context.emitWatermark(new Watermark(watermarkMillis)); + } + setNextWatermarkTimer(this.runtime); + } + } + + private void setNextWatermarkTimer(StreamingRuntimeContext runtime) { + if (this.isRunning) { + long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval(); + long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval); + runtime.registerTimer(timeToNextWatermark, this); + } + } + + private long getTimeToNextWatermark(long watermarkInterval) { + return System.currentTimeMillis() + watermarkInterval; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java new file mode 100644 index 0000000..fc75948 --- /dev/null +++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.state; + +import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.TimerInternals; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.KV; +import org.joda.time.Instant; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.Serializable; + +/** + * An implementation of Beam's {@link TimerInternals}, that also provides serialization functionality. + * The latter is used when snapshots of the current state are taken, for fault-tolerance. + * */ +public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable { + private Instant currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; + private Instant currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; + + public void setCurrentInputWatermark(Instant watermark) { + checkIfValidInputWatermark(watermark); + this.currentInputWatermark = watermark; + } + + public void setCurrentOutputWatermark(Instant watermark) { + checkIfValidOutputWatermark(watermark); + this.currentOutputWatermark = watermark; + } + + private void setCurrentInputWatermarkAfterRecovery(Instant watermark) { + if (!currentInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) { + throw new RuntimeException("Explicitly setting the input watermark is only allowed on " + + "initialization after recovery from a node failure. Apparently this is not " + + "the case here as the watermark is already set."); + } + this.currentInputWatermark = watermark; + } + + private void setCurrentOutputWatermarkAfterRecovery(Instant watermark) { + if (!currentOutputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) { + throw new RuntimeException("Explicitly setting the output watermark is only allowed on " + + "initialization after recovery from a node failure. 
Apparently this is not " + + "the case here as the watermark is already set."); + } + this.currentOutputWatermark = watermark; + } + + @Override + public Instant currentProcessingTime() { + return Instant.now(); + } + + @Override + public Instant currentInputWatermarkTime() { + return currentInputWatermark; + } + + @Nullable + @Override + public Instant currentSynchronizedProcessingTime() { + // TODO + return null; + } + + @Override + public Instant currentOutputWatermarkTime() { + return currentOutputWatermark; + } + + private void checkIfValidInputWatermark(Instant newWatermark) { + if (currentInputWatermark.isAfter(newWatermark)) { + throw new IllegalArgumentException(String.format( + "Cannot set current input watermark to %s. Newer watermarks " + + "must be no earlier than the current one (%s).", + newWatermark, currentInputWatermark)); + } + } + + private void checkIfValidOutputWatermark(Instant newWatermark) { + if (currentOutputWatermark.isAfter(newWatermark)) { + throw new IllegalArgumentException(String.format( + "Cannot set current output watermark to %s. Newer watermarks " + + "must be no earlier than the current one (%s).", + newWatermark, currentOutputWatermark)); + } + } + + public void encodeTimerInternals(DoFn.ProcessContext context, + StateCheckpointWriter writer, + KvCoder<K, VIN> kvCoder, + Coder<? extends BoundedWindow> windowCoder) throws IOException { + if (context == null) { + throw new RuntimeException("The Context has not been initialized."); + } + + writer.setTimestamp(currentInputWatermark); + writer.setTimestamp(currentOutputWatermark); + } + + public void restoreTimerInternals(StateCheckpointReader reader, + KvCoder<K, VIN> kvCoder, + Coder<? extends BoundedWindow> windowCoder) throws IOException { + setCurrentInputWatermarkAfterRecovery(reader.getTimestamp()); + setCurrentOutputWatermarkAfterRecovery(reader.getTimestamp()); + } +}
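A note on the checkpoint layout implied above: encodeTimerInternals writes the input watermark first and the output watermark second, and restoreTimerInternals must read the two timestamps back in the same order onto a freshly created instance, since the recovery setters reject an instance whose watermarks have already advanced. The sketch below only restates that contract; it is illustrative and not part of the commit, and it assumes already-constructed StateCheckpointWriter/StateCheckpointReader instances, whose creation lives elsewhere in this change.

package org.apache.beam.runners.flink.translation.wrappers.streaming.state; // illustrative placement, same package as above

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;

import java.io.IOException;

/** Hypothetical helper that restates the snapshot/restore ordering contract. */
public class TimerInternalsCheckpointSketch {

  // Snapshot: the live timer internals write currentInputWatermark, then currentOutputWatermark.
  // The coder arguments are threaded through for symmetry but are not used for the watermarks themselves.
  public static <K, VIN> void snapshot(AbstractFlinkTimerInternals<K, VIN> timers,
                                       DoFn.ProcessContext context,
                                       StateCheckpointWriter writer,
                                       KvCoder<K, VIN> kvCoder,
                                       Coder<? extends BoundedWindow> windowCoder) throws IOException {
    timers.encodeTimerInternals(context, writer, kvCoder, windowCoder);
  }

  // Restore: must be called on a newly created AbstractFlinkTimerInternals whose watermarks are
  // still at TIMESTAMP_MIN_VALUE, and consumes the two timestamps in the order they were written.
  public static <K, VIN> void restore(AbstractFlinkTimerInternals<K, VIN> freshTimers,
                                      StateCheckpointReader reader,
                                      KvCoder<K, VIN> kvCoder,
                                      Coder<? extends BoundedWindow> windowCoder) throws IOException {
    freshTimers.restoreTimerInternals(reader, kvCoder, windowCoder);
  }
}

During normal processing the owning operator advances both watermarks via setCurrentInputWatermark/setCurrentOutputWatermark, and the validity checks above reject any attempt to move a watermark backwards.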