chenxuesdu commented on code in PR #37326:
URL: https://github.com/apache/beam/pull/37326#discussion_r2709725489
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -179,10 +178,9 @@ public ProcessContinuation run(
final String token = partition.getPartitionToken();
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
final Timestamp endTimestamp = partition.getEndTimestamp();
+ final boolean readToEndTimestamp = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
final Timestamp changeStreamQueryEndTimestamp =
Review Comment:
Since changeStreamQueryEndTimestamp is only used on line 202, can we move its
declaration to right before line 202? That would avoid cases where
changeStreamQueryEndTimestamp ends up in the past when we meant to set it in
the future.
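A minimal sketch of the concern, not the PR's code. It assumes the query end
timestamp is derived from the current time, which the truncated diff does not
confirm. The class name, the array-backed clock, and the 2-minute look-ahead
are all hypothetical, and java.time.Instant stands in for the Spanner
Timestamp type.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

public class EndTimestampSketch {
  // Hypothetical look-ahead window; the real value is not shown in the diff.
  public static final Duration LOOK_AHEAD = Duration.ofMinutes(2);

  /** Computes a query end timestamp relative to the supplied clock reading. */
  public static Instant queryEnd(Supplier<Instant> clock) {
    return clock.get().plus(LOOK_AHEAD);
  }

  /** Returns true if the end timestamp is already behind the clock. */
  public static boolean isInPast(Instant end, Supplier<Instant> clock) {
    return end.isBefore(clock.get());
  }

  public static void main(String[] args) {
    // A one-element array acts as a mutable clock for the simulation.
    Instant[] now = {Instant.parse("2026-01-01T00:00:00Z")};
    Supplier<Instant> clock = () -> now[0];

    // Computed early, as if at the top of run().
    Instant early = queryEnd(clock);

    // Simulate five minutes of work happening before the query is issued.
    now[0] = now[0].plus(Duration.ofMinutes(5));

    // Computed late, as if right before the query.
    Instant late = queryEnd(clock);

    System.out.println("early value in past: " + isInPast(early, clock));
    System.out.println("late value in past: " + isInPast(late, clock));
  }
}
```

Under these assumptions, only the value computed right before use is
guaranteed to be ahead of the clock at the moment the query is issued.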
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -179,10 +178,9 @@ public ProcessContinuation run(
final String token = partition.getPartitionToken();
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
final Timestamp endTimestamp = partition.getEndTimestamp();
+ final boolean readToEndTimestamp = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
Review Comment:
How about renaming to isBoundedQuery?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -250,6 +249,9 @@ public ProcessContinuation run(
tracker,
interrupter,
watermarkEstimator);
+ // The PartitionEndRecord indicates that there are no more records expected
+ // for this partition.
+ stopAfterQuerySucceeds = true;
Review Comment:
It seems we don't need this here. Only change stream v2 has
PartitionEndRecord, while change stream v1 has ChildPartitionRecord (possibly
more than one), which also indicates that the partition has ended.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -303,13 +301,28 @@ public ProcessContinuation run(
throw e;
}
- LOG.debug("[{}] change stream completed successfully", token);
- if (tracker.tryClaim(endTimestamp)) {
- LOG.debug("[{}] Finishing partition", token);
- partitionMetadataDao.updateToFinished(token);
- metrics.decActivePartitionReadCounter();
- LOG.info("[{}] After attempting to finish the partition", token);
+ LOG.debug(
+ "[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp);
+ Timestamp claimTimestamp =
+ stopAfterQuerySucceeds ? endTimestamp : changeStreamQueryEndTimestamp;
+ if (!tracker.tryClaim(claimTimestamp)) {
+ return ProcessContinuation.stop();
+ }
Review Comment:
Could you please verify whether my understanding is correct? Thanks.
If a query finishes without any interruptions, tryClaim on line 308 always
returns true, for both bounded and unbounded queries.
For a bounded query, line 315 returns false and the partition is marked as
finished later.
For an unbounded query, line 315 returns false and the work gets rescheduled;
the new query then gets an out-of-range error, stopAfterQuerySucceeds is set
to true, and the partition is later marked as finished.
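For reference, the claim-timestamp choice from the diff can be isolated as a
pure function. This is only a sketch: the class name is hypothetical, long
values stand in for Timestamp, and neither tryClaim nor whatever happens at
line 315 is modeled.

```java
public class ClaimTimestampSketch {
  /**
   * Mirrors the ternary in the diff: claim the partition end timestamp when
   * the partition is known to be done, otherwise claim only up to the end of
   * the query that just completed.
   */
  public static long claimTimestamp(
      boolean stopAfterQuerySucceeds,
      long endTimestamp,
      long changeStreamQueryEndTimestamp) {
    return stopAfterQuerySucceeds ? endTimestamp : changeStreamQueryEndTimestamp;
  }

  public static void main(String[] args) {
    long partitionEnd = 1_000L; // stand-in for partition.getEndTimestamp()
    long queryEnd = 400L;       // stand-in for changeStreamQueryEndTimestamp

    System.out.println("stop=true claims " + claimTimestamp(true, partitionEnd, queryEnd));
    System.out.println("stop=false claims " + claimTimestamp(false, partitionEnd, queryEnd));
  }
}
```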