chamikaramj commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r425283694



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
##########
@@ -44,4 +45,19 @@
    * <p>The state returned must not be mutated.
    */
   WatermarkEstimatorStateT getState();
+
+  /**
+   * Validates that a given watermark is within timestamp min and max bounds.
+   *
+   * @param watermark watermark to validate
+   */
+  static void ensureWatermarkWithinBounds(Instant watermark) {

Review comment:
       Done.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -537,6 +538,12 @@ public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentEleme
     @NewWatermarkEstimator
     public WatermarkEstimators.Manual newWatermarkEstimator(
         @WatermarkEstimatorState Instant watermarkEstimatorState) {
+      // Making sure that the watermark is within bounds.
+      if (watermarkEstimatorState.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+        watermarkEstimatorState = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      } else if (watermarkEstimatorState.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+        watermarkEstimatorState = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      }

Review comment:
       Added to both.
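
For readers following the two hunks above, here is a minimal sketch of the difference between clamping a watermark into range (as the Read.java hunk does) and validating it (as the Javadoc for `ensureWatermarkWithinBounds` describes). It is not the code in this PR: the class name `WatermarkBounds`, the `MIN`/`MAX` constants, and the choice to throw `IllegalArgumentException` are assumptions made for illustration, and `java.time.Instant` stands in for the `Instant` type used in the diff so the example has no external dependencies.

```java
import java.time.Instant;

public class WatermarkBounds {
  // Hypothetical stand-ins for BoundedWindow.TIMESTAMP_MIN_VALUE and
  // BoundedWindow.TIMESTAMP_MAX_VALUE; the values are arbitrary.
  static final Instant MIN = Instant.ofEpochMilli(-1_000_000L);
  static final Instant MAX = Instant.ofEpochMilli(1_000_000L);

  /** Clamps an out-of-range watermark to the nearest bound, mirroring the Read.java hunk. */
  static Instant clamp(Instant watermark) {
    if (watermark.isBefore(MIN)) {
      return MIN;
    } else if (watermark.isAfter(MAX)) {
      return MAX;
    }
    return watermark;
  }

  /**
   * Rejects an out-of-range watermark instead of adjusting it.
   * Assumption: the real method body is not shown in the diff, so throwing
   * IllegalArgumentException here is only one plausible behavior.
   */
  static void ensureWithinBounds(Instant watermark) {
    if (watermark.isBefore(MIN) || watermark.isAfter(MAX)) {
      throw new IllegalArgumentException(
          "Watermark " + watermark + " must be within [" + MIN + ", " + MAX + "]");
    }
  }

  public static void main(String[] args) {
    // An out-of-range state is pulled back to the upper bound.
    Instant clamped = clamp(Instant.ofEpochMilli(2_000_000L));
    // Validation passes because the clamped value is now in range.
    ensureWithinBounds(clamped);
    System.out.println(clamped.equals(MAX));
  }
}
```

Clamping first and validating second means a restored estimator state can never trip the bounds check, which appears to be the intent of adding the clamp before constructing the estimator.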





