This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch release-2.25.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.25.0 by this push:
     new b0ead6e  Merge pull request #13041 from lukecwik/beam10670.6
     new c7ae63a  Merge pull request #13044 from lukecwik/beam10670.6
b0ead6e is described below

commit b0ead6ea1e913fda04b8df93b65d40f651b7f9de
Author: Lukasz Cwik <lukec...@gmail.com>
AuthorDate: Wed Oct 7 13:49:34 2020 -0700

    Merge pull request #13041 from lukecwik/beam10670.6
    
    [BEAM-10670][BEAM-11028][BEAM-10997] Ensure that
    UnboundedSourceAsSDFWrapperFn returns stop() when the UnboundedSource gets to
    BoundedWindow.TIMESTAMP_MAX_VALUE. Also close readers when they are done.
---
 .../src/main/java/org/apache/beam/sdk/io/Read.java | 30 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 992e3dd..f8c7151 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -532,8 +532,7 @@ public class Read {
           tracker.currentRestriction();
 
       // Advance the watermark even if zero elements may have been output.
-      watermarkEstimator.setWatermark(
-          ensureTimestampWithinBounds(currentRestriction.getWatermark()));
+      watermarkEstimator.setWatermark(currentRestriction.getWatermark());
 
       // Add the checkpoint mark to be finalized if the checkpoint mark isn't 
trivial and is not
       // the initial restriction. The initial restriction would have been 
finalized as part of
@@ -562,7 +561,7 @@ public class Read {
       return currentElementTimestamp;
     }
 
-    private Instant ensureTimestampWithinBounds(Instant timestamp) {
+    private static Instant ensureTimestampWithinBounds(Instant timestamp) {
       if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
         timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
       } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
@@ -842,10 +841,23 @@ public class Read {
         if (currentReader == null) {
           return initialRestriction;
         }
+        Instant watermark = 
ensureTimestampWithinBounds(currentReader.getWatermark());
+        // We convert the reader to the empty reader to mark that we are done.
+        if (!(currentReader instanceof 
EmptyUnboundedSource.EmptyUnboundedReader)
+            && BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
+          CheckpointT checkpointT = (CheckpointT) 
currentReader.getCheckpointMark();
+          try {
+            currentReader.close();
+          } catch (IOException e) {
+            LOG.warn("Failed to close UnboundedReader.", e);
+          } finally {
+            currentReader = EmptyUnboundedSource.INSTANCE.createReader(null, 
checkpointT);
+          }
+        }
         return UnboundedSourceRestriction.create(
             (UnboundedSource<OutputT, CheckpointT>) 
currentReader.getCurrentSource(),
             (CheckpointT) currentReader.getCheckpointMark(),
-            currentReader.getWatermark());
+            watermark);
       }
 
       @Override
@@ -866,8 +878,14 @@ public class Read {
                 UnboundedSourceRestriction.create(
                     EmptyUnboundedSource.INSTANCE, null, 
BoundedWindow.TIMESTAMP_MAX_VALUE),
                 currentRestriction);
-        currentReader =
-            EmptyUnboundedSource.INSTANCE.createReader(null, 
currentRestriction.getCheckpoint());
+        try {
+          currentReader.close();
+        } catch (IOException e) {
+          LOG.warn("Failed to close UnboundedReader.", e);
+        } finally {
+          currentReader =
+              EmptyUnboundedSource.INSTANCE.createReader(null, 
currentRestriction.getCheckpoint());
+        }
         return result;
       }
 

Reply via email to