This is an automated email from the ASF dual-hosted git repository. robinyqiu pushed a commit to branch release-2.25.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.25.0 by this push:
     new b0ead6e  Merge pull request #13041 from lukecwik/beam10670.6
     new c7ae63a  Merge pull request #13044 from lukecwik/beam10670.6
b0ead6e is described below

commit b0ead6ea1e913fda04b8df93b65d40f651b7f9de
Author: Lukasz Cwik <lukec...@gmail.com>
AuthorDate: Wed Oct 7 13:49:34 2020 -0700

    Merge pull request #13041 from lukecwik/beam10670.6

    [BEAM-10670][BEAM-11028][BEAM-10997] Ensure that UnboundedSourceAsSDFWrapperFn returns stop() when the UnboundedSource gets to BoundedWindow.TIMESTAMP_MAX_VALUE. Also close readers when they are done.
---
 .../src/main/java/org/apache/beam/sdk/io/Read.java | 30 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 992e3dd..f8c7151 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -532,8 +532,7 @@ public class Read {
           tracker.currentRestriction();

       // Advance the watermark even if zero elements may have been output.
-      watermarkEstimator.setWatermark(
-          ensureTimestampWithinBounds(currentRestriction.getWatermark()));
+      watermarkEstimator.setWatermark(currentRestriction.getWatermark());

       // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial and is not
       // the initial restriction. The initial restriction would have been finalized as part of
@@ -562,7 +561,7 @@ public class Read {
       return currentElementTimestamp;
     }

-    private Instant ensureTimestampWithinBounds(Instant timestamp) {
+    private static Instant ensureTimestampWithinBounds(Instant timestamp) {
       if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
         timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
       } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
@@ -842,10 +841,23 @@ public class Read {
         if (currentReader == null) {
           return initialRestriction;
         }
+        Instant watermark = ensureTimestampWithinBounds(currentReader.getWatermark());
+        // We convert the reader to the empty reader to mark that we are done.
+        if (!(currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader)
+            && BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
+          CheckpointT checkpointT = (CheckpointT) currentReader.getCheckpointMark();
+          try {
+            currentReader.close();
+          } catch (IOException e) {
+            LOG.warn("Failed to close UnboundedReader.", e);
+          } finally {
+            currentReader = EmptyUnboundedSource.INSTANCE.createReader(null, checkpointT);
+          }
+        }
         return UnboundedSourceRestriction.create(
             (UnboundedSource<OutputT, CheckpointT>) currentReader.getCurrentSource(),
             (CheckpointT) currentReader.getCheckpointMark(),
-            currentReader.getWatermark());
+            watermark);
       }

       @Override
@@ -866,8 +878,14 @@ public class Read {
                 UnboundedSourceRestriction.create(
                     EmptyUnboundedSource.INSTANCE, null, BoundedWindow.TIMESTAMP_MAX_VALUE),
                 currentRestriction);
-        currentReader =
-            EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint());
+        try {
+          currentReader.close();
+        } catch (IOException e) {
+          LOG.warn("Failed to close UnboundedReader.", e);
+        } finally {
+          currentReader =
+              EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint());
+        }
         return result;
       }