This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 47bab7e7597 [SpannerIO] Add low-latency configuration in Spanner 
Change Streams (#37718)
47bab7e7597 is described below

commit 47bab7e759700c915e85cecc0eef864bcb62f293
Author: RadosÅ‚aw Stankiewicz <[email protected]>
AuthorDate: Sat Mar 14 09:53:10 2026 +0100

    [SpannerIO] Add low-latency configuration in Spanner Change Streams (#37718)
    
    When enabled, low latency mode will stop SDF polling after 1 second or the 
first heartbeat response received with 100ms heartbeat latency configured.  
This reduces e2e processing latency by completing bundles faster which is 
necessary for messages to progress to the next fused stage in runners such as 
Dataflow.
---
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java  |  70 +++++++++-
 .../changestreams/ChangeStreamsConstants.java      |   9 ++
 .../changestreams/action/ActionFactory.java        |  12 +-
 .../action/HeartbeatRecordAction.java              |  14 +-
 .../action/QueryChangeStreamAction.java            |  19 ++-
 .../spanner/changestreams/dofn/InitializeDoFn.java |  12 +-
 .../dofn/ReadChangeStreamPartitionDoFn.java        |  18 ++-
 .../action/HeartbeatRecordActionTest.java          | 149 ++++++++++++++++++++-
 .../action/QueryChangeStreamActionTest.java        | 114 +++++++++++++---
 .../changestreams/dofn/InitializeDoFnTest.java     |   3 +-
 .../dofn/ReadChangeStreamPartitionDoFnTest.java    |  19 ++-
 11 files changed, 382 insertions(+), 57 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 3494ca22be4..e19137abb40 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -20,10 +20,14 @@ package org.apache.beam.sdk.io.gcp.spanner;
 import static java.util.stream.Collectors.toList;
 import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_HEARTBEAT_MILLIS;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_WATERMARK_REFRESH_RATE;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
@@ -537,6 +541,9 @@ public class SpannerIO {
         .setRpcPriority(DEFAULT_RPC_PRIORITY)
         .setInclusiveStartAt(DEFAULT_INCLUSIVE_START_AT)
         .setInclusiveEndAt(DEFAULT_INCLUSIVE_END_AT)
+        .setRealTimeCheckpointInterval(DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL)
+        .setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
+        .setCancelQueryOnHeartbeat(false)
         .build();
   }
 
@@ -1761,6 +1768,12 @@ public class SpannerIO {
 
     abstract @Nullable ValueProvider<Boolean> getPlainText();
 
+    abstract Duration getRealTimeCheckpointInterval();
+
+    abstract int getHeartbeatMillis();
+
+    abstract boolean getCancelQueryOnHeartbeat();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -1790,6 +1803,18 @@ public class SpannerIO {
 
       abstract Builder setPlainText(ValueProvider<Boolean> plainText);
 
+      /**
+       * When caught up to real-time, checkpoint processing of change stream 
this often. This sets a
+       * bound on latency of processing if a steady trickle of elements 
prevents the heartbeat
+       * interval from triggering.
+       */
+      abstract Builder setRealTimeCheckpointInterval(Duration 
realTimeCheckpointInterval);
+
+      /** Heartbeat interval for all change stream queries. */
+      abstract Builder setHeartbeatMillis(int heartbeatMillis);
+
+      abstract Builder setCancelQueryOnHeartbeat(boolean 
cancelQueryOnHeartbeat);
+
       abstract ReadChangeStream build();
     }
 
@@ -1912,6 +1937,37 @@ public class SpannerIO {
       return 
withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
     }
 
+    /**
+     * Configures low latency experiment for readChangeStream transform. 
Example usage:
+     *
+     * <pre>{@code
+     * PCollection<Struct> rows = p.apply(
+     *    SpannerIO.readChangeStream()
+     *    .withSpannerConfig(
+     *       SpannerConfig.create()
+     *         .withProjectId(projectId)
+     *         .withInstanceId(instanceId)
+     *         .withDatabaseId(dbId))
+     *    .withChangeStreamName(changeStreamName)
+     *    .withMetadataInstance(metadataInstanceId)
+     *    .withMetadataDatabase(metadataDatabase)
+     *    .withInclusiveStartAt(Timestamp.now()))
+     *    .withLowLatency();
+     * }</pre>
+     */
+    public ReadChangeStream withLowLatency() {
+      // Set both the realtime end timestamp and the heartbeat interval.
+      // Heartbeats might not trigger if data arrives continuously (e.g. every 
50ms),
+      // which could delay the bundle completion up to the runner's default 
split time (often 5s).
+      // Since end-to-end processing requires the bundle to finish and commit,
+      // adding a realtime end timeout of 1s bounds this delay and improves 
latency.
+      return toBuilder()
+          .setHeartbeatMillis(LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS)
+          .setCancelQueryOnHeartbeat(true)
+          
.setRealTimeCheckpointInterval(LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL)
+          .build();
+    }
+
     @Override
     public PCollection<DataChangeRecord> expand(PBegin input) {
       checkArgument(
@@ -2018,13 +2074,23 @@ public class SpannerIO {
           MoreObjects.firstNonNull(getWatermarkRefreshRate(), 
DEFAULT_WATERMARK_REFRESH_RATE);
       final CacheFactory cacheFactory = new CacheFactory(daoFactory, 
watermarkRefreshRate);
 
+      final long heartbeatMillis = getHeartbeatMillis();
+
       final InitializeDoFn initializeDoFn =
-          new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, 
endTimestamp);
+          new InitializeDoFn(
+              daoFactory, mapperFactory, startTimestamp, endTimestamp, 
heartbeatMillis);
       final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
           new DetectNewPartitionsDoFn(
               daoFactory, mapperFactory, actionFactory, cacheFactory, metrics);
+
       final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
-          new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, 
actionFactory, metrics);
+          new ReadChangeStreamPartitionDoFn(
+              daoFactory,
+              mapperFactory,
+              actionFactory,
+              metrics,
+              getRealTimeCheckpointInterval(),
+              getCancelQueryOnHeartbeat());
       final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
           new PostProcessingMetricsDoFn(metrics);
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
index db09adb0f27..9b7c76a3ff5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
@@ -49,6 +49,15 @@ public class ChangeStreamsConstants {
    */
   public static final Timestamp DEFAULT_INCLUSIVE_END_AT = 
MAX_INCLUSIVE_END_AT;
 
+  public static final Duration DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL = 
Duration.standardMinutes(2);
+
+  public static final int DEFAULT_HEARTBEAT_MILLIS = 2000;
+
+  public static final int LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS = 100;
+
+  public static final Duration LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL =
+      Duration.standardSeconds(1);
+
   /** The default priority for a change stream query is {@link 
RpcPriority#HIGH}. */
   public static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.HIGH;
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
index cd84168b23f..6850d77cbf5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
@@ -71,9 +71,10 @@ public class ActionFactory implements Serializable {
    * @param metrics metrics gathering class
    * @return singleton instance of the {@link HeartbeatRecordAction}
    */
-  public synchronized HeartbeatRecordAction 
heartbeatRecordAction(ChangeStreamMetrics metrics) {
+  public synchronized HeartbeatRecordAction heartbeatRecordAction(
+      ChangeStreamMetrics metrics, boolean cancelQueryOnHeartbeat) {
     if (heartbeatRecordActionInstance == null) {
-      heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics);
+      heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics, 
cancelQueryOnHeartbeat);
     }
     return heartbeatRecordActionInstance;
   }
@@ -174,6 +175,7 @@ public class ActionFactory implements Serializable {
    * @param partitionEventRecordAction action class to process {@link
    *     
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord}s
    * @param metrics metrics gathering class
+   * @param realTimeCheckpointInterval the duration added to current time for 
the end timestamp
    * @return single instance of the {@link QueryChangeStreamAction}
    */
   public synchronized QueryChangeStreamAction queryChangeStreamAction(
@@ -188,7 +190,8 @@ public class ActionFactory implements Serializable {
       PartitionEndRecordAction partitionEndRecordAction,
       PartitionEventRecordAction partitionEventRecordAction,
       ChangeStreamMetrics metrics,
-      boolean isMutableChangeStream) {
+      boolean isMutableChangeStream,
+      Duration realTimeCheckpointInterval) {
     if (queryChangeStreamActionInstance == null) {
       queryChangeStreamActionInstance =
           new QueryChangeStreamAction(
@@ -203,7 +206,8 @@ public class ActionFactory implements Serializable {
               partitionEndRecordAction,
               partitionEventRecordAction,
               metrics,
-              isMutableChangeStream);
+              isMutableChangeStream,
+              realTimeCheckpointInterval);
     }
     return queryChangeStreamActionInstance;
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
index 0937e896fbf..1b66a548b3d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
@@ -41,14 +41,16 @@ import org.slf4j.LoggerFactory;
 public class HeartbeatRecordAction {
   private static final Logger LOG = 
LoggerFactory.getLogger(HeartbeatRecordAction.class);
   private final ChangeStreamMetrics metrics;
+  private final boolean cancelQueryOnHeartbeat;
 
   /**
    * Constructs an action class for handling {@link HeartbeatRecord}s.
    *
    * @param metrics metrics gathering class
    */
-  HeartbeatRecordAction(ChangeStreamMetrics metrics) {
+  HeartbeatRecordAction(ChangeStreamMetrics metrics, boolean 
cancelQueryOnHeartbeat) {
     this.metrics = metrics;
+    this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat;
   }
 
   /**
@@ -76,7 +78,8 @@ public class HeartbeatRecordAction {
       HeartbeatRecord record,
       RestrictionTracker<TimestampRange, Timestamp> tracker,
       RestrictionInterrupter<Timestamp> interrupter,
-      ManualWatermarkEstimator<Instant> watermarkEstimator) {
+      ManualWatermarkEstimator<Instant> watermarkEstimator,
+      Timestamp endTimestamp) {
 
     final String token = partition.getPartitionToken();
     LOG.debug("[{}] Processing heartbeat record {}", token, record);
@@ -96,6 +99,11 @@ public class HeartbeatRecordAction {
     watermarkEstimator.setWatermark(timestampInstant);
 
     LOG.debug("[{}] Heartbeat record action completed successfully", token);
-    return Optional.empty();
+    if (timestamp.equals(endTimestamp)) {
+      // this is probably last element in query, let it finish query
+      return Optional.empty();
+    }
+    // no new data, finish reading data
+    return cancelQueryOnHeartbeat ? Optional.empty() : 
Optional.of(ProcessContinuation.resume());
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index e81d8aef473..7feec990f52 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -91,6 +91,7 @@ public class QueryChangeStreamAction {
   private final PartitionEventRecordAction partitionEventRecordAction;
   private final ChangeStreamMetrics metrics;
   private final boolean isMutableChangeStream;
+  private final Duration realTimeCheckpointInterval;
 
   /**
    * Constructs an action class for performing a change stream query for a 
given partition.
@@ -109,6 +110,7 @@ public class QueryChangeStreamAction {
    * @param partitionEventRecordAction action class to process {@link 
PartitionEventRecord}s
    * @param metrics metrics gathering class
    * @param isMutableChangeStream whether the change stream is mutable or not
+   * @param realTimeCheckpointInterval duration to add to current time
    */
   QueryChangeStreamAction(
       ChangeStreamDao changeStreamDao,
@@ -122,7 +124,8 @@ public class QueryChangeStreamAction {
       PartitionEndRecordAction partitionEndRecordAction,
       PartitionEventRecordAction partitionEventRecordAction,
       ChangeStreamMetrics metrics,
-      boolean isMutableChangeStream) {
+      boolean isMutableChangeStream,
+      Duration realTimeCheckpointInterval) {
     this.changeStreamDao = changeStreamDao;
     this.partitionMetadataDao = partitionMetadataDao;
     this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -135,6 +138,7 @@ public class QueryChangeStreamAction {
     this.partitionEventRecordAction = partitionEventRecordAction;
     this.metrics = metrics;
     this.isMutableChangeStream = isMutableChangeStream;
+    this.realTimeCheckpointInterval = realTimeCheckpointInterval;
   }
 
   /**
@@ -244,7 +248,8 @@ public class QueryChangeStreamAction {
                     (HeartbeatRecord) record,
                     tracker,
                     interrupter,
-                    watermarkEstimator);
+                    watermarkEstimator,
+                    endTimestamp);
           } else if (record instanceof ChildPartitionsRecord) {
             maybeContinuation =
                 childPartitionsRecordAction.run(
@@ -387,12 +392,12 @@ public class QueryChangeStreamAction {
         && e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
   }
 
-  // Return (now + 2 mins) as the end timestamp for reading change streams. 
This is only used if
-  // users want to run the connector forever. If the end timestamp is reached, 
we will resume
-  // processing from that timestamp on a subsequent DoFn execution.
+  // Return (now + config duration) as the end timestamp for reading change 
streams. This is only
+  // used if  users want to run the connector forever. If the end timestamp is 
reached, we
+  // will resume processing from that timestamp on a subsequent DoFn execution.
   private Timestamp getNextReadChangeStreamEndTimestamp() {
-    final Timestamp current = Timestamp.now();
-    return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, 
current.getNanos());
+    return Timestamp.ofTimeMicroseconds(
+        Instant.now().plus(realTimeCheckpointInterval).getMillis() * 1000L);
   }
 
   // For Mutable Change Stream bounded queries, update the query end timestamp 
to be within 2
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
index 60eb96ca338..4191f2d9359 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
@@ -36,11 +36,7 @@ public class InitializeDoFn extends DoFn<byte[], 
PartitionMetadata> implements S
 
   private static final long serialVersionUID = -8921188388649003102L;
 
-  /** Heartbeat interval for all change stream queries will be of 2 seconds. */
-  // Be careful when changing this interval, as it needs to be less than the 
checkpointing interval
-  // in Dataflow. Otherwise, if there are no records within checkpoint 
intervals, the consuming of
-  // a change stream query might get stuck.
-  private static final long DEFAULT_HEARTBEAT_MILLIS = 2000;
+  private final long heartbeatMillis;
 
   private final DaoFactory daoFactory;
   private final MapperFactory mapperFactory;
@@ -53,11 +49,13 @@ public class InitializeDoFn extends DoFn<byte[], 
PartitionMetadata> implements S
       DaoFactory daoFactory,
       MapperFactory mapperFactory,
       com.google.cloud.Timestamp startTimestamp,
-      com.google.cloud.Timestamp endTimestamp) {
+      com.google.cloud.Timestamp endTimestamp,
+      long heartbeatMillis) {
     this.daoFactory = daoFactory;
     this.mapperFactory = mapperFactory;
     this.startTimestamp = startTimestamp;
     this.endTimestamp = endTimestamp;
+    this.heartbeatMillis = heartbeatMillis;
   }
 
   @ProcessElement
@@ -88,7 +86,7 @@ public class InitializeDoFn extends DoFn<byte[], 
PartitionMetadata> implements S
             .setPartitionToken(InitialPartition.PARTITION_TOKEN)
             .setStartTimestamp(startTimestamp)
             .setEndTimestamp(endTimestamp)
-            .setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
+            .setHeartbeatMillis(heartbeatMillis)
             .setState(State.CREATED)
             .setWatermark(startTimestamp)
             .build();
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index c3650b42761..750865efbf0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -74,12 +74,15 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
   private final ActionFactory actionFactory;
   private final ChangeStreamMetrics metrics;
   private final boolean isMutableChangeStream;
+  private final boolean cancelQueryOnHeartbeat;
   /**
    * Needs to be set through the {@link
    * 
ReadChangeStreamPartitionDoFn#setThroughputEstimator(BytesThroughputEstimator)} 
call.
    */
   private ThroughputEstimator<DataChangeRecord> throughputEstimator;
 
+  private final Duration realTimeCheckpointInterval;
+
   private transient QueryChangeStreamAction queryChangeStreamAction;
 
   /**
@@ -95,17 +98,23 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
    * @param mapperFactory the {@link MapperFactory} to construct {@link 
ChangeStreamRecordMapper}s
    * @param actionFactory the {@link ActionFactory} to construct actions
    * @param metrics the {@link ChangeStreamMetrics} to emit partition related 
metrics
+   * @param realTimeCheckpointInterval duration to be used for the next end 
timestamp
+   * @param cancelQueryOnHeartbeat flag to improve low latency checkpointing
    */
   public ReadChangeStreamPartitionDoFn(
       DaoFactory daoFactory,
       MapperFactory mapperFactory,
       ActionFactory actionFactory,
-      ChangeStreamMetrics metrics) {
+      ChangeStreamMetrics metrics,
+      Duration realTimeCheckpointInterval,
+      boolean cancelQueryOnHeartbeat) {
     this.daoFactory = daoFactory;
-    this.mapperFactory = mapperFactory;
     this.actionFactory = actionFactory;
+    this.mapperFactory = mapperFactory;
     this.metrics = metrics;
     this.isMutableChangeStream = daoFactory.isMutableChangeStream();
+    this.realTimeCheckpointInterval = realTimeCheckpointInterval;
+    this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat;
     this.throughputEstimator = new NullThroughputEstimator<>();
   }
 
@@ -195,7 +204,7 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
     final DataChangeRecordAction dataChangeRecordAction =
         actionFactory.dataChangeRecordAction(throughputEstimator);
     final HeartbeatRecordAction heartbeatRecordAction =
-        actionFactory.heartbeatRecordAction(metrics);
+        actionFactory.heartbeatRecordAction(metrics, cancelQueryOnHeartbeat);
     final ChildPartitionsRecordAction childPartitionsRecordAction =
         actionFactory.childPartitionsRecordAction(partitionMetadataDao, 
metrics);
     final PartitionStartRecordAction partitionStartRecordAction =
@@ -218,7 +227,8 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
             partitionEndRecordAction,
             partitionEventRecordAction,
             metrics,
-            isMutableChangeStream);
+            isMutableChangeStream,
+            realTimeCheckpointInterval);
   }
 
   /**
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
index 56d1825c8a1..adfc4ea35d4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
@@ -41,6 +41,7 @@ import org.junit.Test;
 public class HeartbeatRecordActionTest {
 
   private HeartbeatRecordAction action;
+  private HeartbeatRecordAction cancellingAction;
   private PartitionMetadata partition;
   private RestrictionTracker<TimestampRange, Timestamp> tracker;
   private RestrictionInterrupter<Timestamp> interrupter;
@@ -49,7 +50,8 @@ public class HeartbeatRecordActionTest {
   @Before
   public void setUp() {
     final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class);
-    action = new HeartbeatRecordAction(metrics);
+    action = new HeartbeatRecordAction(metrics, false);
+    cancellingAction = new HeartbeatRecordAction(metrics, true);
     partition = mock(PartitionMetadata.class);
     tracker = mock(RestrictionTracker.class);
     interrupter = mock(RestrictionInterrupter.class);
@@ -60,6 +62,7 @@ public class HeartbeatRecordActionTest {
   public void testRestrictionClaimed() {
     final String partitionToken = "partitionToken";
     final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
 
     when(tracker.tryClaim(timestamp)).thenReturn(true);
     when(partition.getPartitionToken()).thenReturn(partitionToken);
@@ -70,7 +73,30 @@ public class HeartbeatRecordActionTest {
             new HeartbeatRecord(timestamp, null),
             tracker,
             interrupter,
-            watermarkEstimator);
+            watermarkEstimator,
+            endTimestamp);
+
+    assertEquals(Optional.empty(), maybeContinuation);
+    verify(watermarkEstimator).setWatermark(new 
Instant(timestamp.toSqlTimestamp().getTime()));
+  }
+
+  @Test
+  public void testRestrictionClaimedOnCancellingAction() {
+    final String partitionToken = "partitionToken";
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
+
+    when(tracker.tryClaim(timestamp)).thenReturn(true);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        cancellingAction.run(
+            partition,
+            new HeartbeatRecord(timestamp, null),
+            tracker,
+            interrupter,
+            watermarkEstimator,
+            endTimestamp);
 
     assertEquals(Optional.empty(), maybeContinuation);
     verify(watermarkEstimator).setWatermark(new 
Instant(timestamp.toSqlTimestamp().getTime()));
@@ -80,6 +106,7 @@ public class HeartbeatRecordActionTest {
   public void testRestrictionNotClaimed() {
     final String partitionToken = "partitionToken";
     final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
 
     when(tracker.tryClaim(timestamp)).thenReturn(false);
     when(partition.getPartitionToken()).thenReturn(partitionToken);
@@ -90,7 +117,30 @@ public class HeartbeatRecordActionTest {
             new HeartbeatRecord(timestamp, null),
             tracker,
             interrupter,
-            watermarkEstimator);
+            watermarkEstimator,
+            endTimestamp);
+
+    assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
+    verify(watermarkEstimator, never()).setWatermark(any());
+  }
+
+  @Test
+  public void testRestrictionNotClaimedOnCancellingAction() {
+    final String partitionToken = "partitionToken";
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
+
+    when(tracker.tryClaim(timestamp)).thenReturn(false);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        cancellingAction.run(
+            partition,
+            new HeartbeatRecord(timestamp, null),
+            tracker,
+            interrupter,
+            watermarkEstimator,
+            endTimestamp);
 
     assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
     verify(watermarkEstimator, never()).setWatermark(any());
@@ -100,6 +150,7 @@ public class HeartbeatRecordActionTest {
   public void testSoftDeadlineReached() {
     final String partitionToken = "partitionToken";
     final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
 
     when(interrupter.tryInterrupt(timestamp)).thenReturn(true);
     when(tracker.tryClaim(timestamp)).thenReturn(true);
@@ -111,9 +162,99 @@ public class HeartbeatRecordActionTest {
             new HeartbeatRecord(timestamp, null),
             tracker,
             interrupter,
-            watermarkEstimator);
+            watermarkEstimator,
+            endTimestamp);
+
+    assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
+    verify(watermarkEstimator, never()).setWatermark(any());
+  }
+
+  @Test
+  public void testSoftDeadlineReachedOnCancellingAction() {
+    final String partitionToken = "partitionToken";
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
+
+    when(interrupter.tryInterrupt(timestamp)).thenReturn(true);
+    when(tracker.tryClaim(timestamp)).thenReturn(true);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        cancellingAction.run(
+            partition,
+            new HeartbeatRecord(timestamp, null),
+            tracker,
+            interrupter,
+            watermarkEstimator,
+            endTimestamp);
 
     assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
     verify(watermarkEstimator, never()).setWatermark(any());
   }
+
+  @Test
+  public void testEndTimestampReachedOnCancellingAction() {
+    final String partitionToken = "partitionToken";
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(10L);
+
+    when(tracker.tryClaim(timestamp)).thenReturn(true);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        cancellingAction.run(
+            partition,
+            new HeartbeatRecord(timestamp, null),
+            tracker,
+            interrupter,
+            watermarkEstimator,
+            endTimestamp);
+
+    assertEquals(Optional.empty(), maybeContinuation);
+    verify(watermarkEstimator).setWatermark(new 
Instant(timestamp.toSqlTimestamp().getTime()));
+  }
+
+  @Test
+  public void testEndTimestampNotReachedOnCancellingAction() {
+    final String partitionToken = "partitionToken";
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+
+    when(tracker.tryClaim(timestamp)).thenReturn(true);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        cancellingAction.run(
+            partition,
+            new HeartbeatRecord(timestamp, null),
+            tracker,
+            interrupter,
+            watermarkEstimator,
+            endTimestamp);
+
+    assertEquals(Optional.empty(), maybeContinuation);
+    verify(watermarkEstimator).setWatermark(new 
Instant(timestamp.toSqlTimestamp().getTime()));
+  }
+
+  @Test
+  public void testEndTimestampNotReachedOnAction() {
+    final String partitionToken = "partitionToken";
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+
+    when(tracker.tryClaim(timestamp)).thenReturn(true);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        action.run(
+            partition,
+            new HeartbeatRecord(timestamp, null),
+            tracker,
+            interrupter,
+            watermarkEstimator,
+            endTimestamp);
+
+    assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
+    verify(watermarkEstimator).setWatermark(new 
Instant(timestamp.toSqlTimestamp().getTime()));
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index 26ab41dff87..7c5d6d0f187 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -58,6 +58,7 @@ import 
org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
@@ -119,7 +120,8 @@ public class QueryChangeStreamActionTest {
             partitionEndRecordAction,
             partitionEventRecordAction,
             metrics,
-            false);
+            false,
+            Duration.standardMinutes(2));
     final Struct row = mock(Struct.class);
     partition =
         PartitionMetadata.newBuilder()
@@ -223,7 +225,7 @@ public class QueryChangeStreamActionTest {
             eq(watermarkEstimator));
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
 
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());
@@ -257,14 +259,16 @@ public class QueryChangeStreamActionTest {
             eq(record1),
             eq(restrictionTracker),
             any(RestrictionInterrupter.class),
-            eq(watermarkEstimator)))
+            eq(watermarkEstimator),
+            eq(PARTITION_END_TIMESTAMP)))
         .thenReturn(Optional.empty());
     when(heartbeatRecordAction.run(
             eq(partition),
             eq(record2),
             eq(restrictionTracker),
             any(RestrictionInterrupter.class),
-            eq(watermarkEstimator)))
+            eq(watermarkEstimator),
+            eq(PARTITION_END_TIMESTAMP)))
         .thenReturn(Optional.of(ProcessContinuation.stop()));
     when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
 
@@ -279,14 +283,80 @@ public class QueryChangeStreamActionTest {
             eq(record1),
             eq(restrictionTracker),
             any(RestrictionInterrupter.class),
-            eq(watermarkEstimator));
+            eq(watermarkEstimator),
+            eq(PARTITION_END_TIMESTAMP));
     verify(heartbeatRecordAction)
         .run(
             eq(partition),
             eq(record2),
             eq(restrictionTracker),
             any(RestrictionInterrupter.class),
-            eq(watermarkEstimator));
+            eq(watermarkEstimator),
+            eq(PARTITION_END_TIMESTAMP));
+    verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
+
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
+    verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
+    verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
+    verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(partitionEventRecordAction, never()).run(any(), any(), any(), 
any(), any());
+    verify(restrictionTracker, never()).tryClaim(any());
+  }
+
+  @Test
+  public void testQueryChangeStreamWithHeartbeatRecordAndCancelOnHeartbeat() {
+    final Struct rowAsStruct = mock(Struct.class);
+    final ChangeStreamResultSetMetadata resultSetMetadata =
+        mock(ChangeStreamResultSetMetadata.class);
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final HeartbeatRecord record1 = mock(HeartbeatRecord.class);
+    final HeartbeatRecord record2 = mock(HeartbeatRecord.class);
+    when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
+    when(record2.getRecordTimestamp()).thenReturn(PARTITION_END_TIMESTAMP);
+    when(changeStreamDao.changeStreamQuery(
+            PARTITION_TOKEN,
+            PARTITION_START_TIMESTAMP,
+            PARTITION_END_TIMESTAMP,
+            PARTITION_HEARTBEAT_MILLIS))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct);
+    when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+    when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, 
resultSetMetadata))
+        .thenReturn(Arrays.asList(record1, record2));
+    when(heartbeatRecordAction.run(
+            eq(partition),
+            eq(record1),
+            eq(restrictionTracker),
+            any(RestrictionInterrupter.class),
+            eq(watermarkEstimator),
+            eq(PARTITION_END_TIMESTAMP)))
+        .thenReturn(Optional.of(ProcessContinuation.resume()));
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, 
bundleFinalizer);
+
+    assertEquals(ProcessContinuation.resume(), result);
+    verify(heartbeatRecordAction)
+        .run(
+            eq(partition),
+            eq(record1),
+            eq(restrictionTracker),
+            any(RestrictionInterrupter.class),
+            eq(watermarkEstimator),
+            eq(PARTITION_END_TIMESTAMP));
+
+    // Heartbeat cancels loop and second record is not processed
+    verify(heartbeatRecordAction, never())
+        .run(
+            eq(partition),
+            eq(record2),
+            eq(restrictionTracker),
+            any(RestrictionInterrupter.class),
+            eq(watermarkEstimator),
+            eq(PARTITION_END_TIMESTAMP));
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
@@ -356,7 +426,7 @@ public class QueryChangeStreamActionTest {
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());
     verify(partitionEventRecordAction, never()).run(any(), any(), any(), 
any(), any());
@@ -419,7 +489,7 @@ public class QueryChangeStreamActionTest {
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());
     verify(partitionEventRecordAction, never()).run(any(), any(), any(), 
any(), any());
@@ -467,7 +537,7 @@ public class QueryChangeStreamActionTest {
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(restrictionTracker, never()).tryClaim(any());
   }
@@ -517,7 +587,7 @@ public class QueryChangeStreamActionTest {
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(restrictionTracker, never()).tryClaim(any());
   }
@@ -564,7 +634,7 @@ public class QueryChangeStreamActionTest {
     verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEventRecordAction, never()).run(any(), any(), any(), 
any(), any());
@@ -617,7 +687,7 @@ public class QueryChangeStreamActionTest {
     verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEventRecordAction, never()).run(any(), any(), any(), 
any(), any());
@@ -665,7 +735,7 @@ public class QueryChangeStreamActionTest {
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());
@@ -694,7 +764,7 @@ public class QueryChangeStreamActionTest {
     verify(metrics).decActivePartitionReadCounter();
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());
@@ -740,7 +810,7 @@ public class QueryChangeStreamActionTest {
     verify(metrics, never()).decActivePartitionReadCounter();
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());
@@ -776,7 +846,7 @@ public class QueryChangeStreamActionTest {
     verify(metrics).decActivePartitionReadCounter();
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());
@@ -809,7 +879,7 @@ public class QueryChangeStreamActionTest {
     verify(metrics).decActivePartitionReadCounter();
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());
@@ -859,7 +929,7 @@ public class QueryChangeStreamActionTest {
     verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());
     verify(partitionEventRecordAction, never()).run(any(), any(), any(), 
any(), any());
@@ -912,7 +982,7 @@ public class QueryChangeStreamActionTest {
     verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());
     verify(partitionEventRecordAction, never()).run(any(), any(), any(), 
any(), any());
@@ -935,7 +1005,8 @@ public class QueryChangeStreamActionTest {
             partitionEndRecordAction,
             partitionEventRecordAction,
             metrics,
-            true);
+            true,
+            Duration.standardMinutes(2));
 
     // Set endTimestamp to 60 minutes in the future
     Timestamp now = Timestamp.now();
@@ -983,7 +1054,8 @@ public class QueryChangeStreamActionTest {
             partitionEndRecordAction,
             partitionEventRecordAction,
             metrics,
-            true);
+            true,
+            Duration.standardMinutes(2));
 
     // Set endTimestamp to only 10 seconds in the future
     Timestamp now = Timestamp.now();
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java
index 9672e23b16d..c3bee10f8e1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFnTest.java
@@ -62,7 +62,8 @@ public class InitializeDoFnTest {
             daoFactory,
             mapperFactory,
             Timestamp.ofTimeMicroseconds(1L),
-            Timestamp.ofTimeMicroseconds(2L));
+            Timestamp.ofTimeMicroseconds(2L),
+            2000L);
   }
 
   @Test
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index 9e588de77a0..9a783f5de31 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -53,6 +53,7 @@ import 
org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,6 +68,7 @@ public class ReadChangeStreamPartitionDoFnTest {
       Timestamp.ofTimeSecondsAndNanos(10, 20);
   private static final Timestamp PARTITION_END_TIMESTAMP = 
Timestamp.ofTimeSecondsAndNanos(30, 40);
   private static final long PARTITION_HEARTBEAT_MILLIS = 30_000L;
+  private static final boolean CANCEL_QUERY_ON_HEARTBEAT = true;
 
   private ReadChangeStreamPartitionDoFn doFn;
   private PartitionMetadata partition;
@@ -103,7 +105,14 @@ public class ReadChangeStreamPartitionDoFnTest {
     partitionEventRecordAction = mock(PartitionEventRecordAction.class);
     queryChangeStreamAction = mock(QueryChangeStreamAction.class);
 
-    doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, 
actionFactory, metrics);
+    doFn =
+        new ReadChangeStreamPartitionDoFn(
+            daoFactory,
+            mapperFactory,
+            actionFactory,
+            metrics,
+            Duration.standardMinutes(2),
+            CANCEL_QUERY_ON_HEARTBEAT);
     doFn.setThroughputEstimator(throughputEstimator);
 
     partition =
@@ -131,7 +140,8 @@ public class ReadChangeStreamPartitionDoFnTest {
 
     when(actionFactory.dataChangeRecordAction(throughputEstimator))
         .thenReturn(dataChangeRecordAction);
-    
when(actionFactory.heartbeatRecordAction(metrics)).thenReturn(heartbeatRecordAction);
+    when(actionFactory.heartbeatRecordAction(metrics, 
CANCEL_QUERY_ON_HEARTBEAT))
+        .thenReturn(heartbeatRecordAction);
     when(actionFactory.childPartitionsRecordAction(partitionMetadataDao, 
metrics))
         .thenReturn(childPartitionsRecordAction);
     when(actionFactory.partitionStartRecordAction(partitionMetadataDao, 
metrics))
@@ -152,7 +162,8 @@ public class ReadChangeStreamPartitionDoFnTest {
             eq(partitionEndRecordAction),
             eq(partitionEventRecordAction),
             eq(metrics),
-            anyBoolean()))
+            anyBoolean(),
+            eq(Duration.standardMinutes(2))))
         .thenReturn(queryChangeStreamAction);
 
     doFn.setup();
@@ -171,7 +182,7 @@ public class ReadChangeStreamPartitionDoFnTest {
         .run(partition, tracker, receiver, watermarkEstimator, 
bundleFinalizer);
 
     verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
-    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionStartRecordAction, never()).run(any(), any(), any(), 
any(), any());
     verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), 
any());

Reply via email to