Repository: beam Updated Branches: refs/heads/master 69522fe66 -> f5efca029
ApexRunner SDF support Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f5efca02 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f5efca02 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f5efca02 Branch: refs/heads/master Commit: f5efca0292eb9ace2d6b4895bc08e94344854336 Parents: 69522fe Author: Thomas Weise <t...@apache.org> Authored: Sat Apr 8 13:58:48 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Mon May 15 17:04:20 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/apex/ApexRunner.java | 34 +++++ .../translation/ApexPipelineTranslator.java | 23 ++++ .../apex/translation/ParDoTranslator.java | 68 +++++++++- .../operators/ApexGroupByKeyOperator.java | 4 +- .../operators/ApexParDoOperator.java | 124 ++++++++++++++++++- .../operators/ApexProcessFnOperator.java | 8 +- .../operators/ApexTimerInternals.java | 21 +++- .../translation/utils/ApexStateInternals.java | 4 + .../translation/utils/StateInternalsProxy.java | 7 +- .../operators/ApexTimerInternalsTest.java | 8 +- 10 files changed, 286 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 366308e..2fd0b22 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -31,6 +31,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; 
import java.util.concurrent.atomic.AtomicReference; import org.apache.apex.api.EmbeddedAppLauncher; @@ -38,9 +39,11 @@ import org.apache.apex.api.Launcher; import org.apache.apex.api.Launcher.AppHandle; import org.apache.apex.api.Launcher.LaunchMode; import org.apache.beam.runners.apex.translation.ApexPipelineTranslator; +import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.PrimitiveCreate; +import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; @@ -51,18 +54,23 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverride; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.AsIterable; import org.apache.beam.sdk.transforms.View.AsSingleton; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionViews; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; import org.apache.hadoop.conf.Configuration; /** @@ -112,6 +120,10 @@ 
public class ApexRunner extends PipelineRunner<ApexRunnerResult> { PTransformOverride.of( PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), new StreamingCombineGloballyAsSingletonView.Factory())) + .add( + PTransformOverride.of( + PTransformMatchers.splittableParDoMulti(), + new SplittableParDoOverrideFactory<>())) .build(); } @@ -424,4 +436,26 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } } + /** + * A {@link PTransformOverrideFactory} that overrides a + * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with + * {@link SplittableParDo}. + */ + static class SplittableParDoOverrideFactory<InputT, OutputT> implements PTransformOverrideFactory< + PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> { + @Override + public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform( + AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> + transform) { + return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), + new SplittableParDo<>(transform.getTransform())); + } + + @Override + public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, + PCollectionTuple newOutput) { + return ReplacementOutputs.tagged(outputs, newOutput); + } + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java index fdeefc7..b3a6d1c 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java +++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java @@ -23,7 +23,9 @@ import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView; +import org.apache.beam.runners.apex.translation.operators.ApexProcessFnOperator; import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator; +import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.core.construction.PrimitiveCreate; import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.sdk.Pipeline; @@ -35,6 +37,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; import org.slf4j.Logger; @@ -60,6 +63,10 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { static { // register TransformTranslators registerTransformTranslator(ParDo.MultiOutput.class, new ParDoTranslator<>()); + registerTransformTranslator(SplittableParDo.ProcessElements.class, + new ParDoTranslator.SplittableProcessElementsTranslator()); + registerTransformTranslator(SplittableParDo.GBKIntoKeyedWorkItems.class, + new GBKIntoKeyedWorkItemsTranslator()); registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator()); registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator()); registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator()); @@ -174,4 +181,20 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { LOG.debug("view {}", view.getName()); } } + + private static class 
GBKIntoKeyedWorkItemsTranslator<K, InputT> + implements TransformTranslator<SplittableParDo.GBKIntoKeyedWorkItems<K, InputT>> { + + @Override + public void translate( + SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform, TranslationContext context) { + // https://issues.apache.org/jira/browse/BEAM-1850 + ApexProcessFnOperator<KV<K, InputT>> operator = ApexProcessFnOperator.toKeyedWorkItems( + context.getPipelineOptions()); + context.addOperator(operator, operator.outputPort); + context.addStream(context.getInput(), operator.inputPort); + } + + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index 2e3d902..9133cb6 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -30,11 +30,13 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; +import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.sdk.values.PCollection; @@ -83,7 +85,7 @@ class 
ParDoTranslator<InputT, OutputT> } Map<TupleTag<?>, PValue> outputs = context.getOutputs(); - PCollection<InputT> input = (PCollection<InputT>) context.getInput(); + PCollection<InputT> input = context.getInput(); List<PCollectionView<?>> sideInputs = transform.getSideInputs(); Coder<InputT> inputCoder = input.getCoder(); WindowedValueCoder<InputT> wvInputCoder = @@ -130,6 +132,70 @@ class ParDoTranslator<InputT, OutputT> } } + static class SplittableProcessElementsTranslator<InputT, OutputT, + RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> + implements TransformTranslator<SplittableParDo.ProcessElements<InputT, OutputT, + RestrictionT, TrackerT>> { + + @Override + public void translate( + SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform, + TranslationContext context) { + + Map<TupleTag<?>, PValue> outputs = context.getOutputs(); + PCollection<InputT> input = context.getInput(); + List<PCollectionView<?>> sideInputs = transform.getSideInputs(); + Coder<InputT> inputCoder = input.getCoder(); + WindowedValueCoder<InputT> wvInputCoder = + FullWindowedValueCoder.of( + inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + DoFn<InputT, OutputT> doFn = (DoFn) transform.newProcessFn(transform.getFn()); + ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>( + context.getPipelineOptions(), + doFn, + transform.getMainOutputTag(), + transform.getAdditionalOutputTags().getAll(), + input.getWindowingStrategy(), + sideInputs, + wvInputCoder, + context.getStateBackend()); + + Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size()); + for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) { + checkArgument( + output.getValue() instanceof PCollection, + "%s %s outputs non-PCollection %s of type %s", + ParDo.MultiOutput.class.getSimpleName(), + context.getFullName(), + output.getValue(), + 
output.getValue().getClass().getSimpleName()); + PCollection<?> pc = (PCollection<?>) output.getValue(); + if (output.getKey().equals(transform.getMainOutputTag())) { + ports.put(pc, operator.output); + } else { + int portIndex = 0; + for (TupleTag<?> tag : transform.getAdditionalOutputTags().getAll()) { + if (tag.equals(output.getKey())) { + ports.put(pc, operator.additionalOutputPorts[portIndex]); + break; + } + portIndex++; + } + } + } + + context.addOperator(operator, ports); + context.addStream(context.getInput(), operator.input); + if (!sideInputs.isEmpty()) { + addSideInputs(operator.sideInput1, sideInputs, context); + } + + } + } + + static void addSideInputs( Operator.InputPort<?> sideInputPort, List<PCollectionView<?>> sideInputs, http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 85836ad..1d48e20 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -202,7 +202,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator, windowedValue.getTimestamp(), windowedValue.getWindows(), windowedValue.getPane()); - timerInternals.setContext(kv.getKey(), this.keyCoder, this.inputWatermark); + timerInternals.setContext(kv.getKey(), this.keyCoder, this.inputWatermark, null); ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(kv.getKey()); reduceFnRunner.processElements(Collections.singletonList(updatedWindowedValue)); @@ -211,7 +211,7 @@ 
public class ApexGroupByKeyOperator<K, V> implements Operator, @Override public void fireTimer(K key, Collection<TimerData> timerData) { - timerInternals.setContext(key, keyCoder, inputWatermark); + timerInternals.setContext(key, keyCoder, inputWatermark, null); ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key); try { reduceFnRunner.onTimers(timerData); http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 8c516b1..7fee0d5 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.apex.translation.operators; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; @@ -28,8 +31,10 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend; @@ -44,20 +49,33 @@ import 
org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; +import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateInternalsFactory; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.TimerInternalsFactory; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; @@ -65,6 +83,7 @@ import org.apache.beam.sdk.values.KV; import 
org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +91,8 @@ import org.slf4j.LoggerFactory; /** * Apex operator for Beam {@link DoFn}. */ -public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager { +public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager, + ApexTimerInternals.TimerProcessor<Object> { private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class); private boolean traceTuples = true; @@ -139,6 +159,14 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements TimerInternals.TimerDataCoder timerCoder = TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder()); this.currentKeyTimerInternals = new ApexTimerInternals<>(timerCoder); + + if (doFn instanceof SplittableParDo.ProcessFn) { + // we know that it is keyed on String + Coder<?> keyCoder = StringUtf8Coder.of(); + this.currentKeyStateInternals = new StateInternalsProxy<>( + stateBackend.newStateInternalsFactory(keyCoder)); + } + } @SuppressWarnings("unused") // for Kryo @@ -272,7 +300,10 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements keyCoder = kwiCoder.getKeyCoder(); } ((StateInternalsProxy) currentKeyStateInternals).setKey(key); - currentKeyTimerInternals.setContext(key, keyCoder, new Instant(this.currentInputWatermark)); + currentKeyTimerInternals.setContext(key, keyCoder, + new Instant(this.currentInputWatermark), + new Instant(this.currentOutputWatermark) + ); } Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner .processElementInReadyWindows(elem); @@ -286,9 +317,47 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements } } + @Override + public void fireTimer(Object 
key, Collection<TimerData> timerDataSet) { + pushbackDoFnRunner.startBundle(); + @SuppressWarnings("unchecked") + Coder<Object> keyCoder = (Coder) currentKeyStateInternals.getKeyCoder(); + ((StateInternalsProxy) currentKeyStateInternals).setKey(key); + currentKeyTimerInternals.setContext(key, keyCoder, new Instant(this.currentInputWatermark), + new Instant(this.currentOutputWatermark)); + for (TimerData timerData : timerDataSet) { + StateNamespace namespace = timerData.getNamespace(); + checkArgument(namespace instanceof WindowNamespace); + BoundedWindow window = ((WindowNamespace<?>) namespace).getWindow(); + pushbackDoFnRunner.onTimer(timerData.getTimerId(), window, + timerData.getTimestamp(), timerData.getDomain()); + } + pushbackDoFnRunner.finishBundle(); + } + private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) { this.currentInputWatermark = mark.getTimestamp(); - + long minEventTimeTimer = currentKeyTimerInternals.fireReadyTimers( + this.currentInputWatermark, + this, TimeDomain.EVENT_TIME); + + checkState(minEventTimeTimer >= currentInputWatermark, + "Event time timer processing generates new timer(s) behind watermark."); + //LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer, + // currentInputWatermark); + + // TODO: is this the right way to trigger processing time timers? 
+ // drain all timers below current watermark, including those that result from firing + long minProcessingTimeTimer = Long.MIN_VALUE; + while (minProcessingTimeTimer < currentInputWatermark) { + minProcessingTimeTimer = currentKeyTimerInternals.fireReadyTimers( + this.currentInputWatermark, + this, TimeDomain.PROCESSING_TIME); + if (minProcessingTimeTimer < currentInputWatermark) { + LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer, + currentInputWatermark); + } + } if (sideInputs.isEmpty()) { if (traceTuples) { LOG.debug("\nemitting watermark {}\n", mark); @@ -376,6 +445,52 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements pushbackDoFnRunner = SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); + if (doFn instanceof SplittableParDo.ProcessFn) { + + @SuppressWarnings("unchecked") + StateInternalsFactory<String> stateInternalsFactory = + (StateInternalsFactory<String>) this.currentKeyStateInternals.getFactory(); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + SplittableParDo.ProcessFn<InputT, OutputT, Object, RestrictionTracker<Object>> + splittableDoFn = (SplittableParDo.ProcessFn) doFn; + splittableDoFn.setStateInternalsFactory(stateInternalsFactory); + TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() { + @Override + public TimerInternals timerInternalsForKey(String key) { + return currentKeyTimerInternals; + } + }; + splittableDoFn.setTimerInternalsFactory(timerInternalsFactory); + splittableDoFn.setProcessElementInvoker( + new OutputAndTimeBoundedSplittableProcessElementInvoker<>( + doFn, + pipelineOptions.get(), + new OutputWindowedValue<OutputT>() { + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? 
extends BoundedWindow> windows, + PaneInfo pane) { + output( + mainOutputTag, + WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override + public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tag, + AdditionalOutputT output, Instant timestamp, + Collection<? extends BoundedWindow> windows, PaneInfo pane) { + output(tag, WindowedValue.of(output, timestamp, windows, pane)); + } + }, + sideInputReader, + Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), + 10000, + Duration.standardSeconds(10))); + } + } @Override @@ -390,6 +505,9 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements @Override public void endWindow() { + currentKeyTimerInternals.fireReadyTimers( + currentKeyTimerInternals.currentProcessingTime().getMillis(), + this, TimeDomain.PROCESSING_TIME); } private static class LongMin { http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java index 835c9e0..9f4e6c5 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java @@ -91,7 +91,13 @@ public class ApexProcessFnOperator<InputT> extends BaseOperator { /** * Convert {@link KV} into {@link KeyedWorkItem}s. 
*/ - public static class ToKeyedWorkItems<K, V> implements ApexOperatorFn<KV<K, V>> { + public static <K, V> ApexProcessFnOperator<KV<K, V>> toKeyedWorkItems( + ApexPipelineOptions options) { + ApexOperatorFn<KV<K, V>> fn = new ToKeyedWorkItems<>(); + return new ApexProcessFnOperator<KV<K, V>>(fn, options.isTupleTracingEnabled()); + } + + private static class ToKeyedWorkItems<K, V> implements ApexOperatorFn<KV<K, V>> { @Override public final void process(ApexStreamTuple<WindowedValue<KV<K, V>>> tuple, OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) { http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java index 1eb224c..0a0dd50 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java @@ -55,6 +55,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { private transient K currentKey; private transient Instant currentInputWatermark; + private transient Instant currentOutputWatermark; private transient Coder<K> keyCoder; public ApexTimerInternals(TimerDataCoder timerDataCoder) { @@ -62,10 +63,12 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { this.processingTimeTimers = new TimerSet(timerDataCoder); } - public void setContext(K key, Coder<K> keyCoder, Instant inputWatermark) { + public void setContext(K key, Coder<K> keyCoder, Instant inputWatermark, + Instant outputWatermark) { this.currentKey = key; this.keyCoder = keyCoder; 
this.currentInputWatermark = inputWatermark; + this.currentOutputWatermark = outputWatermark; } @VisibleForTesting @@ -118,7 +121,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { @Override public Instant currentOutputWatermarkTime() { - return null; + return currentOutputWatermark; } public interface TimerProcessor<K> { @@ -128,11 +131,19 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { /** * Fire the timers that are ready. These are the timers * that are registered to be triggered at a time before the current time. + * Timer processing may register new timers, which can cause the returned + * timestamp to be before the current time. The caller may repeat + * the call until such backdated timers are cleared. + * @return minimum timestamp of registered timers. */ - public void fireReadyTimers(long currentTime, + public long fireReadyTimers(long currentTime, TimerProcessor<K> timerProcessor, TimeDomain timeDomain) { TimerSet timers = getTimerSet(timeDomain); + // move minTimestamp first, + // timer additions that result from firing may modify it + timers.minTimestamp = currentTime; + // we keep the timers to return in a different list and launch them later // because we cannot prevent a trigger from registering another timer, // which would lead to concurrent modification exception. 
@@ -173,6 +184,8 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { } } } + + return timers.minTimestamp; } private Slice getKeyBytes(K key) { @@ -186,6 +199,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { protected static class TimerSet implements Serializable { private final Map<Slice, Set<Slice>> activeTimers = new HashMap<>(); private final TimerDataCoder timerDataCoder; + private long minTimestamp = Long.MAX_VALUE; protected TimerSet(TimerDataCoder timerDataCoder) { this.timerDataCoder = timerDataCoder; @@ -205,6 +219,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable { } activeTimers.put(keyBytes, timersForKey); + this.minTimestamp = Math.min(minTimestamp, timer.getTimestamp().getMillis()); } public void deleteTimer(Slice keyBytes, StateNamespace namespace, String timerId) { http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java index eeea6d1..18ea8e4 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java @@ -438,6 +438,10 @@ public class ApexStateInternals<K> implements StateInternals { this.keyCoder = keyCoder; } + public Coder<K> getKeyCoder() { + return this.keyCoder; + } + @Override public ApexStateInternals<K> stateInternalsForKey(K key) { final Slice keyBytes; http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java 
---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java index ccf7e43..b652c68 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java @@ -24,6 +24,7 @@ import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTag; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateContext; @@ -34,7 +35,7 @@ import org.apache.beam.sdk.state.StateContext; @DefaultSerializer(JavaSerializer.class) public class StateInternalsProxy<K> implements StateInternals, Serializable { - private final StateInternalsFactory<K> factory; + private final ApexStateInternals.ApexStateInternalsFactory<K> factory; private transient K currentKey; public StateInternalsProxy(ApexStateInternals.ApexStateInternalsFactory<K> factory) { @@ -45,6 +46,10 @@ public class StateInternalsProxy<K> implements StateInternals, Serializable { return this.factory; } + public Coder<K> getKeyCoder() { + return factory.getKeyCoder(); + } + public void setKey(K key) { currentKey = key; } http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java index 7b52223..ba1c801 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java @@ -60,7 +60,7 @@ public class ApexTimerInternalsTest { Instant instant2 = new Instant(2); ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder); - timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now()); + timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now(), null); TimerData timerData0 = TimerData.of("timerData0", StateNamespaces.global(), instant0, TimeDomain.EVENT_TIME); @@ -98,7 +98,7 @@ public class ApexTimerInternalsTest { Instant instant1 = new Instant(1); ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder); - timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now()); + timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now(), null); TimerData timerData0 = TimerData.of("timerData0", StateNamespaces.global(), instant0, TimeDomain.EVENT_TIME); @@ -133,11 +133,11 @@ public class ApexTimerInternalsTest { new Instant(0), TimeDomain.EVENT_TIME); String key = "key"; ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder); - timerInternals.setContext(key, StringUtf8Coder.of(), Instant.now()); + timerInternals.setContext(key, StringUtf8Coder.of(), Instant.now(), null); timerInternals.setTimer(timerData); ApexTimerInternals<String> cloned; assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(timerInternals)); - cloned.setContext(key, StringUtf8Coder.of(), Instant.now()); + cloned.setContext(key, StringUtf8Coder.of(), Instant.now(), null); Map<?, Set<Slice>> timers = cloned.getTimerSet(TimeDomain.EVENT_TIME).getMap(); assertEquals(1, timers.size()); }