tianz101 commented on code in PR #37326:
URL: https://github.com/apache/beam/pull/37326#discussion_r2710353214
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -179,10 +178,9 @@ public ProcessContinuation run(
final String token = partition.getPartitionToken();
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
final Timestamp endTimestamp = partition.getEndTimestamp();
+ final boolean readToEndTimestamp = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
final Timestamp changeStreamQueryEndTimestamp =
- endTimestamp.equals(MAX_INCLUSIVE_END_AT)
- ? getNextReadChangeStreamEndTimestamp()
- : endTimestamp;
+ readToEndTimestamp ? endTimestamp : getNextReadChangeStreamEndTimestamp();
Review Comment:
There are two possibilities here: query up to the bounded partition end
timestamp, or query up to now + 2m. We need to remember which timestamp the
query used so that tryClaim is called with the right one.
As I mentioned in the earlier comment, the bounded end timestamp can be used
for V1 but not for V2.
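
A minimal sketch of remembering which end timestamp the query used, so the same value can be claimed afterwards. It uses `java.time.Instant` as a stand-in for the Spanner `Timestamp`. `QueryEndChoice`, `choose`, and `timestampToClaim` are hypothetical names, not part of Beam, and the 2-minute window is taken from the "now + 2m" wording above rather than from the actual implementation of `getNextReadChangeStreamEndTimestamp()`. The V1/V2 distinction is not modeled here.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical holder for the end timestamp chosen for one change stream query. */
final class QueryEndChoice {
  // Assumption: stand-in for MAX_INCLUSIVE_END_AT, the sentinel for an unbounded partition.
  static final Instant MAX_INCLUSIVE_END_AT = Instant.MAX;

  // Assumption: window length implied by "now + 2m" in the comment.
  static final Duration QUERY_WINDOW = Duration.ofMinutes(2);

  final Instant queryEndTimestamp;
  final boolean readToEndTimestamp;

  private QueryEndChoice(Instant queryEndTimestamp, boolean readToEndTimestamp) {
    this.queryEndTimestamp = queryEndTimestamp;
    this.readToEndTimestamp = readToEndTimestamp;
  }

  /** Picks the query end timestamp and records which of the two cases applied. */
  static QueryEndChoice choose(Instant partitionEnd, Instant now) {
    boolean bounded = !partitionEnd.equals(MAX_INCLUSIVE_END_AT);
    Instant end = bounded ? partitionEnd : now.plus(QUERY_WINDOW);
    return new QueryEndChoice(end, bounded);
  }

  /** The timestamp to claim once the query finishes: the one the query actually used. */
  Instant timestampToClaim() {
    return queryEndTimestamp;
  }
}
```

Keeping the chosen timestamp and the flag together means the later claim cannot accidentally fall back to the partition's end timestamp when the query only ran up to the shorter window.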