This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 47df6aefe88 [Spanner Change Streams] Fix potential data loss issue by
ensuring to only claim timestamps that have been fully processed from the
restriction tracker. (#37326)
47df6aefe88 is described below
commit 47df6aefe88cc555ace238b07d90f6ddedc60d38
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Jan 28 12:07:18 2026 +0100
[Spanner Change Streams] Fix potential data loss issue by ensuring to only
claim timestamps that have been fully processed from the restriction tracker.
(#37326)
---
.../action/QueryChangeStreamAction.java | 88 ++++--
.../action/QueryChangeStreamActionTest.java | 312 ++++++++++++++++++++-
2 files changed, 364 insertions(+), 36 deletions(-)
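The core of the change: when the restriction is unbounded, the change stream query runs with an artificial end timestamp (now + 2 minutes). Previously the code claimed the restriction's end timestamp after the query returned, even though records between the artificial query end and that end timestamp had not been read yet; that is the potential data loss the subject refers to. With this fix only fully processed timestamps are claimed: the artificial query end is claimed and the DoFn resumes from it, and the restriction end is claimed only once the partition is known to be finished. A minimal standalone sketch of that decision follows; Tracker, Continuation and afterQuerySucceeds are hypothetical stand-ins, not the connector's actual classes.

    // Simplified sketch only; hypothetical types, not the Beam/Spanner classes from the diff below.
    import java.time.Instant;

    public class ClaimOnlyProcessedSketch {

      // Minimal stand-in for the restriction tracker over commit timestamps.
      interface Tracker {
        // Returns false if the position is no longer owned by this restriction (e.g. after a split).
        boolean tryClaim(Instant position);
      }

      enum Continuation { STOP, RESUME }

      static Continuation afterQuerySucceeds(
          Tracker tracker,
          Instant restrictionEnd,      // end of the restriction; effectively "infinite" when unbounded
          Instant queryEnd,            // end timestamp actually used for the change stream query
          boolean partitionFinished) { // saw partition-end/child-partition records or an out-of-range error
        if (!partitionFinished && !queryEnd.equals(restrictionEnd)) {
          // Everything up to queryEnd was processed; claim only that and resume from it later.
          // Timestamps past queryEnd stay in the residual restriction, so no records are skipped.
          return tracker.tryClaim(queryEnd) ? Continuation.RESUME : Continuation.STOP;
        }
        // The partition is done; claim the full range so the restriction is marked complete.
        if (!tracker.tryClaim(restrictionEnd)) {
          return Continuation.STOP; // a split raced us; the residual will finish the partition
        }
        // The real code also marks the partition finished in the metadata table at this point.
        return Continuation.STOP;
      }
    }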
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 3176abd9f24..8da9f3d0951 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -168,7 +168,6 @@ public class QueryChangeStreamAction {
* @return a {@link ProcessContinuation#stop()} if a record timestamp could not be claimed or if
* the partition processing has finished
*/
- @SuppressWarnings("nullness")
@VisibleForTesting
public ProcessContinuation run(
PartitionMetadata partition,
@@ -177,12 +176,6 @@ public class QueryChangeStreamAction {
ManualWatermarkEstimator<Instant> watermarkEstimator,
BundleFinalizer bundleFinalizer) {
final String token = partition.getPartitionToken();
- final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
- final Timestamp endTimestamp = partition.getEndTimestamp();
- final Timestamp changeStreamQueryEndTimestamp =
- endTimestamp.equals(MAX_INCLUSIVE_END_AT)
- ? getNextReadChangeStreamEndTimestamp()
- : endTimestamp;
// TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
// ReadChangeStreamPartitionDoFn#processElement is called
@@ -198,6 +191,17 @@ public class QueryChangeStreamAction {
RestrictionInterrupter<Timestamp> interrupter =
RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);
+ final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
+ final Timestamp endTimestamp = partition.getEndTimestamp();
+ final boolean isBoundedRestriction = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
+ final Timestamp changeStreamQueryEndTimestamp =
+ isBoundedRestriction ? endTimestamp : getNextReadChangeStreamEndTimestamp();
+
+ // Once the changeStreamQuery completes we may need to resume reading from the partition if we
+ // had an unbounded restriction for which we set an arbitrary query end timestamp and for which
+ // we didn't encounter any indications that the partition is done (explicit end records or
+ // exceptions about being out of timestamp range).
+ boolean stopAfterQuerySucceeds = isBoundedRestriction;
try (ChangeStreamResultSet resultSet =
changeStreamDao.changeStreamQuery(
token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
@@ -234,6 +238,10 @@ public class QueryChangeStreamAction {
tracker,
interrupter,
watermarkEstimator);
+ // Child Partition records indicate that the partition has ended. There may be
+ // additional ChildPartitionRecords but they will share the same timestamp and
+ // will be returned by the query and processed if it finishes successfully.
+ stopAfterQuerySucceeds = true;
} else if (record instanceof PartitionStartRecord) {
maybeContinuation =
partitionStartRecordAction.run(
@@ -250,6 +258,9 @@ public class QueryChangeStreamAction {
tracker,
interrupter,
watermarkEstimator);
+ // The PartitionEndRecord indicates that there are no more records expected
+ // for this partition.
+ stopAfterQuerySucceeds = true;
} else if (record instanceof PartitionEventRecord) {
maybeContinuation =
partitionEventRecordAction.run(
@@ -272,10 +283,6 @@ public class QueryChangeStreamAction {
}
}
}
- bundleFinalizer.afterBundleCommit(
- Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
- updateWatermarkCallback(token, watermarkEstimator));
-
} catch (SpannerException e) {
/*
If there is a split when a partition is supposed to be finished, the residual will try
@@ -283,16 +290,16 @@ public class QueryChangeStreamAction {
here, and the residual should be able to claim the end of the timestamp range, finishing
the partition.
*/
- if (isTimestampOutOfRange(e)) {
- LOG.info(
- "[{}] query change stream is out of range for {} to {}, finishing
stream.",
- token,
- startTimestamp,
- endTimestamp,
- e);
- } else {
+ if (!isTimestampOutOfRange(e)) {
throw e;
}
+ LOG.info(
+ "[{}] query change stream is out of range for {} to {}, finishing
stream.",
+ token,
+ startTimestamp,
+ endTimestamp,
+ e);
+ stopAfterQuerySucceeds = true;
} catch (Exception e) {
LOG.error(
"[{}] query change stream had exception processing range {} to {}.",
@@ -303,13 +310,40 @@ public class QueryChangeStreamAction {
throw e;
}
- LOG.debug("[{}] change stream completed successfully", token);
- if (tracker.tryClaim(endTimestamp)) {
- LOG.debug("[{}] Finishing partition", token);
- partitionMetadataDao.updateToFinished(token);
- metrics.decActivePartitionReadCounter();
- LOG.info("[{}] After attempting to finish the partition", token);
+ LOG.debug(
+ "[{}] change stream completed successfully up to {}", token,
changeStreamQueryEndTimestamp);
+
+ if (!stopAfterQuerySucceeds) {
+ // Records stopped being returned for the query due to our artificial query end timestamp but
+ // we want to continue processing the partition, resuming from changeStreamQueryEndTimestamp.
+ if (!tracker.tryClaim(changeStreamQueryEndTimestamp)) {
+ return ProcessContinuation.stop();
+ }
+ bundleFinalizer.afterBundleCommit(
+ Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
+ updateWatermarkCallback(token, watermarkEstimator));
+ LOG.debug("[{}] Rescheduling partition to resume reading", token);
+ return ProcessContinuation.resume();
}
+
+ // Otherwise we have finished processing the partition, either due to:
+ // 1. reading to the bounded restriction end timestamp
+ // 2. encountering a ChildPartitionsRecord or PartitionEndRecord indicating there are no more
+ // elements in the partition
+ // 3. encountering an exception indicating the start timestamp is out of bounds of the
+ // partition
+ // We claim the restriction completely to satisfy internal sanity checks and do not reschedule
+ // the restriction.
+ if (!tracker.tryClaim(endTimestamp)) {
+ return ProcessContinuation.stop();
+ }
+
+ LOG.debug("[{}] Finishing partition", token);
+ // TODO: This should be performed after the commit succeeds. Since bundle finalizers are not
+ // guaranteed to be called, this needs to be performed in a subsequent fused stage.
+ partitionMetadataDao.updateToFinished(token);
+ metrics.decActivePartitionReadCounter();
+ LOG.info("[{}] After attempting to finish the partition", token);
return ProcessContinuation.stop();
}
@@ -339,8 +373,8 @@ public class QueryChangeStreamAction {
}
// Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
- // users want to run the connector forever. This approach works because Google Dataflow
- // checkpoints every 5s or 5MB output provided and the change stream query has deadline for 1 min.
+ // users want to run the connector forever. If the end timestamp is reached, we will resume
+ // processing from that timestamp on a subsequent DoFn execution.
private Timestamp getNextReadChangeStreamEndTimestamp() {
final Timestamp current = Timestamp.now();
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
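The resume path above relies on the general Beam splittable DoFn contract: positions that were never claimed remain in the residual restriction and are processed again when the element is rescheduled, so returning resume() after claiming only the artificial query end cannot skip data. The sketch below is a generic, hedged illustration of that contract with an ordinary OffsetRange restriction; CountUpToDoFn and its batch size are invented for the example and are not part of this commit.

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
    import org.joda.time.Duration;

    // Generic splittable DoFn sketch: claim only what was processed, then stop or resume.
    class CountUpToDoFn extends DoFn<Long, Long> {

      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element Long upTo) {
        return new OffsetRange(0, upTo);
      }

      @ProcessElement
      public ProcessContinuation process(
          RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Long> out) {
        OffsetRange range = tracker.currentRestriction();
        // Process at most 1000 positions per invocation (an arbitrary batch size for the sketch).
        long batchEnd = Math.min(range.getTo(), range.getFrom() + 1000);
        for (long i = range.getFrom(); i < batchEnd; ++i) {
          if (!tracker.tryClaim(i)) {
            return ProcessContinuation.stop(); // a split took the rest; the residual handles it
          }
          out.output(i);
        }
        if (batchEnd < range.getTo()) {
          // Positions >= batchEnd were never claimed, so they stay in the residual restriction
          // and will be processed when the runner reschedules this element.
          return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1));
        }
        return ProcessContinuation.stop();
      }
    }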
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index 21f5a888b14..cf4c047025c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -17,8 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -27,6 +29,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Struct;
import java.util.Arrays;
import java.util.Optional;
@@ -55,10 +59,12 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
public class QueryChangeStreamActionTest {
private static final String PARTITION_TOKEN = "partitionToken";
private static final Timestamp PARTITION_START_TIMESTAMP = Timestamp.ofTimeMicroseconds(10L);
+ private static final Timestamp RECORD_TIMESTAMP = Timestamp.ofTimeMicroseconds(20L);
private static final Timestamp PARTITION_END_TIMESTAMP = Timestamp.ofTimeMicroseconds(30L);
private static final long PARTITION_HEARTBEAT_MILLIS = 30_000L;
private static final Instant WATERMARK = Instant.now();
@@ -136,6 +142,22 @@ public class QueryChangeStreamActionTest {
when(partitionMetadataMapper.from(row)).thenReturn(partition);
}
+ void setupUnboundedPartition() {
+ partition =
+ PartitionMetadata.newBuilder()
+ .setPartitionToken(PARTITION_TOKEN)
+ .setParentTokens(Sets.newHashSet("parentToken"))
+ .setStartTimestamp(PARTITION_START_TIMESTAMP)
+ .setEndTimestamp(MAX_INCLUSIVE_END_AT)
+ .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS)
+ .setState(SCHEDULED)
+ .setWatermark(WATERMARK_TIMESTAMP)
+ .setScheduledAt(Timestamp.now())
+ .build();
+ when(partitionMetadataMapper.from(any())).thenReturn(partition);
+ when(restriction.getTo()).thenReturn(MAX_INCLUSIVE_END_AT);
+ }
+
@Test
public void testQueryChangeStreamWithDataChangeRecord() {
final Struct rowAsStruct = mock(Struct.class);
@@ -145,7 +167,7 @@ public class QueryChangeStreamActionTest {
final DataChangeRecord record1 = mock(DataChangeRecord.class);
final DataChangeRecord record2 = mock(DataChangeRecord.class);
when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
- when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+ when(record2.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
when(changeStreamDao.changeStreamQuery(
PARTITION_TOKEN,
PARTITION_START_TIMESTAMP,
@@ -214,8 +236,8 @@ public class QueryChangeStreamActionTest {
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
final HeartbeatRecord record1 = mock(HeartbeatRecord.class);
final HeartbeatRecord record2 = mock(HeartbeatRecord.class);
- when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
- when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+ when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
+ when(record2.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
when(changeStreamDao.changeStreamQuery(
PARTITION_TOKEN,
PARTITION_START_TIMESTAMP,
@@ -498,19 +520,19 @@ public class QueryChangeStreamActionTest {
}
@Test
- public void testQueryChangeStreamWithPartitionEndRecord() {
+ public void testQueryChangeStreamWithPartitionEndRecordBoundedRestriction() {
final ChangeStreamResultSetMetadata resultSetMetadata =
mock(ChangeStreamResultSetMetadata.class);
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
final PartitionEndRecord record1 = mock(PartitionEndRecord.class);
- when(record1.getRecordTimestamp()).thenReturn(PARTITION_END_TIMESTAMP);
+ when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
when(changeStreamDao.changeStreamQuery(
PARTITION_TOKEN,
PARTITION_START_TIMESTAMP,
PARTITION_END_TIMESTAMP,
PARTITION_HEARTBEAT_MILLIS))
.thenReturn(resultSet);
- when(resultSet.next()).thenReturn(true);
+ when(resultSet.next()).thenReturn(true, false);
when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata))
.thenReturn(Arrays.asList(record1));
@@ -520,8 +542,9 @@ public class QueryChangeStreamActionTest {
eq(restrictionTracker),
any(RestrictionInterrupter.class),
eq(watermarkEstimator)))
- .thenReturn(Optional.of(ProcessContinuation.stop()));
+ .thenReturn(Optional.empty());
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
final ProcessContinuation result =
action.run(
@@ -535,14 +558,67 @@ public class QueryChangeStreamActionTest {
eq(restrictionTracker),
any(RestrictionInterrupter.class),
eq(watermarkEstimator));
- verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+ verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
- verify(restrictionTracker, never()).tryClaim(any());
+ verify(partitionMetadataDao, never()).updateWatermark(any(), any());
+ }
+
+ @Test
+ public void testQueryChangeStreamWithPartitionEndRecordUnboundedRestriction() {
+ setupUnboundedPartition();
+
+ final ChangeStreamResultSetMetadata resultSetMetadata =
+ mock(ChangeStreamResultSetMetadata.class);
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final PartitionEndRecord record1 = mock(PartitionEndRecord.class);
+ when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
+ final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
+ when(changeStreamDao.changeStreamQuery(
+ eq(PARTITION_TOKEN),
+ eq(PARTITION_START_TIMESTAMP),
+ timestampCaptor.capture(),
+ eq(PARTITION_HEARTBEAT_MILLIS)))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true, false);
+ when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+ when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata))
+ .thenReturn(Arrays.asList(record1));
+ when(partitionEndRecordAction.run(
+ eq(partition),
+ eq(record1),
+ eq(restrictionTracker),
+ any(RestrictionInterrupter.class),
+ eq(watermarkEstimator)))
+ .thenReturn(Optional.empty());
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+ assertEquals(ProcessContinuation.stop(), result);
+ assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue());
+ verify(partitionEndRecordAction)
+ .run(
+ eq(partition),
+ eq(record1),
+ eq(restrictionTracker),
+ any(RestrictionInterrupter.class),
+ eq(watermarkEstimator));
+ verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
+
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionMetadataDao, never()).updateWatermark(any(), any());
}
@Test
@@ -611,8 +687,90 @@ public class QueryChangeStreamActionTest {
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
assertEquals(ProcessContinuation.stop(), result);
+ verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
+ verify(metrics).decActivePartitionReadCounter();
+
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+ }
+
+ @Test
+ public void testQueryChangeStreamFinishedWithResume() {
+ partition =
+ PartitionMetadata.newBuilder()
+ .setPartitionToken(PARTITION_TOKEN)
+ .setParentTokens(Sets.newHashSet("parentToken"))
+ .setStartTimestamp(PARTITION_START_TIMESTAMP)
+ .setEndTimestamp(MAX_INCLUSIVE_END_AT)
+ .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS)
+ .setState(SCHEDULED)
+ .setWatermark(WATERMARK_TIMESTAMP)
+ .setScheduledAt(Timestamp.now())
+ .build();
+ when(partitionMetadataMapper.from(any())).thenReturn(partition);
+
+ final ChangeStreamResultSet changeStreamResultSet = mock(ChangeStreamResultSet.class);
+ final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
+ when(changeStreamDao.changeStreamQuery(
+ eq(PARTITION_TOKEN),
+ eq(PARTITION_START_TIMESTAMP),
+ timestampCaptor.capture(),
+ eq(PARTITION_HEARTBEAT_MILLIS)))
+ .thenReturn(changeStreamResultSet);
+ when(changeStreamResultSet.next()).thenReturn(false);
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+ assertEquals(ProcessContinuation.resume(), result);
+ assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue());
+
+ verify(restrictionTracker).tryClaim(timestampCaptor.getValue());
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+ verify(partitionMetadataDao, never()).updateToFinished(PARTITION_TOKEN);
+ verify(metrics, never()).decActivePartitionReadCounter();
+
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+ }
+
+ // Out of range indicates that we're beyond the end of the partition and should stop
+ // processing.
+ @Test
+ public void testQueryChangeStreamWithOutOfRangeErrorOnUnboundedPartition() {
+ setupUnboundedPartition();
+
+ final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
+ when(changeStreamDao.changeStreamQuery(
+ eq(PARTITION_TOKEN),
+ eq(PARTITION_START_TIMESTAMP),
+ timestampCaptor.capture(),
+ eq(PARTITION_HEARTBEAT_MILLIS)))
+ .thenThrow(
+ SpannerExceptionFactory.newSpannerException(
+ ErrorCode.OUT_OF_RANGE, "Specified start_timestamp is invalid"));
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+ assertEquals(ProcessContinuation.stop(), result);
+ assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue());
+
+ verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
+ verify(metrics).decActivePartitionReadCounter();
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
@@ -620,6 +778,142 @@ public class QueryChangeStreamActionTest {
verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionMetadataDao, never()).updateWatermark(any(), any());
+ }
+
+ // Out of range indicates that we're beyond the end of the partition and should stop
+ // processing.
+ @Test
+ public void testQueryChangeStreamWithOutOfRangeErrorOnBoundedPartition() {
+ when(changeStreamDao.changeStreamQuery(
+ eq(PARTITION_TOKEN),
+ eq(PARTITION_START_TIMESTAMP),
+ eq(PARTITION_END_TIMESTAMP),
+ eq(PARTITION_HEARTBEAT_MILLIS)))
+ .thenThrow(
+ SpannerExceptionFactory.newSpannerException(
+ ErrorCode.OUT_OF_RANGE, "Specified start_timestamp is invalid"));
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+ assertEquals(ProcessContinuation.stop(), result);
+
+ verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
+ verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
+ verify(metrics).decActivePartitionReadCounter();
+
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionMetadataDao, never()).updateWatermark(any(), any());
+ }
+
+ @Test
+ public void testQueryChangeStreamWithChildPartitionsRecordBoundedRestriction() {
+ final ChangeStreamResultSetMetadata resultSetMetadata =
+ mock(ChangeStreamResultSetMetadata.class);
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class);
+ when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
+ when(changeStreamDao.changeStreamQuery(
+ PARTITION_TOKEN,
+ PARTITION_START_TIMESTAMP,
+ PARTITION_END_TIMESTAMP,
+ PARTITION_HEARTBEAT_MILLIS))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true, false);
+ when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+ when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata))
+ .thenReturn(Arrays.asList(record1));
+ when(childPartitionsRecordAction.run(
+ eq(partition),
+ eq(record1),
+ eq(restrictionTracker),
+ any(RestrictionInterrupter.class),
+ eq(watermarkEstimator)))
+ .thenReturn(Optional.empty());
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+ assertEquals(ProcessContinuation.stop(), result);
+ verify(childPartitionsRecordAction)
+ .run(
+ eq(partition),
+ eq(record1),
+ eq(restrictionTracker),
+ any(RestrictionInterrupter.class),
+ eq(watermarkEstimator));
+ verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
+
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionMetadataDao, never()).updateWatermark(any(), any());
+ }
+
+ @Test
+ public void testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction() {
+ setupUnboundedPartition();
+
+ final ChangeStreamResultSetMetadata resultSetMetadata =
+ mock(ChangeStreamResultSetMetadata.class);
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class);
+ when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
+ final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
+ when(changeStreamDao.changeStreamQuery(
+ eq(PARTITION_TOKEN),
+ eq(PARTITION_START_TIMESTAMP),
+ timestampCaptor.capture(),
+ eq(PARTITION_HEARTBEAT_MILLIS)))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true, false);
+ when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+ when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata))
+ .thenReturn(Arrays.asList(record1));
+ when(childPartitionsRecordAction.run(
+ eq(partition),
+ eq(record1),
+ eq(restrictionTracker),
+ any(RestrictionInterrupter.class),
+ eq(watermarkEstimator)))
+ .thenReturn(Optional.empty());
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+ assertEquals(ProcessContinuation.stop(), result);
+ assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue());
+ verify(childPartitionsRecordAction)
+ .run(
+ eq(partition),
+ eq(record1),
+ eq(restrictionTracker),
+ any(RestrictionInterrupter.class),
+ eq(watermarkEstimator));
+ verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
+
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(partitionMetadataDao, never()).updateWatermark(any(), any());
}
private static class BundleFinalizerStub implements BundleFinalizer {