lukecwik commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r424830228
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -537,6 +538,12 @@ public Instant
getInitialWatermarkEstimatorState(@Timestamp Instant currentEleme
@NewWatermarkEstimator
public WatermarkEstimators.Manual newWatermarkEstimator(
@WatermarkEstimatorState Instant watermarkEstimatorState) {
+ // Making sure that the watermark is within bounds.
+ if (watermarkEstimatorState.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE))
{
+ watermarkEstimatorState = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ } else if
(watermarkEstimatorState.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+ watermarkEstimatorState = BoundedWindow.TIMESTAMP_MAX_VALUE;
+ }
Review comment:
I was thinking that this logic would be where we call setWatermark above
on line 510
Note that the initial watermark estimate state is current element timestamp
which is always between MIN and MAX timestamp values.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]