scwhittle commented on code in PR #37580:
URL: https://github.com/apache/beam/pull/37580#discussion_r2822299907


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java:
##########
@@ -49,6 +49,15 @@ public class ChangeStreamsConstants {
    */
   public static final Timestamp DEFAULT_INCLUSIVE_END_AT = MAX_INCLUSIVE_END_AT;
 
+  public static final Duration DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL = Duration.standardMinutes(2);
+
+  public static final int DEFAULT_HEARTBEAT_MILLIS = 2000;
+
+  public static final Duration DEFAULT_LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL =
+      Duration.standardSeconds(1);
+
+  public static final int DEFAULT_LOW_LATENCY_HEARTBEAT_MILLIS = 100;

Review Comment:
   Remove `DEFAULT_LOW_LATENCY_HEARTBEAT_MILLIS` for now if we're not aborting on receiving a heartbeat.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java:
##########
@@ -119,7 +120,8 @@ public void setUp() throws Exception {
             partitionEndRecordAction,
             partitionEventRecordAction,
             metrics,
-            false);
+            false,
+            org.joda.time.Duration.standardMinutes(2));

Review Comment:
   Just use `Duration` here instead of the fully qualified `org.joda.time.Duration`.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -387,12 +391,15 @@ private boolean isTimestampOutOfRange(SpannerException e) {
         && e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
   }
 
-  // Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
-  // users want to run the connector forever. If the end timestamp is reached, we will resume
-  // processing from that timestamp on a subsequent DoFn execution.
+  // Return (now + config duration) as the end timestamp for reading change streams. This is only
+  // used if  users want to run the connector forever. If the end timestamp is reached, we
+  // will resume processing from that timestamp on a subsequent DoFn execution.
   private Timestamp getNextReadChangeStreamEndTimestamp() {
     final Timestamp current = Timestamp.now();
-    return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
+    long seconds = current.getSeconds() + realTimeCheckpointInterval.getStandardSeconds();
+    int nanos =
+        current.getNanos() + (int) ((realTimeCheckpointInterval.getMillis() % 1000) * 1_000_000);

Review Comment:
   I think this could fail validation if the sum ends up being a full second or more worth of nanos (the nanos field must stay below 1,000,000,000). We don't need nanosecond precision here, so how about doing this instead?
   
   
`Timestamp.ofTimeMicroseconds(Instant.now().plus(realTimeCheckpointInterval).getMillis()*1000);`
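
To make the concern concrete, here is a minimal, self-contained sketch of the arithmetic. It uses `java.time` as a stand-in for the Joda-Time and `com.google.cloud.Timestamp` types in the PR, and the class and method names (`CheckpointEndTimestamp`, `naiveNanos`, `isValidNanos`, `endMicros`) are hypothetical, made up for illustration. With the 2-minute and 1-second defaults the sub-second part is zero, so the overflow should only show up if someone configures an interval such as 1500 ms.

```java
import java.time.Duration;
import java.time.Instant;

public class CheckpointEndTimestamp {
  static final int NANOS_PER_SECOND = 1_000_000_000;

  // Mirrors the arithmetic in the diff: the sub-second part of the interval
  // is added to the current nanos without carrying into the seconds field.
  static int naiveNanos(int currentNanos, Duration interval) {
    return currentNanos + (int) ((interval.toMillis() % 1000) * 1_000_000);
  }

  // Assumption: a valid nanos-of-second value lies in [0, 999_999_999].
  static boolean isValidNanos(int nanos) {
    return nanos >= 0 && nanos < NANOS_PER_SECOND;
  }

  // Millisecond-precision alternative in the spirit of the suggestion:
  // add the interval to an instant, then convert epoch millis to micros.
  static long endMicros(Instant now, Duration interval) {
    return now.plus(interval).toEpochMilli() * 1000;
  }

  public static void main(String[] args) {
    Duration interval = Duration.ofMillis(1500);
    int currentNanos = 700_000_000;

    int nanos = naiveNanos(currentNanos, interval);
    // 700_000_000 + 500_000_000 = 1_200_000_000, which is out of range.
    System.out.println(nanos + " valid=" + isValidNanos(nanos));

    Instant now = Instant.ofEpochSecond(100, currentNanos);
    // 100.7 s + 1.5 s = 102.2 s, i.e. 102_200_000 microseconds.
    System.out.println(endMicros(now, interval));
  }
}
```

Going through a single instant-plus-duration addition lets the library handle the carry, so there is no separate nanos field to keep in range.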


