lukecwik commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r424830228



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -537,6 +538,12 @@ public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentEleme
     @NewWatermarkEstimator
     public WatermarkEstimators.Manual newWatermarkEstimator(
         @WatermarkEstimatorState Instant watermarkEstimatorState) {
+      // Making sure that the watermark is within bounds.
+      if (watermarkEstimatorState.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+        watermarkEstimatorState = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      } else if (watermarkEstimatorState.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+        watermarkEstimatorState = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      }

Review comment:
   I was thinking that this logic would go where we call setWatermark, above on line 510.

   Note that the initial watermark estimator state is the current element's timestamp, which is always between the MIN and MAX timestamp values, so no bounds check is needed here.
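
   To illustrate the suggestion, here is a minimal sketch of the clamp as a standalone helper that could be applied to a value right before it is handed to setWatermark. It is not the code in Read.java: the class name WatermarkClamp, the clamp method, and the MIN/MAX constants are hypothetical stand-ins for BoundedWindow.TIMESTAMP_MIN_VALUE and BoundedWindow.TIMESTAMP_MAX_VALUE, and it uses java.time.Instant rather than the Joda Instant that Beam uses, so that it compiles without Beam on the classpath.

```java
import java.time.Instant;

// Hypothetical helper, for illustration only; not part of the Beam SDK.
final class WatermarkClamp {
  // Assumed stand-ins for BoundedWindow.TIMESTAMP_MIN_VALUE / TIMESTAMP_MAX_VALUE.
  // The concrete values are chosen for this sketch only.
  static final Instant MIN = Instant.ofEpochMilli(Long.MIN_VALUE / 1000);
  static final Instant MAX = Instant.ofEpochMilli(Long.MAX_VALUE / 1000);

  private WatermarkClamp() {}

  // Returns the watermark unchanged if it lies within [MIN, MAX],
  // otherwise returns the nearest bound.
  static Instant clamp(Instant watermark) {
    if (watermark.isBefore(MIN)) {
      return MIN;
    }
    if (watermark.isAfter(MAX)) {
      return MAX;
    }
    return watermark;
  }
}
```

   With a helper like this, the call site would pass the clamped value to the estimator (for example, something like setWatermark(clamp(reportedWatermark))), and newWatermarkEstimator would not need its own check.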



