scwhittle commented on code in PR #37326:
URL: https://github.com/apache/beam/pull/37326#discussion_r2710035751
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -303,13 +301,28 @@ public ProcessContinuation run(
throw e;
}
- LOG.debug("[{}] change stream completed successfully", token);
- if (tracker.tryClaim(endTimestamp)) {
- LOG.debug("[{}] Finishing partition", token);
- partitionMetadataDao.updateToFinished(token);
- metrics.decActivePartitionReadCounter();
- LOG.info("[{}] After attempting to finish the partition", token);
+ LOG.debug(
+ "[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp);
+ Timestamp claimTimestamp =
+ stopAfterQuerySucceeds ? endTimestamp : changeStreamQueryEndTimestamp;
+ if (!tracker.tryClaim(claimTimestamp)) {
+ return ProcessContinuation.stop();
+ }
Review Comment:
> If a query finished without any interruptions, the tryClaim will always
> return true in line 308 no matter for bounded or unbounded query.
It is possible for the runner to split the processing between the completion
of the query and this tryClaim call; that is the case in which tryClaim here
would return false. Otherwise, this marks how far we have processed this
partition: up to either the end of the range or the end of what we queried.
One note of interest: if we are going to stop processing, we need to claim up
to endTimestamp, because internal SDF checks otherwise treat it as data loss.
> For bounded query line 315 return false and it is marked as finished later.
> For unbounded query, line 315 return false, the work get rescheduled, the
> new query got out of range error and stopAfterQuerySucceeds is set to true and
> later the partition is marked as finished.
stopAfterQuerySucceeds is true for one of three reasons:
1. The partition is bounded (and we queried it to completion).
2. We got a record indicating that the partition ended (I was using
PartitionEndRecord, but it sounds like that may not be enough for v1 change
streams).
3. We got an out-of-range error indicating that the partition ended.
If we don't want to stop, i.e. stopAfterQuerySucceeds=false, the return value
indicates that the SDF should resume, and above we claimed only up to
changeStreamQueryEndTimestamp. The SDF will resume with
changeStreamQueryEndTimestamp as the start timestamp.
If we want to stop, the return value indicates that the SDF should stop, and
we claimed up to endTimestamp above. The partition will not be rescheduled.
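To make the two cases concrete, here is a minimal sketch of the decision, not the actual code in the PR. The class `ClaimDecisionSketch`, the `Continuation` enum (standing in for Beam's `ProcessContinuation`), and the use of `java.time.Instant` in place of the Spanner `Timestamp` are all assumptions made so the example is self-contained.

```java
import java.time.Instant;

/** Hypothetical model of the claim-then-continue decision; not the Beam code. */
public final class ClaimDecisionSketch {

  /** Stand-in for Beam's ProcessContinuation: stop or resume the SDF. */
  public enum Continuation { STOP, RESUME }

  /** Picks how far to claim once the change stream query has succeeded. */
  public static Instant claimTimestamp(
      boolean stopAfterQuerySucceeds, Instant endTimestamp, Instant queryEndTimestamp) {
    // Stopping: claim the whole range so no part of it looks unprocessed.
    // Resuming: claim only up to the end of what the query covered.
    return stopAfterQuerySucceeds ? endTimestamp : queryEndTimestamp;
  }

  /** Maps the tryClaim result and the stop flag to a continuation. */
  public static Continuation decide(boolean claimSucceeded, boolean stopAfterQuerySucceeds) {
    if (!claimSucceeded) {
      // The runner split the restriction before the claim; stop here.
      return Continuation.STOP;
    }
    return stopAfterQuerySucceeds ? Continuation.STOP : Continuation.RESUME;
  }

  public static void main(String[] args) {
    Instant end = Instant.parse("2030-01-01T00:00:00Z");
    Instant queryEnd = Instant.parse("2024-01-01T00:02:00Z");
    // Unfinished partition: claim up to the query end and resume.
    System.out.println(claimTimestamp(false, end, queryEnd) + " " + decide(true, false));
    // Finished partition: claim up to the range end and stop.
    System.out.println(claimTimestamp(true, end, queryEnd) + " " + decide(true, true));
  }
}
```

In this model, the only path that leads to a reschedule is a successful claim with stopAfterQuerySucceeds=false.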
The difficulty this code is trying to solve is that, when a query initiated
from an unbounded range completes, I don't see an easy way to determine
whether it completed due to
1. reaching changeStreamQueryEndTimestamp (in which case we want to resume), or
2. the partition being done.
This is why I was trying to explicitly track the partition-done conditions
via the stopAfterQuerySucceeds boolean.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -250,6 +249,9 @@ public ProcessContinuation run(
tracker,
interrupter,
watermarkEstimator);
+ // The PartitionEndRecord indicates that there are no more records expected
+ // for this partition.
+ stopAfterQuerySucceeds = true;
Review Comment:
See below: I need the variable to determine whether the query made over
(start, now+2m) stopped because it reached now+2m or because the partition is
done. If it stopped only because it reached the artificial end timestamp, we
need to reschedule.
For v1, it sounds like this approach is not sufficient. I believe the
change to the artificial end timestamp was made for v2 change streams. Is it
possible to detect whether v1 or v2 is being used, so that we can use
changeStreamQueryEndTimestamp=endTimestamp for v1?
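
As a rough sketch of what I have in mind, assuming there is some way to tell v1 from v2: the `isV1ChangeStream` flag, the class name, and capping the artificial end at endTimestamp are all hypothetical and not existing Beam or Spanner APIs. `java.time.Instant` stands in for the Spanner `Timestamp`.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of picking the query end timestamp per change stream version. */
public final class QueryEndTimestampSketch {

  /** The "now+2m" artificial window mentioned in the comment above. */
  static final Duration ARTIFICIAL_WINDOW = Duration.ofMinutes(2);

  /**
   * Returns the end timestamp to use for the change stream query.
   *
   * @param isV1ChangeStream hypothetical flag; how to detect v1 vs v2 is the open question
   */
  public static Instant queryEndTimestamp(
      boolean isV1ChangeStream, Instant now, Instant endTimestamp) {
    if (isV1ChangeStream) {
      // v1: query the full range, so completion means the partition is done.
      return endTimestamp;
    }
    // v2: query only a short window; never go past the range end (assumption).
    Instant artificialEnd = now.plus(ARTIFICIAL_WINDOW);
    return artificialEnd.isBefore(endTimestamp) ? artificialEnd : endTimestamp;
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2024-01-01T00:00:00Z");
    Instant end = Instant.parse("2030-01-01T00:00:00Z");
    System.out.println("v1: " + queryEndTimestamp(true, now, end));
    System.out.println("v2: " + queryEndTimestamp(false, now, end));
  }
}
```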