This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c9d9828 [BEAM-2939] Ensure that we update the watermark even when no elements are processed. (#11735) c9d9828 is described below commit c9d9828cecc2c092443619c162de7fd89ad1b1d9 Author: Lukasz Cwik <lukec...@gmail.com> AuthorDate: Sat May 16 19:02:47 2020 -0700 [BEAM-2939] Ensure that we update the watermark even when no elements are processed. (#11735) --- .../src/main/java/org/apache/beam/sdk/io/Read.java | 182 +++++++++++++++------ 1 file changed, 130 insertions(+), 52 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index 3574eef..e02c938 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -21,13 +21,19 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import com.google.auto.value.AutoValue; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; @@ -49,7 +55,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.NameUtils; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -443,42 +448,43 @@ public class Read { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceAsSDFWrapperFn.class); private static final int DEFAULT_DESIRED_NUM_SPLITS = 20; private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10; - private final Coder<CheckpointT> restrictionCoder; + private final Coder<CheckpointT> checkpointCoder; - private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> restrictionCoder) { - this.restrictionCoder = restrictionCoder; + private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> checkpointCoder) { + this.checkpointCoder = checkpointCoder; } @GetInitialRestriction - public KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction( + public UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction( @Element UnboundedSource<OutputT, CheckpointT> element) { - return KV.of(element, null); + return UnboundedSourceRestriction.create(element, null, BoundedWindow.TIMESTAMP_MIN_VALUE); } @SplitRestriction public void splitRestriction( - @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction, - OutputReceiver<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> receiver, + @Restriction UnboundedSourceRestriction<OutputT, CheckpointT> restriction, + OutputReceiver<UnboundedSourceRestriction<OutputT, CheckpointT>> receiver, PipelineOptions pipelineOptions) throws Exception { // The empty unbounded source is trivially done and hence we don't need to output any splits // for it. - if (restriction.getKey() instanceof EmptyUnboundedSource) { + if (restriction.getSource() instanceof EmptyUnboundedSource) { return; } // The UnboundedSource API does not support splitting after a meaningful checkpoint mark has // been created. - if (restriction.getValue() != null - && !(restriction.getValue() + if (restriction.getCheckpoint() != null + && !(restriction.getCheckpoint() instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) { receiver.output(restriction); } try { for (UnboundedSource<OutputT, CheckpointT> split : - restriction.getKey().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) { - receiver.output(KV.of(split, null)); + restriction.getSource().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) { + receiver.output( + UnboundedSourceRestriction.create(split, null, restriction.getWatermark())); } } catch (Exception e) { receiver.output(restriction); @@ -487,51 +493,54 @@ public class Read { @NewTracker public RestrictionTracker< - KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]> + UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue<OutputT>[]> restrictionTracker( - @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction, + @Restriction UnboundedSourceRestriction<OutputT, CheckpointT> restriction, PipelineOptions pipelineOptions) { return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions); } @ProcessElement public ProcessContinuation processElement( - RestrictionTracker< - KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue[]> + RestrictionTracker<UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue[]> tracker, ManualWatermarkEstimator<Instant> watermarkEstimator, OutputReceiver<ValueWithRecordId<OutputT>> receiver, BundleFinalizer bundleFinalizer) throws IOException { - KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction = + UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction = tracker.currentRestriction(); UnboundedSourceValue<OutputT>[] out = new UnboundedSourceValue[1]; while (tracker.tryClaim(out)) { receiver.outputWithTimestamp( new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp()); - watermarkEstimator.setWatermark(ensureTimestampWithinBounds(out[0].getWatermark())); } + UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction = + tracker.currentRestriction(); + + // Advance the watermark even if zero elements may have been output. + watermarkEstimator.setWatermark( + ensureTimestampWithinBounds(currentRestriction.getWatermark())); + // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial and is not // the initial restriction. The initial restriction would have been finalized as part of // a prior bundle being executed. - KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction = - tracker.currentRestriction(); @SuppressWarnings("ReferenceEquality") boolean isInitialRestriction = initialRestriction == currentRestriction; - if (currentRestriction.getValue() != null + if (currentRestriction.getCheckpoint() != null && !isInitialRestriction - && !(tracker.currentRestriction().getValue() instanceof NoopCheckpointMark)) { + && !(tracker.currentRestriction().getCheckpoint() instanceof NoopCheckpointMark)) { bundleFinalizer.afterBundleCommit( Instant.now().plus(Duration.standardMinutes(DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS)), - currentRestriction.getValue()::finalizeCheckpoint); + currentRestriction.getCheckpoint()::finalizeCheckpoint); } // If we have been split/checkpoint by a runner, the tracker will have been updated to the // empty source and we will return stop. Otherwise the unbounded source has only temporarily // run out of work. - if (tracker.currentRestriction().getKey() instanceof EmptyUnboundedSource) { + if (currentRestriction.getSource() instanceof EmptyUnboundedSource) { return ProcessContinuation.stop(); } return ProcessContinuation.resume(); @@ -558,24 +567,23 @@ public class Read { } @GetRestrictionCoder - public Coder<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> restrictionCoder() { - return KvCoder.of( + public Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder() { + return new UnboundedSourceRestrictionCoder<>( SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointT>>() {}), - NullableCoder.of(restrictionCoder)); + NullableCoder.of(checkpointCoder)); } /** * A POJO representing all the values we need to pass between the {@link UnboundedReader} and * the {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement @ProcessElement} method of the - * splittable DoFn. + * splittable DoFn for each output element. */ @AutoValue abstract static class UnboundedSourceValue<T> { - public static <T> UnboundedSourceValue<T> create( - byte[] id, T value, Instant timestamp, Instant watermark) { + public static <T> UnboundedSourceValue<T> create(byte[] id, T value, Instant timestamp) { return new AutoValue_Read_UnboundedSourceAsSDFWrapperFn_UnboundedSourceValue<T>( - id, value, timestamp, watermark); + id, value, timestamp); } @SuppressWarnings("mutable") @@ -584,10 +592,78 @@ public class Read { public abstract T getValue(); public abstract Instant getTimestamp(); + } + + /** + * A POJO representing all the state we need to maintain between the {@link UnboundedReader} and + * future {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement @ProcessElement} calls. + */ + @AutoValue + abstract static class UnboundedSourceRestriction<OutputT, CheckpointT extends CheckpointMark> + implements Serializable { + public static <OutputT, CheckpointT extends CheckpointMark> + UnboundedSourceRestriction<OutputT, CheckpointT> create( + UnboundedSource<OutputT, CheckpointT> source, + CheckpointT checkpoint, + Instant watermark) { + return new AutoValue_Read_UnboundedSourceAsSDFWrapperFn_UnboundedSourceRestriction<>( + source, checkpoint, watermark); + } + + public abstract UnboundedSource<OutputT, CheckpointT> getSource(); + + @Nullable + public abstract CheckpointT getCheckpoint(); public abstract Instant getWatermark(); } + /** A {@link Coder} for {@link UnboundedSourceRestriction}s. */ + private static class UnboundedSourceRestrictionCoder< + OutputT, CheckpointT extends CheckpointMark> + extends StructuredCoder<UnboundedSourceRestriction<OutputT, CheckpointT>> { + + private final Coder<UnboundedSource<OutputT, CheckpointT>> sourceCoder; + private final Coder<CheckpointT> checkpointCoder; + + private UnboundedSourceRestrictionCoder( + Coder<UnboundedSource<OutputT, CheckpointT>> sourceCoder, + Coder<CheckpointT> checkpointCoder) { + this.sourceCoder = sourceCoder; + this.checkpointCoder = checkpointCoder; + } + + @Override + public void encode( + UnboundedSourceRestriction<OutputT, CheckpointT> value, OutputStream outStream) + throws CoderException, IOException { + sourceCoder.encode(value.getSource(), outStream); + checkpointCoder.encode(value.getCheckpoint(), outStream); + InstantCoder.of().encode(value.getWatermark(), outStream); + } + + @Override + public UnboundedSourceRestriction<OutputT, CheckpointT> decode(InputStream inStream) + throws CoderException, IOException { + return UnboundedSourceRestriction.create( + sourceCoder.decode(inStream), + checkpointCoder.decode(inStream), + InstantCoder.of().decode(inStream)); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Arrays.asList(sourceCoder, checkpointCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + verifyDeterministic(sourceCoder, "source coder not deterministic"); + verifyDeterministic(checkpointCoder, "checkpoint coder not deterministic"); + verifyDeterministic(InstantCoder.of(), "watermark coder not deterministic"); + } + } + /** * A marker implementation that is used to represent the primary "source" when performing a * split. The methods on this object are not meant to be called and only exist to fulfill the @@ -685,15 +761,15 @@ public class Read { private static class UnboundedSourceAsSDFRestrictionTracker< OutputT, CheckpointT extends CheckpointMark> extends RestrictionTracker< - KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]> + UnboundedSourceRestriction<OutputT, CheckpointT>, UnboundedSourceValue<OutputT>[]> implements HasProgress { - private final KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction; + private final UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction; private final PipelineOptions pipelineOptions; private UnboundedSource.UnboundedReader<OutputT> currentReader; private boolean readerHasBeenStarted; UnboundedSourceAsSDFRestrictionTracker( - KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction, + UnboundedSourceRestriction<OutputT, CheckpointT> initialRestriction, PipelineOptions pipelineOptions) { this.initialRestriction = initialRestriction; this.pipelineOptions = pipelineOptions; @@ -705,8 +781,8 @@ public class Read { if (currentReader == null) { currentReader = initialRestriction - .getKey() - .createReader(pipelineOptions, initialRestriction.getValue()); + .getSource() + .createReader(pipelineOptions, initialRestriction.getCheckpoint()); } if (!readerHasBeenStarted) { readerHasBeenStarted = true; @@ -720,8 +796,7 @@ public class Read { UnboundedSourceValue.create( currentReader.getCurrentRecordId(), currentReader.getCurrent(), - currentReader.getCurrentTimestamp(), - currentReader.getWatermark()); + currentReader.getCurrentTimestamp()); return true; } catch (IOException e) { if (currentReader != null) { @@ -748,29 +823,32 @@ public class Read { /** The value is invalid if {@link #tryClaim} has ever thrown an exception. */ @Override - public KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction() { + public UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction() { if (currentReader == null) { return initialRestriction; } - return KV.of( + return UnboundedSourceRestriction.create( (UnboundedSource<OutputT, CheckpointT>) currentReader.getCurrentSource(), - (CheckpointT) currentReader.getCheckpointMark()); + (CheckpointT) currentReader.getCheckpointMark(), + currentReader.getWatermark()); } @Override - public SplitResult<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> trySplit( + public SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> trySplit( double fractionOfRemainder) { // Don't split if we have claimed all since the SDF wrapper will be finishing soon. // Our split result sets the primary to have no checkpoint mark associated // with it since when we resume we don't have any state but we specifically pass // the checkpoint mark to the current reader so that when we finish the current bundle // we may register for finalization. - KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction = - currentRestriction(); - SplitResult<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> result = - SplitResult.of(KV.of(EmptyUnboundedSource.INSTANCE, null), currentRestriction); + UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction = currentRestriction(); + SplitResult<UnboundedSourceRestriction<OutputT, CheckpointT>> result = + SplitResult.of( + UnboundedSourceRestriction.create( + EmptyUnboundedSource.INSTANCE, null, currentRestriction.getWatermark()), + currentRestriction); currentReader = - EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getValue()); + EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint()); return result; } @@ -785,7 +863,7 @@ public class Read { @Override public Progress getProgress() { // We treat the empty source as implicitly done. - if (currentRestriction().getKey() instanceof EmptyUnboundedSource) { + if (currentRestriction().getSource() instanceof EmptyUnboundedSource) { return RestrictionTracker.Progress.from(1, 0); } @@ -793,8 +871,8 @@ public class Read { try { currentReader = initialRestriction - .getKey() - .createReader(pipelineOptions, initialRestriction.getValue()); + .getSource() + .createReader(pipelineOptions, initialRestriction.getCheckpoint()); } catch (IOException e) { throw new RuntimeException(e); }