This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 47bab7e7597 [SpannerIO] Add low-latency configuration in Spanner
Change Streams (#37718)
47bab7e7597 is described below
commit 47bab7e759700c915e85cecc0eef864bcb62f293
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Sat Mar 14 09:53:10 2026 +0100
[SpannerIO] Add low-latency configuration in Spanner Change Streams (#37718)
When enabled, low-latency mode configures a 100 ms heartbeat interval and
stops SDF polling after 1 second or as soon as the first heartbeat response
is received, whichever comes first. This reduces end-to-end processing
latency by completing bundles sooner, which is necessary for messages to
progress to the next fused stage in runners such as Dataflow.
---
.../apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 70 +++++++++-
.../changestreams/ChangeStreamsConstants.java | 9 ++
.../changestreams/action/ActionFactory.java | 12 +-
.../action/HeartbeatRecordAction.java | 14 +-
.../action/QueryChangeStreamAction.java | 19 ++-
.../spanner/changestreams/dofn/InitializeDoFn.java | 12 +-
.../dofn/ReadChangeStreamPartitionDoFn.java | 18 ++-
.../action/HeartbeatRecordActionTest.java | 149 ++++++++++++++++++++-
.../action/QueryChangeStreamActionTest.java | 114 +++++++++++++---
.../changestreams/dofn/InitializeDoFnTest.java | 3 +-
.../dofn/ReadChangeStreamPartitionDoFnTest.java | 19 ++-
11 files changed, 382 insertions(+), 57 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 3494ca22be4..e19137abb40 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -20,10 +20,14 @@ package org.apache.beam.sdk.io.gcp.spanner;
import static java.util.stream.Collectors.toList;
import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_HEARTBEAT_MILLIS;
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT;
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL;
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_WATERMARK_REFRESH_RATE;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL;
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
@@ -537,6 +541,9 @@ public class SpannerIO {
.setRpcPriority(DEFAULT_RPC_PRIORITY)
.setInclusiveStartAt(DEFAULT_INCLUSIVE_START_AT)
.setInclusiveEndAt(DEFAULT_INCLUSIVE_END_AT)
+ .setRealTimeCheckpointInterval(DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL)
+ .setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
+ .setCancelQueryOnHeartbeat(false)
.build();
}
@@ -1761,6 +1768,12 @@ public class SpannerIO {
abstract @Nullable ValueProvider<Boolean> getPlainText();
+ abstract Duration getRealTimeCheckpointInterval();
+
+ abstract int getHeartbeatMillis();
+
+ abstract boolean getCancelQueryOnHeartbeat();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -1790,6 +1803,18 @@ public class SpannerIO {
abstract Builder setPlainText(ValueProvider<Boolean> plainText);
+ /**
+ * When caught up to real-time, checkpoint processing of change stream
this often. This sets a
+ * bound on latency of processing if a steady trickle of elements
prevents the heartbeat
+ * interval from triggering.
+ */
+ abstract Builder setRealTimeCheckpointInterval(Duration
realTimeCheckpointInterval);
+
+ /** Heartbeat interval for all change stream queries. */
+ abstract Builder setHeartbeatMillis(int heartbeatMillis);
+
+ abstract Builder setCancelQueryOnHeartbeat(boolean
cancelQueryOnHeartbeat);
+
abstract ReadChangeStream build();
}
@@ -1912,6 +1937,37 @@ public class SpannerIO {
return
withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}
+ /**
+ * Configures low latency experiment for readChangeStream transform. The
+ * returned transform is built from this one with the heartbeat interval set to
+ * {@code LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS}, cancel-query-on-heartbeat enabled,
+ * and the real-time checkpoint interval set to
+ * {@code LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL}. Example usage:
+ *
+ * <pre>{@code
+ * PCollection<DataChangeRecord> records = p.apply(
+ * SpannerIO.readChangeStream()
+ * .withSpannerConfig(
+ * SpannerConfig.create()
+ * .withProjectId(projectId)
+ * .withInstanceId(instanceId)
+ * .withDatabaseId(dbId))
+ * .withChangeStreamName(changeStreamName)
+ * .withMetadataInstance(metadataInstanceId)
+ * .withMetadataDatabase(metadataDatabase)
+ * .withInclusiveStartAt(Timestamp.now())
+ * .withLowLatency());
+ * }</pre>
+ *
+ * @return a {@code ReadChangeStream} with the low-latency settings applied
+ */
+ public ReadChangeStream withLowLatency() {
+ // Set both the realtime end timestamp and the heartbeat interval.
+ // Heartbeats might not trigger if data arrives continuously (e.g. every
50ms),
+ // which could delay the bundle completion up to the runner's default
split time (often 5s).
+ // Since end-to-end processing requires the bundle to finish and commit,
+ // adding a realtime end timeout of 1s bounds this delay and improves
latency.
+ return toBuilder()
+ .setHeartbeatMillis(LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS)
+ .setCancelQueryOnHeartbeat(true)
+
.setRealTimeCheckpointInterval(LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL)
+ .build();
+ }
+
@Override
public PCollection<DataChangeRecord> expand(PBegin input) {
checkArgument(
@@ -2018,13 +2074,23 @@ public class SpannerIO {
MoreObjects.firstNonNull(getWatermarkRefreshRate(),
DEFAULT_WATERMARK_REFRESH_RATE);
final CacheFactory cacheFactory = new CacheFactory(daoFactory,
watermarkRefreshRate);
+ final long heartbeatMillis = getHeartbeatMillis();
+
final InitializeDoFn initializeDoFn =
- new InitializeDoFn(daoFactory, mapperFactory, startTimestamp,
endTimestamp);
+ new InitializeDoFn(
+ daoFactory, mapperFactory, startTimestamp, endTimestamp,
heartbeatMillis);
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(
daoFactory, mapperFactory, actionFactory, cacheFactory, metrics);
+
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
- new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory,
actionFactory, metrics);
+ new ReadChangeStreamPartitionDoFn(
+ daoFactory,
+ mapperFactory,
+ actionFactory,
+ metrics,
+ getRealTimeCheckpointInterval(),
+ getCancelQueryOnHeartbeat());
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
index db09adb0f27..9b7c76a3ff5 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
@@ -49,6 +49,15 @@ public class ChangeStreamsConstants {
*/
public static final Timestamp DEFAULT_INCLUSIVE_END_AT =
MAX_INCLUSIVE_END_AT;
+ public static final Duration DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL =
Duration.standardMinutes(2);
+
+ public static final int DEFAULT_HEARTBEAT_MILLIS = 2000;
+
+ public static final int LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS = 100;
+
+ public static final Duration LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL =
+ Duration.standardSeconds(1);
+
/** The default priority for a change stream query is {@link
RpcPriority#HIGH}. */
public static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.HIGH;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
index cd84168b23f..6850d77cbf5 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
@@ -71,9 +71,10 @@ public class ActionFactory implements Serializable {
* @param metrics metrics gathering class
* @return singleton instance of the {@link HeartbeatRecordAction}
*/
- public synchronized HeartbeatRecordAction
heartbeatRecordAction(ChangeStreamMetrics metrics) {
+ public synchronized HeartbeatRecordAction heartbeatRecordAction(
+ ChangeStreamMetrics metrics, boolean cancelQueryOnHeartbeat) {
if (heartbeatRecordActionInstance == null) {
- heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics);
+ heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics,
cancelQueryOnHeartbeat);
}
return heartbeatRecordActionInstance;
}
@@ -174,6 +175,7 @@ public class ActionFactory implements Serializable {
* @param partitionEventRecordAction action class to process {@link
*
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord}s
* @param metrics metrics gathering class
+ * @param realTimeCheckpointInterval the duration added to current time for
the end timestamp
* @return single instance of the {@link QueryChangeStreamAction}
*/
public synchronized QueryChangeStreamAction queryChangeStreamAction(
@@ -188,7 +190,8 @@ public class ActionFactory implements Serializable {
PartitionEndRecordAction partitionEndRecordAction,
PartitionEventRecordAction partitionEventRecordAction,
ChangeStreamMetrics metrics,
- boolean isMutableChangeStream) {
+ boolean isMutableChangeStream,
+ Duration realTimeCheckpointInterval) {
if (queryChangeStreamActionInstance == null) {
queryChangeStreamActionInstance =
new QueryChangeStreamAction(
@@ -203,7 +206,8 @@ public class ActionFactory implements Serializable {
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
- isMutableChangeStream);
+ isMutableChangeStream,
+ realTimeCheckpointInterval);
}
return queryChangeStreamActionInstance;
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
index 0937e896fbf..1b66a548b3d 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
@@ -41,14 +41,16 @@ import org.slf4j.LoggerFactory;
public class HeartbeatRecordAction {
private static final Logger LOG =
LoggerFactory.getLogger(HeartbeatRecordAction.class);
private final ChangeStreamMetrics metrics;
+ private final boolean cancelQueryOnHeartbeat;
/**
* Constructs an action class for handling {@link HeartbeatRecord}s.
*
* @param metrics metrics gathering class
*/
- HeartbeatRecordAction(ChangeStreamMetrics metrics) {
+ HeartbeatRecordAction(ChangeStreamMetrics metrics, boolean
cancelQueryOnHeartbeat) {
this.metrics = metrics;
+ this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat;
}
/**
@@ -76,7 +78,8 @@ public class HeartbeatRecordAction {
HeartbeatRecord record,
RestrictionTracker<TimestampRange, Timestamp> tracker,
RestrictionInterrupter<Timestamp> interrupter,
- ManualWatermarkEstimator<Instant> watermarkEstimator) {
+ ManualWatermarkEstimator<Instant> watermarkEstimator,
+ Timestamp endTimestamp) {
final String token = partition.getPartitionToken();
LOG.debug("[{}] Processing heartbeat record {}", token, record);
@@ -96,6 +99,11 @@ public class HeartbeatRecordAction {
watermarkEstimator.setWatermark(timestampInstant);
LOG.debug("[{}] Heartbeat record action completed successfully", token);
- return Optional.empty();
+ if (timestamp.equals(endTimestamp)) {
+ // The heartbeat sits exactly on the query's end timestamp, so it is most
+ // likely the last record of the query; let the query finish on its own.
+ return Optional.empty();
+ }
+ // No data arrived before this heartbeat. End the current read early (to be
+ // resumed on a later invocation) only when cancelQueryOnHeartbeat is enabled;
+ // otherwise keep reading, as this method did before the flag existed.
+ return cancelQueryOnHeartbeat ? Optional.of(ProcessContinuation.resume()) : Optional.empty();
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index e81d8aef473..7feec990f52 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -91,6 +91,7 @@ public class QueryChangeStreamAction {
private final PartitionEventRecordAction partitionEventRecordAction;
private final ChangeStreamMetrics metrics;
private final boolean isMutableChangeStream;
+ private final Duration realTimeCheckpointInterval;
/**
* Constructs an action class for performing a change stream query for a
given partition.
@@ -109,6 +110,7 @@ public class QueryChangeStreamAction {
* @param partitionEventRecordAction action class to process {@link
PartitionEventRecord}s
* @param metrics metrics gathering class
* @param isMutableChangeStream whether the change stream is mutable or not
+ * @param realTimeCheckpointInterval duration to add to current time
*/
QueryChangeStreamAction(
ChangeStreamDao changeStreamDao,
@@ -122,7 +124,8 @@ public class QueryChangeStreamAction {
PartitionEndRecordAction partitionEndRecordAction,
PartitionEventRecordAction partitionEventRecordAction,
ChangeStreamMetrics metrics,
- boolean isMutableChangeStream) {
+ boolean isMutableChangeStream,
+ Duration realTimeCheckpointInterval) {
this.changeStreamDao = changeStreamDao;
this.partitionMetadataDao = partitionMetadataDao;
this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -135,6 +138,7 @@ public class QueryChangeStreamAction {
this.partitionEventRecordAction = partitionEventRecordAction;
this.metrics = metrics;
this.isMutableChangeStream = isMutableChangeStream;
+ this.realTimeCheckpointInterval = realTimeCheckpointInterval;
}
/**
@@ -244,7 +248,8 @@ public class QueryChangeStreamAction {
(HeartbeatRecord) record,
tracker,
interrupter,
- watermarkEstimator);
+ watermarkEstimator,
+ endTimestamp);
} else if (record instanceof ChildPartitionsRecord) {
maybeContinuation =
childPartitionsRecordAction.run(
@@ -387,12 +392,12 @@ public class QueryChangeStreamAction {
&& e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
}
- // Return (now + 2 mins) as the end timestamp for reading change streams.
This is only used if
- // users want to run the connector forever. If the end timestamp is reached,
we will resume
- // processing from that timestamp on a subsequent DoFn execution.
+ // Return (now + config duration) as the end timestamp for reading change
streams. This is only
+ // used if users want to run the connector forever. If the end timestamp is
reached, we
+ // will resume processing from that timestamp on a subsequent DoFn execution.
private Timestamp getNextReadChangeStreamEndTimestamp() {
- final Timestamp current = Timestamp.now();
- return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60,
current.getNanos());
+ return Timestamp.ofTimeMicroseconds(
+ Instant.now().plus(realTimeCheckpointInterval).getMillis() * 1000L);
}
// For Mutable Change Stream bounded queries, update the query end timestamp
to be within 2
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
index 60eb96ca338..4191f2d9359 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
@@ -36,11 +36,7 @@ public class InitializeDoFn extends DoFn<byte[],
PartitionMetadata> implements S
private static final long serialVersionUID = -8921188388649003102L;
- /** Heartbeat interval for all change stream queries will be of 2 seconds. */
- // Be careful when changing this interval, as it needs to be less than the
checkpointing interval
- // in Dataflow. Otherwise, if there are no records within checkpoint
intervals, the consuming of
- // a change stream query might get stuck.
- private static final long DEFAULT_HEARTBEAT_MILLIS = 2000;
+ private final long heartbeatMillis;
private final DaoFactory daoFactory;
private final MapperFactory mapperFactory;
@@ -53,11 +49,13 @@ public class InitializeDoFn extends DoFn<byte[],
PartitionMetadata> implements S
DaoFactory daoFactory,
MapperFactory mapperFactory,
com.google.cloud.Timestamp startTimestamp,
- com.google.cloud.Timestamp endTimestamp) {
+ com.google.cloud.Timestamp endTimestamp,
+ long heartbeatMillis) {
this.daoFactory = daoFactory;
this.mapperFactory = mapperFactory;
this.startTimestamp = startTimestamp;
this.endTimestamp = endTimestamp;
+ this.heartbeatMillis = heartbeatMillis;
}
@ProcessElement
@@ -88,7 +86,7 @@ public class InitializeDoFn extends DoFn<byte[],
PartitionMetadata> implements S
.setPartitionToken(InitialPartition.PARTITION_TOKEN)
.setStartTimestamp(startTimestamp)
.setEndTimestamp(endTimestamp)
- .setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
+ .setHeartbeatMillis(heartbeatMillis)
.setState(State.CREATED)
.setWatermark(startTimestamp)
.build();
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index c3650b42761..750865efbf0 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -74,12 +74,15 @@ public class ReadChangeStreamPartitionDoFn extends
DoFn<PartitionMetadata, DataC
private final ActionFactory actionFactory;
private final ChangeStreamMetrics metrics;
private final boolean isMutableChangeStream;
+ private final boolean cancelQueryOnHeartbeat;
/**
* Needs to be set through the {@link
*
ReadChangeStreamPartitionDoFn#setThroughputEstimator(BytesThroughputEstimator)}
call.
*/
private ThroughputEstimator<DataChangeRecord> throughputEstimator;
+ private final Duration realTimeCheckpointInterval;
+
private transient QueryChangeStreamAction queryChangeStreamAction;
/**
@@ -95,17 +98,23 @@ public class ReadChangeStreamPartitionDoFn extends
DoFn<PartitionMetadata, DataC
* @param mapperFactory the {@link MapperFactory} to construct {@link
ChangeStreamRecordMapper}s
* @param actionFactory the {@link ActionFactory} to construct actions
* @param metrics the {@link ChangeStreamMetrics} to emit partition related
metrics
+ * @param realTimeCheckpointInterval duration to be used for the next end
timestamp
+ * @param cancelQueryOnHeartbeat flag to improve low latency checkpointing
*/
public ReadChangeStreamPartitionDoFn(
DaoFactory daoFactory,
MapperFactory mapperFactory,
ActionFactory actionFactory,
- ChangeStreamMetrics metrics) {
+ ChangeStreamMetrics metrics,
+ Duration realTimeCheckpointInterval,
+ boolean cancelQueryOnHeartbeat) {
this.daoFactory = daoFactory;
- this.mapperFactory = mapperFactory;
this.actionFactory = actionFactory;
+ this.mapperFactory = mapperFactory;
this.metrics = metrics;
this.isMutableChangeStream = daoFactory.isMutableChangeStream();
+ this.realTimeCheckpointInterval = realTimeCheckpointInterval;
+ this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat;
this.throughputEstimator = new NullThroughputEstimator<>();
}
@@ -195,7 +204,7 @@ public class ReadChangeStreamPartitionDoFn extends
DoFn<PartitionMetadata, DataC
final DataChangeRecordAction dataChangeRecordAction =
actionFactory.dataChangeRecordAction(throughputEstimator);
final HeartbeatRecordAction heartbeatRecordAction =
- actionFactory.heartbeatRecordAction(metrics);
+ actionFactory.heartbeatRecordAction(metrics, cancelQueryOnHeartbeat);
final ChildPartitionsRecordAction childPartitionsRecordAction =
actionFactory.childPartitionsRecordAction(partitionMetadataDao,
metrics);
final PartitionStartRecordAction partitionStartRecordAction =
@@ -218,7 +227,8 @@ public class ReadChangeStreamPartitionDoFn extends
DoFn<PartitionMetadata, DataC
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
- isMutableChangeStream);
+ isMutableChangeStream,
+ realTimeCheckpointInterval);
}
/**
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
index 56d1825c8a1..adfc4ea35d4 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
@@ -41,6 +41,7 @@ import org.junit.Test;
public class HeartbeatRecordActionTest {
private HeartbeatRecordAction action;
+ private HeartbeatRecordAction cancellingAction;
private PartitionMetadata partition;
private RestrictionTracker<TimestampRange, Timestamp> tracker;
private RestrictionInterrupter<Timestamp> interrupter;
@@ -49,7 +50,8 @@ public class HeartbeatRecordActionTest {
@Before
public void setUp() {
final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class);
- action = new HeartbeatRecordAction(metrics);
+ action = new HeartbeatRecordAction(metrics, false);
+ cancellingAction = new HeartbeatRecordAction(metrics, true);
partition = mock(PartitionMetadata.class);
tracker = mock(RestrictionTracker.class);
interrupter = mock(RestrictionInterrupter.class);
@@ -60,6 +62,7 @@ public class HeartbeatRecordActionTest {
public void testRestrictionClaimed() {
final String partitionToken = "partitionToken";
final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
when(tracker.tryClaim(timestamp)).thenReturn(true);
when(partition.getPartitionToken()).thenReturn(partitionToken);
@@ -70,7 +73,30 @@ public class HeartbeatRecordActionTest {
new HeartbeatRecord(timestamp, null),
tracker,
interrupter,
- watermarkEstimator);
+ watermarkEstimator,
+ endTimestamp);
+
+ assertEquals(Optional.empty(), maybeContinuation);
+ verify(watermarkEstimator).setWatermark(new
Instant(timestamp.toSqlTimestamp().getTime()));
+ }
+
+ @Test
+ public void testRestrictionClaimedOnCancellingAction() {
+ final String partitionToken = "partitionToken";
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
+
+ when(tracker.tryClaim(timestamp)).thenReturn(true);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ cancellingAction.run(
+ partition,
+ new HeartbeatRecord(timestamp, null),
+ tracker,
+ interrupter,
+ watermarkEstimator,
+ endTimestamp);
assertEquals(Optional.empty(), maybeContinuation);
verify(watermarkEstimator).setWatermark(new
Instant(timestamp.toSqlTimestamp().getTime()));
@@ -80,6 +106,7 @@ public class HeartbeatRecordActionTest {
public void testRestrictionNotClaimed() {
final String partitionToken = "partitionToken";
final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
when(tracker.tryClaim(timestamp)).thenReturn(false);
when(partition.getPartitionToken()).thenReturn(partitionToken);
@@ -90,7 +117,30 @@ public class HeartbeatRecordActionTest {
new HeartbeatRecord(timestamp, null),
tracker,
interrupter,
- watermarkEstimator);
+ watermarkEstimator,
+ endTimestamp);
+
+ assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
+ verify(watermarkEstimator, never()).setWatermark(any());
+ }
+
+ @Test
+ public void testRestrictionNotClaimedOnCancellingAction() {
+ final String partitionToken = "partitionToken";
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
+
+ when(tracker.tryClaim(timestamp)).thenReturn(false);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ cancellingAction.run(
+ partition,
+ new HeartbeatRecord(timestamp, null),
+ tracker,
+ interrupter,
+ watermarkEstimator,
+ endTimestamp);
assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
verify(watermarkEstimator, never()).setWatermark(any());
@@ -100,6 +150,7 @@ public class HeartbeatRecordActionTest {
public void testSoftDeadlineReached() {
final String partitionToken = "partitionToken";
final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
when(interrupter.tryInterrupt(timestamp)).thenReturn(true);
when(tracker.tryClaim(timestamp)).thenReturn(true);
@@ -111,9 +162,99 @@ public class HeartbeatRecordActionTest {
new HeartbeatRecord(timestamp, null),
tracker,
interrupter,
- watermarkEstimator);
+ watermarkEstimator,
+ endTimestamp);
+
+ assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
+ verify(watermarkEstimator, never()).setWatermark(any());
+ }
+
+ @Test
+ public void testSoftDeadlineReachedOnCancellingAction() {
+ final String partitionToken = "partitionToken";
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
+
+ when(interrupter.tryInterrupt(timestamp)).thenReturn(true);
+ when(tracker.tryClaim(timestamp)).thenReturn(true);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ cancellingAction.run(
+ partition,
+ new HeartbeatRecord(timestamp, null),
+ tracker,
+ interrupter,
+ watermarkEstimator,
+ endTimestamp);
assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
verify(watermarkEstimator, never()).setWatermark(any());
}
+
+ @Test
+ public void testEndTimestampReachedOnCancellingAction() {
+ final String partitionToken = "partitionToken";
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
+
+ when(tracker.tryClaim(timestamp)).thenReturn(true);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ cancellingAction.run(
+ partition,
+ new HeartbeatRecord(timestamp, null),
+ tracker,
+ interrupter,
+ watermarkEstimator,
+ endTimestamp);
+
+ assertEquals(Optional.empty(), maybeContinuation);
+ verify(watermarkEstimator).setWatermark(new
Instant(timestamp.toSqlTimestamp().getTime()));
+ }
+
+ @Test
+ public void testEndTimestampNotReachedOnCancellingAction() {
+ final String partitionToken = "partitionToken";
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+
+ when(tracker.tryClaim(timestamp)).thenReturn(true);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ cancellingAction.run(
+ partition,
+ new HeartbeatRecord(timestamp, null),
+ tracker,
+ interrupter,
+ watermarkEstimator,
+ endTimestamp);
+
+ // A heartbeat before the end timestamp must end the read early when the
+ // action was created with cancelQueryOnHeartbeat = true.
+ assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
+ verify(watermarkEstimator).setWatermark(new
Instant(timestamp.toSqlTimestamp().getTime()));
+ }
+
+ @Test
+ public void testEndTimestampNotReachedOnAction() {
+ final String partitionToken = "partitionToken";
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+
+ when(tracker.tryClaim(timestamp)).thenReturn(true);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ action.run(
+ partition,
+ new HeartbeatRecord(timestamp, null),
+ tracker,
+ interrupter,
+ watermarkEstimator,
+ endTimestamp);
+
+ // With cancelQueryOnHeartbeat = false a heartbeat must not interrupt the
+ // query: the action only advances the watermark and keeps reading.
+ assertEquals(Optional.empty(), maybeContinuation);
+ verify(watermarkEstimator).setWatermark(new
Instant(timestamp.toSqlTimestamp().getTime()));
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index 26ab41dff87..7c5d6d0f187 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -58,6 +58,7 @@ import
org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
@@ -119,7 +120,8 @@ public class QueryChangeStreamActionTest {
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
- false);
+ false,
+ Duration.standardMinutes(2));
final Struct row = mock(Struct.class);
partition =
PartitionMetadata.newBuilder()
@@ -223,7 +225,7 @@ public class QueryChangeStreamActionTest {
eq(watermarkEstimator));
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN,
WATERMARK_TIMESTAMP);
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());
@@ -257,14 +259,16 @@ public class QueryChangeStreamActionTest {
eq(record1),
eq(restrictionTracker),
any(RestrictionInterrupter.class),
- eq(watermarkEstimator)))
+ eq(watermarkEstimator),
+ eq(PARTITION_END_TIMESTAMP)))
.thenReturn(Optional.empty());
when(heartbeatRecordAction.run(
eq(partition),
eq(record2),
eq(restrictionTracker),
any(RestrictionInterrupter.class),
- eq(watermarkEstimator)))
+ eq(watermarkEstimator),
+ eq(PARTITION_END_TIMESTAMP)))
.thenReturn(Optional.of(ProcessContinuation.stop()));
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
@@ -279,14 +283,80 @@ public class QueryChangeStreamActionTest {
eq(record1),
eq(restrictionTracker),
any(RestrictionInterrupter.class),
- eq(watermarkEstimator));
+ eq(watermarkEstimator),
+ eq(PARTITION_END_TIMESTAMP));
verify(heartbeatRecordAction)
.run(
eq(partition),
eq(record2),
eq(restrictionTracker),
any(RestrictionInterrupter.class),
- eq(watermarkEstimator));
+ eq(watermarkEstimator),
+ eq(PARTITION_END_TIMESTAMP));
+ verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN,
WATERMARK_TIMESTAMP);
+
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
+ verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
+ verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
+ verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(partitionEventRecordAction, never()).run(any(), any(), any(),
any(), any());
+ verify(restrictionTracker, never()).tryClaim(any());
+ }
+
+ @Test
+ public void testQueryChangeStreamWithHeartbeatRecordAndCancelOnHeartbeat() {
+ final Struct rowAsStruct = mock(Struct.class);
+ final ChangeStreamResultSetMetadata resultSetMetadata =
+ mock(ChangeStreamResultSetMetadata.class);
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final HeartbeatRecord record1 = mock(HeartbeatRecord.class);
+ final HeartbeatRecord record2 = mock(HeartbeatRecord.class);
+ when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
+ when(record2.getRecordTimestamp()).thenReturn(PARTITION_END_TIMESTAMP);
+ when(changeStreamDao.changeStreamQuery(
+ PARTITION_TOKEN,
+ PARTITION_START_TIMESTAMP,
+ PARTITION_END_TIMESTAMP,
+ PARTITION_HEARTBEAT_MILLIS))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct);
+ when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+ when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet,
resultSetMetadata))
+ .thenReturn(Arrays.asList(record1, record2));
+ when(heartbeatRecordAction.run(
+ eq(partition),
+ eq(record1),
+ eq(restrictionTracker),
+ any(RestrictionInterrupter.class),
+ eq(watermarkEstimator),
+ eq(PARTITION_END_TIMESTAMP)))
+ .thenReturn(Optional.of(ProcessContinuation.resume()));
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator,
bundleFinalizer);
+
+ assertEquals(ProcessContinuation.resume(), result);
+ verify(heartbeatRecordAction)
+ .run(
+ eq(partition),
+ eq(record1),
+ eq(restrictionTracker),
+ any(RestrictionInterrupter.class),
+ eq(watermarkEstimator),
+ eq(PARTITION_END_TIMESTAMP));
+
+ // Heartbeat cancels loop and second record is not processed
+ verify(heartbeatRecordAction, never())
+ .run(
+ eq(partition),
+ eq(record2),
+ eq(restrictionTracker),
+ any(RestrictionInterrupter.class),
+ eq(watermarkEstimator),
+ eq(PARTITION_END_TIMESTAMP));
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN,
WATERMARK_TIMESTAMP);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
@@ -356,7 +426,7 @@ public class QueryChangeStreamActionTest {
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN,
WATERMARK_TIMESTAMP);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());
verify(partitionEventRecordAction, never()).run(any(), any(), any(),
any(), any());
@@ -419,7 +489,7 @@ public class QueryChangeStreamActionTest {
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN,
WATERMARK_TIMESTAMP);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());
verify(partitionEventRecordAction, never()).run(any(), any(), any(),
any(), any());
@@ -467,7 +537,7 @@ public class QueryChangeStreamActionTest {
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN,
WATERMARK_TIMESTAMP);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(restrictionTracker, never()).tryClaim(any());
}
@@ -517,7 +587,7 @@ public class QueryChangeStreamActionTest {
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN,
WATERMARK_TIMESTAMP);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(restrictionTracker, never()).tryClaim(any());
}
@@ -564,7 +634,7 @@ public class QueryChangeStreamActionTest {
verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEventRecordAction, never()).run(any(), any(), any(),
any(), any());
@@ -617,7 +687,7 @@ public class QueryChangeStreamActionTest {
verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEventRecordAction, never()).run(any(), any(), any(),
any(), any());
@@ -665,7 +735,7 @@ public class QueryChangeStreamActionTest {
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN,
WATERMARK_TIMESTAMP);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());
@@ -694,7 +764,7 @@ public class QueryChangeStreamActionTest {
verify(metrics).decActivePartitionReadCounter();
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());
@@ -740,7 +810,7 @@ public class QueryChangeStreamActionTest {
verify(metrics, never()).decActivePartitionReadCounter();
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());
@@ -776,7 +846,7 @@ public class QueryChangeStreamActionTest {
verify(metrics).decActivePartitionReadCounter();
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());
@@ -809,7 +879,7 @@ public class QueryChangeStreamActionTest {
verify(metrics).decActivePartitionReadCounter();
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());
@@ -859,7 +929,7 @@ public class QueryChangeStreamActionTest {
verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());
verify(partitionEventRecordAction, never()).run(any(), any(), any(),
any(), any());
@@ -912,7 +982,7 @@ public class QueryChangeStreamActionTest {
verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());
verify(partitionEventRecordAction, never()).run(any(), any(), any(),
any(), any());
@@ -935,7 +1005,8 @@ public class QueryChangeStreamActionTest {
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
- true);
+ true,
+ Duration.standardMinutes(2));
// Set endTimestamp to 60 minutes in the future
Timestamp now = Timestamp.now();
@@ -983,7 +1054,8 @@ public class QueryChangeStreamActionTest {
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
- true);
+ true,
+ Duration.standardMinutes(2));
// Set endTimestamp to only 10 seconds in the future
Timestamp now = Timestamp.now();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java
index 9672e23b16d..c3bee10f8e1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java
@@ -62,7 +62,8 @@ public class InitializeDoFnTest {
daoFactory,
mapperFactory,
Timestamp.ofTimeMicroseconds(1L),
- Timestamp.ofTimeMicroseconds(2L));
+ Timestamp.ofTimeMicroseconds(2L),
+ 2000L);
}
@Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index 9e588de77a0..9a783f5de31 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
@@ -67,6 +68,7 @@ public class ReadChangeStreamPartitionDoFnTest {
Timestamp.ofTimeSecondsAndNanos(10, 20);
private static final Timestamp PARTITION_END_TIMESTAMP =
Timestamp.ofTimeSecondsAndNanos(30, 40);
private static final long PARTITION_HEARTBEAT_MILLIS = 30_000L;
+ private static final boolean CANCEL_QUERY_ON_HEARTBEAT = true;
private ReadChangeStreamPartitionDoFn doFn;
private PartitionMetadata partition;
@@ -103,7 +105,14 @@ public class ReadChangeStreamPartitionDoFnTest {
partitionEventRecordAction = mock(PartitionEventRecordAction.class);
queryChangeStreamAction = mock(QueryChangeStreamAction.class);
- doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory,
actionFactory, metrics);
+ doFn =
+ new ReadChangeStreamPartitionDoFn(
+ daoFactory,
+ mapperFactory,
+ actionFactory,
+ metrics,
+ Duration.standardMinutes(2),
+ CANCEL_QUERY_ON_HEARTBEAT);
doFn.setThroughputEstimator(throughputEstimator);
partition =
@@ -131,7 +140,8 @@ public class ReadChangeStreamPartitionDoFnTest {
when(actionFactory.dataChangeRecordAction(throughputEstimator))
.thenReturn(dataChangeRecordAction);
- when(actionFactory.heartbeatRecordAction(metrics)).thenReturn(heartbeatRecordAction);
+ when(actionFactory.heartbeatRecordAction(metrics,
CANCEL_QUERY_ON_HEARTBEAT))
+ .thenReturn(heartbeatRecordAction);
when(actionFactory.childPartitionsRecordAction(partitionMetadataDao,
metrics))
.thenReturn(childPartitionsRecordAction);
when(actionFactory.partitionStartRecordAction(partitionMetadataDao,
metrics))
@@ -152,7 +162,8 @@ public class ReadChangeStreamPartitionDoFnTest {
eq(partitionEndRecordAction),
eq(partitionEventRecordAction),
eq(metrics),
- anyBoolean()))
+ anyBoolean(),
+ eq(Duration.standardMinutes(2))))
.thenReturn(queryChangeStreamAction);
doFn.setup();
@@ -171,7 +182,7 @@ public class ReadChangeStreamPartitionDoFnTest {
.run(partition, tracker, receiver, watermarkEstimator,
bundleFinalizer);
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
- verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(),
any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(),
any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(),
any());