This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 47df6aefe88 [Spanner Change Streams] Fix potential data loss issue by ensuring to only claim timestamps that have been fully processed from the restriction tracker. (#37326)
47df6aefe88 is described below

commit 47df6aefe88cc555ace238b07d90f6ddedc60d38
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Jan 28 12:07:18 2026 +0100

    [Spanner Change Streams] Fix potential data loss issue by ensuring to only claim timestamps that have been fully processed from the restriction tracker. (#37326)
---
 .../action/QueryChangeStreamAction.java            |  88 ++++--
 .../action/QueryChangeStreamActionTest.java        | 312 ++++++++++++++++++++-
 2 files changed, 364 insertions(+), 36 deletions(-)
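
The following standalone sketch (not part of this commit; ClaimOrderingSketch, SimpleTracker and the long-valued timestamps are hypothetical stand-ins) illustrates the claim ordering the fix enforces: a timestamp is claimed from the restriction tracker only after the corresponding records have been fully processed, and an unbounded restriction resumes from the artificial change stream query end timestamp instead of claiming its full range up front.

    // Hypothetical, simplified sketch of the claim-after-processing pattern; none of
    // these types exist in Beam, they only mirror the control flow described above.
    import java.util.List;

    public class ClaimOrderingSketch {

      /** Stand-in for a restriction tracker over timestamps (modeled as longs). */
      interface SimpleTracker {
        boolean tryClaim(long timestamp);
      }

      enum Continuation { STOP, RESUME }

      static Continuation runQuery(
          SimpleTracker tracker,
          List<Long> recordTimestamps,   // timestamps of records returned by the query
          long queryEndTimestamp,        // artificial end used when the restriction is unbounded
          long restrictionEndTimestamp,  // real end of the restriction
          boolean boundedRestriction,
          boolean sawPartitionEnd) {     // partition-end/child-partition record or out-of-range error
        // Claim each record timestamp as the record is processed; a failed claim means a
        // split or checkpoint happened, so stop without claiming anything further.
        for (long ts : recordTimestamps) {
          if (!tracker.tryClaim(ts)) {
            return Continuation.STOP;
          }
          // ... emit the record ...
        }
        if (!boundedRestriction && !sawPartitionEnd) {
          // The query simply hit the artificial end timestamp: claim only what was
          // actually read and resume from there, rather than claiming the whole
          // unbounded range up front.
          return tracker.tryClaim(queryEndTimestamp) ? Continuation.RESUME : Continuation.STOP;
        }
        // The partition is finished: claim through the restriction end and stop.
        tracker.tryClaim(restrictionEndTimestamp);
        return Continuation.STOP;
      }
    }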

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 3176abd9f24..8da9f3d0951 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -168,7 +168,6 @@ public class QueryChangeStreamAction {
   * @return a {@link ProcessContinuation#stop()} if a record timestamp could not be claimed or if
    *     the partition processing has finished
    */
-  @SuppressWarnings("nullness")
   @VisibleForTesting
   public ProcessContinuation run(
       PartitionMetadata partition,
@@ -177,12 +176,6 @@ public class QueryChangeStreamAction {
       ManualWatermarkEstimator<Instant> watermarkEstimator,
       BundleFinalizer bundleFinalizer) {
     final String token = partition.getPartitionToken();
-    final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
-    final Timestamp endTimestamp = partition.getEndTimestamp();
-    final Timestamp changeStreamQueryEndTimestamp =
-        endTimestamp.equals(MAX_INCLUSIVE_END_AT)
-            ? getNextReadChangeStreamEndTimestamp()
-            : endTimestamp;
 
     // TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
     // ReadChangeStreamPartitionDoFn#processElement is called
@@ -198,6 +191,17 @@ public class QueryChangeStreamAction {
     RestrictionInterrupter<Timestamp> interrupter =
         RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);
 
+    final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
+    final Timestamp endTimestamp = partition.getEndTimestamp();
+    final boolean isBoundedRestriction = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
+    final Timestamp changeStreamQueryEndTimestamp =
+        isBoundedRestriction ? endTimestamp : getNextReadChangeStreamEndTimestamp();
+
+    // Once the changeStreamQuery completes we may need to resume reading from the partition if we
+    // had an unbounded restriction for which we set an arbitrary query end timestamp and for which
+    // we didn't encounter any indications that the partition is done (explicit end records or
+    // exceptions about being out of timestamp range).
+    boolean stopAfterQuerySucceeds = isBoundedRestriction;
     try (ChangeStreamResultSet resultSet =
         changeStreamDao.changeStreamQuery(
             token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
@@ -234,6 +238,10 @@ public class QueryChangeStreamAction {
                     tracker,
                     interrupter,
                     watermarkEstimator);
+            // Child Partition records indicate that the partition has ended. There may be
+            // additional ChildPartitionRecords but they will share the same timestamp and
+            // will be returned by the query and processed if it finishes successfully.
+            stopAfterQuerySucceeds = true;
           } else if (record instanceof PartitionStartRecord) {
             maybeContinuation =
                 partitionStartRecordAction.run(
@@ -250,6 +258,9 @@ public class QueryChangeStreamAction {
                     tracker,
                     interrupter,
                     watermarkEstimator);
+            // The PartitionEndRecord indicates that there are no more records expected
+            // for this partition.
+            stopAfterQuerySucceeds = true;
           } else if (record instanceof PartitionEventRecord) {
             maybeContinuation =
                 partitionEventRecordAction.run(
@@ -272,10 +283,6 @@ public class QueryChangeStreamAction {
           }
         }
       }
-      bundleFinalizer.afterBundleCommit(
-          Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
-          updateWatermarkCallback(token, watermarkEstimator));
-
     } catch (SpannerException e) {
       /*
      If there is a split when a partition is supposed to be finished, the residual will try
@@ -283,16 +290,16 @@ public class QueryChangeStreamAction {
      here, and the residual should be able to claim the end of the timestamp range, finishing
       the partition.
       */
-      if (isTimestampOutOfRange(e)) {
-        LOG.info(
-            "[{}] query change stream is out of range for {} to {}, finishing 
stream.",
-            token,
-            startTimestamp,
-            endTimestamp,
-            e);
-      } else {
+      if (!isTimestampOutOfRange(e)) {
         throw e;
       }
+      LOG.info(
+          "[{}] query change stream is out of range for {} to {}, finishing 
stream.",
+          token,
+          startTimestamp,
+          endTimestamp,
+          e);
+      stopAfterQuerySucceeds = true;
     } catch (Exception e) {
       LOG.error(
           "[{}] query change stream had exception processing range {} to {}.",
@@ -303,13 +310,40 @@ public class QueryChangeStreamAction {
       throw e;
     }
 
-    LOG.debug("[{}] change stream completed successfully", token);
-    if (tracker.tryClaim(endTimestamp)) {
-      LOG.debug("[{}] Finishing partition", token);
-      partitionMetadataDao.updateToFinished(token);
-      metrics.decActivePartitionReadCounter();
-      LOG.info("[{}] After attempting to finish the partition", token);
+    LOG.debug(
+        "[{}] change stream completed successfully up to {}", token, 
changeStreamQueryEndTimestamp);
+
+    if (!stopAfterQuerySucceeds) {
+      // Records stopped being returned for the query due to our artificial query end timestamp but
+      // we want to continue processing the partition, resuming from changeStreamQueryEndTimestamp.
+      if (!tracker.tryClaim(changeStreamQueryEndTimestamp)) {
+        return ProcessContinuation.stop();
+      }
+      bundleFinalizer.afterBundleCommit(
+          Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
+          updateWatermarkCallback(token, watermarkEstimator));
+      LOG.debug("[{}] Rescheduling partition to resume reading", token);
+      return ProcessContinuation.resume();
     }
+
+    // Otherwise we have finished processing the partition, either due to:
+    //   1. reading to the bounded restriction end timestamp
+    //   2. encountering a ChildPartitionRecord or PartitionEndRecord indicating there are no more
+    //      elements in the partition
+    //   3. encountering an exception indicating the start timestamp is out of bounds of the
+    //      partition
+    // We claim the restriction completely to satisfy internal sanity checks and do not reschedule
+    // the restriction.
+    if (!tracker.tryClaim(endTimestamp)) {
+      return ProcessContinuation.stop();
+    }
+
+    LOG.debug("[{}] Finishing partition", token);
+    // TODO: This should be performed after the commit succeeds.  Since bundle finalizers are not
+    // guaranteed to be called, this needs to be performed in a subsequent fused stage.
+    partitionMetadataDao.updateToFinished(token);
+    metrics.decActivePartitionReadCounter();
+    LOG.info("[{}] After attempting to finish the partition", token);
     return ProcessContinuation.stop();
   }
 
@@ -339,8 +373,8 @@ public class QueryChangeStreamAction {
   }
 
  // Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
-  // users want to run the connector forever. This approach works because Google Dataflow
-  // checkpoints every 5s or 5MB output provided and the change stream query has deadline for 1 min.
+  // users want to run the connector forever. If the end timestamp is reached, we will resume
+  // processing from that timestamp on a subsequent DoFn execution.
   private Timestamp getNextReadChangeStreamEndTimestamp() {
     final Timestamp current = Timestamp.now();
    return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index 21f5a888b14..cf4c047025c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
 import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -27,6 +29,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.SpannerExceptionFactory;
 import com.google.cloud.spanner.Struct;
 import java.util.Arrays;
 import java.util.Optional;
@@ -55,10 +59,12 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 public class QueryChangeStreamActionTest {
   private static final String PARTITION_TOKEN = "partitionToken";
  private static final Timestamp PARTITION_START_TIMESTAMP = Timestamp.ofTimeMicroseconds(10L);
+  private static final Timestamp RECORD_TIMESTAMP = Timestamp.ofTimeMicroseconds(20L);
  private static final Timestamp PARTITION_END_TIMESTAMP = Timestamp.ofTimeMicroseconds(30L);
   private static final long PARTITION_HEARTBEAT_MILLIS = 30_000L;
   private static final Instant WATERMARK = Instant.now();
@@ -136,6 +142,22 @@ public class QueryChangeStreamActionTest {
     when(partitionMetadataMapper.from(row)).thenReturn(partition);
   }
 
+  void setupUnboundedPartition() {
+    partition =
+        PartitionMetadata.newBuilder()
+            .setPartitionToken(PARTITION_TOKEN)
+            .setParentTokens(Sets.newHashSet("parentToken"))
+            .setStartTimestamp(PARTITION_START_TIMESTAMP)
+            .setEndTimestamp(MAX_INCLUSIVE_END_AT)
+            .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS)
+            .setState(SCHEDULED)
+            .setWatermark(WATERMARK_TIMESTAMP)
+            .setScheduledAt(Timestamp.now())
+            .build();
+    when(partitionMetadataMapper.from(any())).thenReturn(partition);
+    when(restriction.getTo()).thenReturn(MAX_INCLUSIVE_END_AT);
+  }
+
   @Test
   public void testQueryChangeStreamWithDataChangeRecord() {
     final Struct rowAsStruct = mock(Struct.class);
@@ -145,7 +167,7 @@ public class QueryChangeStreamActionTest {
     final DataChangeRecord record1 = mock(DataChangeRecord.class);
     final DataChangeRecord record2 = mock(DataChangeRecord.class);
     when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
-    when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+    when(record2.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
     when(changeStreamDao.changeStreamQuery(
             PARTITION_TOKEN,
             PARTITION_START_TIMESTAMP,
@@ -214,8 +236,8 @@ public class QueryChangeStreamActionTest {
     final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
     final HeartbeatRecord record1 = mock(HeartbeatRecord.class);
     final HeartbeatRecord record2 = mock(HeartbeatRecord.class);
-    when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
-    when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+    when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
+    when(record2.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
     when(changeStreamDao.changeStreamQuery(
             PARTITION_TOKEN,
             PARTITION_START_TIMESTAMP,
@@ -498,19 +520,19 @@ public class QueryChangeStreamActionTest {
   }
 
   @Test
-  public void testQueryChangeStreamWithPartitionEndRecord() {
+  public void testQueryChangeStreamWithPartitionEndRecordBoundedRestriction() {
     final ChangeStreamResultSetMetadata resultSetMetadata =
         mock(ChangeStreamResultSetMetadata.class);
     final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
     final PartitionEndRecord record1 = mock(PartitionEndRecord.class);
-    when(record1.getRecordTimestamp()).thenReturn(PARTITION_END_TIMESTAMP);
+    when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
     when(changeStreamDao.changeStreamQuery(
             PARTITION_TOKEN,
             PARTITION_START_TIMESTAMP,
             PARTITION_END_TIMESTAMP,
             PARTITION_HEARTBEAT_MILLIS))
         .thenReturn(resultSet);
-    when(resultSet.next()).thenReturn(true);
+    when(resultSet.next()).thenReturn(true, false);
     when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
    when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata))
         .thenReturn(Arrays.asList(record1));
@@ -520,8 +542,9 @@ public class QueryChangeStreamActionTest {
             eq(restrictionTracker),
             any(RestrictionInterrupter.class),
             eq(watermarkEstimator)))
-        .thenReturn(Optional.of(ProcessContinuation.stop()));
+        .thenReturn(Optional.empty());
     when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
 
     final ProcessContinuation result =
         action.run(
@@ -535,14 +558,67 @@ public class QueryChangeStreamActionTest {
             eq(restrictionTracker),
             any(RestrictionInterrupter.class),
             eq(watermarkEstimator));
-    verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+    verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
 
    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
    verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
    verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
    verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
-    verify(restrictionTracker, never()).tryClaim(any());
+    verify(partitionMetadataDao, never()).updateWatermark(any(), any());
+  }
+
+  @Test
+  public void testQueryChangeStreamWithPartitionEndRecordUnboundedRestriction() {
+    setupUnboundedPartition();
+
+    final ChangeStreamResultSetMetadata resultSetMetadata =
+        mock(ChangeStreamResultSetMetadata.class);
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final PartitionEndRecord record1 = mock(PartitionEndRecord.class);
+    when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
+    final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
+    when(changeStreamDao.changeStreamQuery(
+            eq(PARTITION_TOKEN),
+            eq(PARTITION_START_TIMESTAMP),
+            timestampCaptor.capture(),
+            eq(PARTITION_HEARTBEAT_MILLIS)))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true, false);
+    when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+    when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata))
+        .thenReturn(Arrays.asList(record1));
+    when(partitionEndRecordAction.run(
+            eq(partition),
+            eq(record1),
+            eq(restrictionTracker),
+            any(RestrictionInterrupter.class),
+            eq(watermarkEstimator)))
+        .thenReturn(Optional.empty());
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    assertEquals(ProcessContinuation.stop(), result);
+    assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue());
+    verify(partitionEndRecordAction)
+        .run(
+            eq(partition),
+            eq(record1),
+            eq(restrictionTracker),
+            any(RestrictionInterrupter.class),
+            eq(watermarkEstimator));
+    verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
+
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionMetadataDao, never()).updateWatermark(any(), any());
   }
 
   @Test
@@ -611,8 +687,90 @@ public class QueryChangeStreamActionTest {
            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
 
     assertEquals(ProcessContinuation.stop(), result);
+    verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
+    verify(metrics).decActivePartitionReadCounter();
+
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+  }
+
+  @Test
+  public void testQueryChangeStreamFinishedWithResume() {
+    partition =
+        PartitionMetadata.newBuilder()
+            .setPartitionToken(PARTITION_TOKEN)
+            .setParentTokens(Sets.newHashSet("parentToken"))
+            .setStartTimestamp(PARTITION_START_TIMESTAMP)
+            .setEndTimestamp(MAX_INCLUSIVE_END_AT)
+            .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS)
+            .setState(SCHEDULED)
+            .setWatermark(WATERMARK_TIMESTAMP)
+            .setScheduledAt(Timestamp.now())
+            .build();
+    when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+    final ChangeStreamResultSet changeStreamResultSet = mock(ChangeStreamResultSet.class);
+    final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
+    when(changeStreamDao.changeStreamQuery(
+            eq(PARTITION_TOKEN),
+            eq(PARTITION_START_TIMESTAMP),
+            timestampCaptor.capture(),
+            eq(PARTITION_HEARTBEAT_MILLIS)))
+        .thenReturn(changeStreamResultSet);
+    when(changeStreamResultSet.next()).thenReturn(false);
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+    assertEquals(ProcessContinuation.resume(), result);
+    assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue());
+
+    verify(restrictionTracker).tryClaim(timestampCaptor.getValue());
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+    verify(partitionMetadataDao, never()).updateToFinished(PARTITION_TOKEN);
+    verify(metrics, never()).decActivePartitionReadCounter();
+
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+  }
+
+  // Out of range indicates that we're beyond the end of the partition and should stop
+  // processing.
+  @Test
+  public void testQueryChangeStreamWithOutOfRangeErrorOnUnboundedPartition() {
+    setupUnboundedPartition();
+
+    final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
+    when(changeStreamDao.changeStreamQuery(
+            eq(PARTITION_TOKEN),
+            eq(PARTITION_START_TIMESTAMP),
+            timestampCaptor.capture(),
+            eq(PARTITION_HEARTBEAT_MILLIS)))
+        .thenThrow(
+            SpannerExceptionFactory.newSpannerException(
+                ErrorCode.OUT_OF_RANGE, "Specified start_timestamp is invalid"));
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+    assertEquals(ProcessContinuation.stop(), result);
+    assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue());
+
+    verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
     verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
+    verify(metrics).decActivePartitionReadCounter();
 
    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
@@ -620,6 +778,142 @@ public class QueryChangeStreamActionTest {
    verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
    verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
    verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionMetadataDao, never()).updateWatermark(any(), any());
+  }
+
+  // Out of range indicates that we're beyond the end of the partition and should stop
+  // processing.
+  @Test
+  public void testQueryChangeStreamWithOutOfRangeErrorOnBoundedPartition() {
+    when(changeStreamDao.changeStreamQuery(
+            eq(PARTITION_TOKEN),
+            eq(PARTITION_START_TIMESTAMP),
+            eq(PARTITION_END_TIMESTAMP),
+            eq(PARTITION_HEARTBEAT_MILLIS)))
+        .thenThrow(
+            SpannerExceptionFactory.newSpannerException(
+                ErrorCode.OUT_OF_RANGE, "Specified start_timestamp is invalid"));
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+    assertEquals(ProcessContinuation.stop(), result);
+
+    verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
+    verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
+    verify(metrics).decActivePartitionReadCounter();
+
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionMetadataDao, never()).updateWatermark(any(), any());
+  }
+
+  @Test
+  public void testQueryChangeStreamWithChildPartitionsRecordBoundedRestriction() {
+    final ChangeStreamResultSetMetadata resultSetMetadata =
+        mock(ChangeStreamResultSetMetadata.class);
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class);
+    when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
+    when(changeStreamDao.changeStreamQuery(
+            PARTITION_TOKEN,
+            PARTITION_START_TIMESTAMP,
+            PARTITION_END_TIMESTAMP,
+            PARTITION_HEARTBEAT_MILLIS))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true, false);
+    when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+    when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata))
+        .thenReturn(Arrays.asList(record1));
+    when(childPartitionsRecordAction.run(
+            eq(partition),
+            eq(record1),
+            eq(restrictionTracker),
+            any(RestrictionInterrupter.class),
+            eq(watermarkEstimator)))
+        .thenReturn(Optional.empty());
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    assertEquals(ProcessContinuation.stop(), result);
+    verify(childPartitionsRecordAction)
+        .run(
+            eq(partition),
+            eq(record1),
+            eq(restrictionTracker),
+            any(RestrictionInterrupter.class),
+            eq(watermarkEstimator));
+    verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
+
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionMetadataDao, never()).updateWatermark(any(), any());
+  }
+
+  @Test
+  public void testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction() {
+    setupUnboundedPartition();
+
+    final ChangeStreamResultSetMetadata resultSetMetadata =
+        mock(ChangeStreamResultSetMetadata.class);
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class);
+    when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
+    final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
+    when(changeStreamDao.changeStreamQuery(
+            eq(PARTITION_TOKEN),
+            eq(PARTITION_START_TIMESTAMP),
+            timestampCaptor.capture(),
+            eq(PARTITION_HEARTBEAT_MILLIS)))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true, false);
+    when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+    when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata))
+        .thenReturn(Arrays.asList(record1));
+    when(childPartitionsRecordAction.run(
+            eq(partition),
+            eq(record1),
+            eq(restrictionTracker),
+            any(RestrictionInterrupter.class),
+            eq(watermarkEstimator)))
+        .thenReturn(Optional.empty());
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    assertEquals(ProcessContinuation.stop(), result);
+    assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue());
+    verify(childPartitionsRecordAction)
+        .run(
+            eq(partition),
+            eq(record1),
+            eq(restrictionTracker),
+            any(RestrictionInterrupter.class),
+            eq(watermarkEstimator));
+    verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
+
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(partitionMetadataDao, never()).updateWatermark(any(), any());
   }
 
   private static class BundleFinalizerStub implements BundleFinalizer {
