scwhittle commented on code in PR #37326:
URL: https://github.com/apache/beam/pull/37326#discussion_r2710035751


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -303,13 +301,28 @@ public ProcessContinuation run(
       throw e;
     }
 
-    LOG.debug("[{}] change stream completed successfully", token);
-    if (tracker.tryClaim(endTimestamp)) {
-      LOG.debug("[{}] Finishing partition", token);
-      partitionMetadataDao.updateToFinished(token);
-      metrics.decActivePartitionReadCounter();
-      LOG.info("[{}] After attempting to finish the partition", token);
+    LOG.debug(
+        "[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp);
+    Timestamp claimTimestamp =
+        stopAfterQuerySucceeds ? endTimestamp : changeStreamQueryEndTimestamp;
+    if (!tracker.tryClaim(claimTimestamp)) {
+      return ProcessContinuation.stop();
+    }

Review Comment:
   > If a query finished without any interruptions, the tryClaim will always
return true in line 308, for both bounded and unbounded queries.
   
   It is possible that the runner splits the processing between the completion
of the query and this tryClaim call; in that case tryClaim here would return
false. Otherwise, this records how far we have processed this partition: either
to the end of the range or to the end of what we queried. One note of interest:
if we are going to stop processing, we need to claim up to the endTimestamp,
because otherwise the internal SDF checks consider it data loss.
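   A toy model may make the two tryClaim outcomes concrete. This is a hypothetical sketch, not Beam's tracker: `ToyTimestampTracker` uses plain `long` timestamps over an inclusive range, `splitAfter` stands in for a runner-initiated split, and `checkDone` only mimics the completeness check the SDF machinery runs when the DoFn returns stop.

```java
/**
 * Hypothetical toy model of an SDF restriction tracker over an inclusive
 * timestamp range [from, to], using plain longs. Not Beam's tracker; it only
 * mimics tryClaim failing after a split and the completeness check on stop().
 */
final class ToyTimestampTracker {
  private final long from;
  private long to; // shrinks if the "runner" splits the restriction
  private Long lastAttempted; // null until the first tryClaim

  ToyTimestampTracker(long from, long to) {
    this.from = from;
    this.to = to;
  }

  /** Records the attempt; succeeds only while the position is inside [from, to]. */
  boolean tryClaim(long position) {
    if (position < from || (lastAttempted != null && position <= lastAttempted)) {
      throw new IllegalArgumentException("claims must be in range and strictly increasing");
    }
    lastAttempted = position;
    return position <= to;
  }

  /** Simulates a runner split: this invocation keeps only [from, splitPoint]. */
  void splitAfter(long splitPoint) {
    long alreadyAttempted = lastAttempted == null ? from - 1 : lastAttempted;
    if (splitPoint >= alreadyAttempted && splitPoint < to) {
      to = splitPoint;
    }
  }

  /** Mimics the completeness check run when the DoFn returns stop(). */
  void checkDone() {
    if (lastAttempted == null || lastAttempted < to) {
      throw new IllegalStateException(
          "range not claimed up to " + to + "; unclaimed work would be lost");
    }
  }

  public static void main(String[] args) {
    // Runner splits after a record at 10; claiming 50 now fails, so stopping is safe.
    ToyTimestampTracker split = new ToyTimestampTracker(0, 100);
    split.tryClaim(10);
    split.splitAfter(10);
    System.out.println(split.tryClaim(50)); // prints false
    split.checkDone(); // passes: the failed attempt at 50 covers the new end (10)

    // Stopping after claiming only the query end (50) trips the check.
    ToyTimestampTracker partial = new ToyTimestampTracker(0, 100);
    partial.tryClaim(50);
    try {
      partial.checkDone();
    } catch (IllegalStateException e) {
      System.out.println("checkDone failed: " + e.getMessage());
    }

    // Claiming all the way to the end (100) lets stop() pass the check.
    ToyTimestampTracker full = new ToyTimestampTracker(0, 100);
    full.tryClaim(100);
    full.checkDone();
  }
}
```

   The point of the model is the asymmetry: a failed claim after a split still counts as having "attempted" the end, while a successful claim that stops short of the end does not.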
   
   > For a bounded query, line 315 returns false and it is marked as finished later.
   For an unbounded query, line 315 returns false, the work gets rescheduled, the
new query gets an out-of-range error, stopAfterQuerySucceeds is set to true, and
later the partition is marked as finished.
   
   stopAfterQuerySucceeds is true in one of three cases:
   1. the partition is bounded (and we queried it to completion);
   2. we got a record indicating that the partition ended (I was using
PartitionEndRecord, but it sounds like that may not be enough for v1 change
streams);
   3. we got an out-of-range error indicating that the partition ended.
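   The three conditions can be sketched as a single flag computation. This is hypothetical: `QueryEvent` is an invented stand-in for what a query can yield (in the real code the out-of-range case arrives as an exception rather than a record), and `stopAfterQuerySucceeds` here is a pure function rather than the mutable local in `QueryChangeStreamAction`.

```java
import java.util.List;

/** Hypothetical sketch of the three conditions that set stopAfterQuerySucceeds. */
final class StopFlagSketch {
  // Invented stand-ins for the outcomes a change stream query can produce.
  enum QueryEvent { DATA_RECORD, PARTITION_END_RECORD, OUT_OF_RANGE_ERROR }

  static boolean stopAfterQuerySucceeds(boolean boundedPartition, List<QueryEvent> events) {
    // Condition 1: a bounded partition is queried to completion; nothing to resume.
    boolean stop = boundedPartition;
    for (QueryEvent event : events) {
      // Condition 2: a record says the partition ended.
      // Condition 3: an out-of-range error says the partition ended.
      if (event == QueryEvent.PARTITION_END_RECORD || event == QueryEvent.OUT_OF_RANGE_ERROR) {
        stop = true;
      }
    }
    return stop;
  }

  public static void main(String[] args) {
    System.out.println(stopAfterQuerySucceeds(false, List.of(QueryEvent.DATA_RECORD))); // false
    System.out.println(stopAfterQuerySucceeds(
        false, List.of(QueryEvent.DATA_RECORD, QueryEvent.PARTITION_END_RECORD))); // true
  }
}
```

   Note that an unbounded query that simply runs out of data sets none of these, which is exactly the case where resuming is wanted.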
   
   If we don't want to stop, i.e. stopAfterQuerySucceeds=false, we indicate via
the return value that the SDF should resume, and above we claimed only up to
changeStreamQueryEndTimestamp. The SDF will then resume with
changeStreamQueryEndTimestamp as the start timestamp.
   
   If we want to stop, we indicate via the return value that the SDF should
stop, and above we claimed up to endTimestamp. The partition will not be
rescheduled.
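   Putting the claim and the return value together, the post-query decision looks roughly like this. It is a sketch under stated assumptions: `Continuation` is a stand-in for Beam's `ProcessContinuation`, a `LongPredicate` stands in for `tracker.tryClaim`, timestamps are plain `long`s, and bookkeeping such as marking the partition finished is omitted.

```java
import java.util.function.LongPredicate;

/**
 * Hypothetical sketch of the post-query decision. Continuation stands in for
 * Beam's ProcessContinuation, and tryClaim stands in for tracker.tryClaim.
 */
final class ContinuationSketch {
  enum Continuation { RESUME, STOP }

  static Continuation afterQuerySucceeded(
      boolean stopAfterQuerySucceeds,
      long endTimestamp,
      long changeStreamQueryEndTimestamp,
      LongPredicate tryClaim) {
    // Stopping: claim the whole range so the completeness check on stop passes.
    // Resuming: claim only what this query covered; the residual starts there.
    long claimTimestamp =
        stopAfterQuerySucceeds ? endTimestamp : changeStreamQueryEndTimestamp;
    if (!tryClaim.test(claimTimestamp)) {
      // The runner split the restriction after the query finished; the
      // remainder belongs to another restriction, so this invocation stops.
      return Continuation.STOP;
    }
    // Bookkeeping such as marking the partition finished is omitted here.
    return stopAfterQuerySucceeds ? Continuation.STOP : Continuation.RESUME;
  }

  public static void main(String[] args) {
    // The partition is not done: claim up to the query end (50) and resume.
    System.out.println(afterQuerySucceeded(false, 100L, 50L, t -> true)); // RESUME
    // The partition is done: claim up to the range end (100) and stop.
    System.out.println(afterQuerySucceeded(true, 100L, 50L, t -> true)); // STOP
  }
}
```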
   
   A difficulty this code is trying to solve is that I don't see an easy way to
determine, when a query initiated from an unbounded range completes, whether it
completed because of
   1. reaching the changeStreamQueryEndTimestamp (in which case we want to
resume), or
   2. the partition being done.
   
   This is why I was trying to explicitly track the partition-done conditions
via the stopAfterQuerySucceeds boolean.
   



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -250,6 +249,9 @@ public ProcessContinuation run(
                     tracker,
                     interrupter,
                     watermarkEstimator);
+            // The PartitionEndRecord indicates that there are no more records expected
+            // for this partition.
+            stopAfterQuerySucceeds = true;

Review Comment:
   See below: I need the variable to determine whether the query made over
(start, now+2m) stopped because it reached now+2m or because the partition is
done. If it stopped only because it reached the artificial end timestamp, we
need to reschedule.
   
   For v1, it sounds like this approach is not sufficient. I believe the change
to the artificial end timestamp was made for v2 change streams. Is it possible
to detect whether v1 or v2 is being used, so that we can use
changeStreamQueryEndTimestamp=endTimestamp for v1?
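   If the version can be detected, choosing the query end could look roughly like the following. This is purely hypothetical: `ChangeStreamVersion` and `ARTIFICIAL_WINDOW` are invented names (whether the connector can tell v1 from v2 is exactly the open question), `java.time.Instant` replaces the Spanner `Timestamp` type to keep the sketch self-contained, and capping the v2 query at the earlier of now+2m and endTimestamp is an assumption about how the artificial end is computed.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch: pick the query end timestamp based on change stream version. */
final class QueryEndSketch {
  // Invented enum; detecting the version is the open question in the review.
  enum ChangeStreamVersion { V1, V2 }

  // Assumed width of the artificial query window ("now+2m" in the review).
  static final Duration ARTIFICIAL_WINDOW = Duration.ofMinutes(2);

  static Instant changeStreamQueryEndTimestamp(
      ChangeStreamVersion version, Instant endTimestamp, Instant now) {
    if (version == ChangeStreamVersion.V1) {
      // v1: query the full range, so a clean completion means the range is exhausted.
      return endTimestamp;
    }
    // v2: cap the query at now+2m, never going past endTimestamp; a clean
    // completion may only mean the cap was reached, hence the explicit flag.
    Instant cap = now.plus(ARTIFICIAL_WINDOW);
    return cap.isBefore(endTimestamp) ? cap : endTimestamp;
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2024-01-01T00:00:00Z");
    Instant end = Instant.parse("2030-01-01T00:00:00Z");
    System.out.println(changeStreamQueryEndTimestamp(ChangeStreamVersion.V1, end, now)); // 2030-01-01T00:00:00Z
    System.out.println(changeStreamQueryEndTimestamp(ChangeStreamVersion.V2, end, now)); // 2024-01-01T00:02:00Z
  }
}
```

   With v1 querying to endTimestamp, a query that completes cleanly no longer needs the flag to distinguish "hit the artificial end" from "partition done".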



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
