lukecwik commented on a change in pull request #11607: URL: https://github.com/apache/beam/pull/11607#discussion_r424830228
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -537,6 +538,12 @@ public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentEleme
     @NewWatermarkEstimator
     public WatermarkEstimators.Manual newWatermarkEstimator(
         @WatermarkEstimatorState Instant watermarkEstimatorState) {
+      // Making sure that the watermark is within bounds.
+      if (watermarkEstimatorState.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+        watermarkEstimatorState = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      } else if (watermarkEstimatorState.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+        watermarkEstimatorState = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      }

Review comment:
I was thinking that this logic would go where we call setWatermark above, on line 510.

Note that the initial watermark estimator state is the current element timestamp, which is always between the MIN and MAX timestamp values.
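To illustrate the suggestion, here is a minimal sketch of a clamp helper that could be applied to the value passed to setWatermark. It is not taken from the PR: the class name `WatermarkClamp`, the method `clamp`, and the constants `MIN` and `MAX` are hypothetical, and `java.time.Instant` stands in for the Joda-Time `Instant` that Beam uses so that the snippet compiles without Beam on the classpath. The bound values are an assumption about how `BoundedWindow.TIMESTAMP_MIN_VALUE` and `TIMESTAMP_MAX_VALUE` are defined.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration only; not part of Beam or of this PR. */
final class WatermarkClamp {
  // Stand-ins for BoundedWindow.TIMESTAMP_MIN_VALUE and TIMESTAMP_MAX_VALUE.
  // Assumption: Beam derives them from Long.MIN_VALUE / Long.MAX_VALUE microseconds.
  static final Instant MIN =
      Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
  static final Instant MAX =
      Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));

  private WatermarkClamp() {}

  /** Returns the watermark unchanged if it lies in [MIN, MAX], otherwise the nearest bound. */
  static Instant clamp(Instant watermark) {
    if (watermark.isBefore(MIN)) {
      return MIN;
    }
    if (watermark.isAfter(MAX)) {
      return MAX;
    }
    return watermark;
  }
}
```

With a helper along these lines, the call site would read roughly `estimator.setWatermark(WatermarkClamp.clamp(reportedWatermark))`, where `estimator` and `reportedWatermark` are placeholder names. newWatermarkEstimator could then stay unchanged, since its input is already within bounds.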