This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c074b0f  [BEAM-12164]: Add SDF for reading change stream records
     new f43789a  Merge pull request #16514 from [BEAM-12164]: Add SDF for reading change stream records
c074b0f is described below

commit c074b0f7535e7f12b467864ca9441226f06dd57e
Author: Thiago Nunes <thiagotnu...@google.com>
AuthorDate: Fri Jan 14 15:16:13 2022 +1100

    [BEAM-12164]: Add SDF for reading change stream records
    
    Adds ReadChangeStreamPartitionDoFn, which is an SDF to read partitions
    from change streams and process them accordingly. This component
    receives a change stream name, a partition, a start time and an end time
    to query. It then initiates a change stream query with the received
    parameters.
    
    Within a change stream, 3 types of records can be received:
    
    1. A Data record
    2. A Heartbeat record
    3. A Child partitions record
    
    Upon receiving #1, the function updates the watermark with the record's
    commit timestamp and emits the record into the output PCollection.
    Upon receiving #2, the function updates the watermark with the record's
    timestamp, but it does not emit any record into the PCollection.
    Upon receiving #3, the function updates the watermark with the record's
    timestamp and writes the new child partitions into the metadata table.
    These partitions will be later scheduled by the DetectNewPartitions
    component.
    
    Once the change stream query for the element partition finishes, it
    marks the partition as finished in the metadata table and terminates.
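    
    Sketched in code, the dispatch described above is roughly the following
    (a condensed, illustrative excerpt of the QueryChangeStreamAction loop
    added in this commit):
    
        for (ChangeStreamRecord record : records) {
          if (record instanceof DataChangeRecord) {
            // 1. Data record: emit it and advance the watermark.
            continuation = dataChangeRecordAction.run(
                partition, (DataChangeRecord) record, tracker, receiver, watermarkEstimator);
          } else if (record instanceof HeartbeatRecord) {
            // 2. Heartbeat record: advance the watermark only.
            continuation = heartbeatRecordAction.run(
                partition, (HeartbeatRecord) record, tracker, watermarkEstimator);
          } else if (record instanceof ChildPartitionsRecord) {
            // 3. Child partitions record: persist children for DetectNewPartitions.
            continuation = childPartitionsRecordAction.run(
                partition, (ChildPartitionsRecord) record, tracker, watermarkEstimator);
          }
        }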
---
 .../spanner/changestreams/TimestampConverter.java  |  64 ++++
 .../changestreams/action/ActionFactory.java        | 131 ++++++++
 .../action/ChildPartitionsRecordAction.java        | 230 ++++++++++++++
 .../action/DataChangeRecordAction.java             | 115 +++++++
 .../action/HeartbeatRecordAction.java              | 110 +++++++
 .../action/QueryChangeStreamAction.java            | 299 ++++++++++++++++++
 .../spanner/changestreams/action/package-info.java |  23 ++
 .../dofn/ReadChangeStreamPartitionDoFn.java        | 230 ++++++++++++++
 .../ReadChangeStreamPartitionRangeTracker.java     |  80 +++++
 .../changestreams/restriction/package-info.java    |  23 ++
 .../changestreams/TimestampConverterTest.java      |  69 +++++
 .../action/ChildPartitionsRecordActionTest.java    | 244 +++++++++++++++
 .../action/DataChangeRecordActionTest.java         |  91 ++++++
 .../action/HeartbeatRecordActionTest.java          |  85 ++++++
 .../action/QueryChangeStreamActionTest.java        | 337 +++++++++++++++++++++
 .../dofn/ReadChangeStreamPartitionDoFnTest.java    | 171 +++++++++++
 .../ReadChangeStreamPartitionRangeTrackerTest.java |  59 ++++
 .../changestreams/util/TestTransactionAnswer.java  |  42 +++
 18 files changed, 2403 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/TimestampConverter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/TimestampConverter.java
new file mode 100644
index 0000000..ab7b9e1
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/TimestampConverter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams;
+
+import com.google.cloud.Timestamp;
+import java.math.BigDecimal;
+
+/** Util class to manage timestamp conversions. */
+public class TimestampConverter {
+
+  /** The number of microseconds in a {@link Timestamp#MAX_VALUE}. */
+  public static final long MAX_MICROS = timestampToMicros(Timestamp.MAX_VALUE);
+
+  /**
+   * Converts a {@link Timestamp} to its number of microseconds. Note there is precision loss here.
+   *
+   * @param timestamp the timestamp to be converted
+   * @return the number of microseconds in the given timestamp
+   */
+  public static long timestampToMicros(Timestamp timestamp) {
+    final BigDecimal seconds = BigDecimal.valueOf(timestamp.getSeconds());
+    final BigDecimal nanos = BigDecimal.valueOf(timestamp.getNanos());
+    final BigDecimal micros = nanos.scaleByPowerOfTen(-3);
+
+    return seconds.scaleByPowerOfTen(6).add(micros).longValue();
+  }
+
+  /**
+   * Creates a {@link Timestamp} from a number of milliseconds. Note that microseconds and
+   * nanoseconds will always be zeroed here.
+   *
+   * @param millis the number of milliseconds
+   * @return a timestamp with the given milliseconds
+   */
+  public static Timestamp timestampFromMillis(long millis) {
+    return Timestamp.ofTimeMicroseconds(millis * 1_000L);
+  }
+
+  /**
+   * Zeroes nanoseconds from the given {@link Timestamp} (precision is lost). The timestamp returned
+   * will be precise up to microseconds only.
+   *
+   * @param timestamp the timestamp to be truncated
+   * @return the timestamp with microseconds precision
+   */
+  public static Timestamp truncateNanos(Timestamp timestamp) {
+    return Timestamp.ofTimeMicroseconds(timestampToMicros(timestamp));
+  }
+}
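
A minimal usage sketch for the converter above (the values are illustrative
only; only methods defined in this file and the Cloud Timestamp factory
methods are used):

    import com.google.cloud.Timestamp;
    import org.apache.beam.sdk.io.gcp.spanner.changestreams.TimestampConverter;

    // 10 seconds and 123,456,789 nanoseconds past the epoch.
    Timestamp ts = Timestamp.ofTimeSecondsAndNanos(10L, 123_456_789);

    // Sub-microsecond precision is dropped: yields 10_123_456 micros.
    long micros = TimestampConverter.timestampToMicros(ts);

    // truncateNanos keeps microsecond precision: nanos become 123_456_000.
    Timestamp truncated = TimestampConverter.truncateNanos(ts);

    // timestampFromMillis zeroes everything below milliseconds: 1s, 0ns.
    Timestamp fromMillis = TimestampConverter.timestampFromMillis(1_000L);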
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
new file mode 100644
index 0000000..aa45411
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
+
+/**
+ * Factory class for creating instances that will handle each type of record within a change stream
+ * query. The instances created are all singletons.
+ */
+// static fields are un-initialized, because we start them during the first fetch call (with the
+// singleton pattern)
+@SuppressWarnings("initialization.static.fields.uninitialized")
+public class ActionFactory implements Serializable {
+
+  private static final long serialVersionUID = -4060958761369602619L;
+  private static DataChangeRecordAction dataChangeRecordActionInstance;
+  private static HeartbeatRecordAction heartbeatRecordActionInstance;
+  private static ChildPartitionsRecordAction childPartitionsRecordActionInstance;
+  private static QueryChangeStreamAction queryChangeStreamActionInstance;
+
+  /**
+   * Creates and returns a singleton instance of an action class capable of processing {@link
+   * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord}s.
+   *
+   * <p>This method is thread safe.
+   *
+   * @return singleton instance of the {@link DataChangeRecordAction}
+   */
+  public synchronized DataChangeRecordAction dataChangeRecordAction() {
+    if (dataChangeRecordActionInstance == null) {
+      dataChangeRecordActionInstance = new DataChangeRecordAction();
+    }
+    return dataChangeRecordActionInstance;
+  }
+
+  /**
+   * Creates and returns a singleton instance of an action class capable of processing {@link
+   * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord}s. This method is thread
+   * safe.
+   *
+   * @param metrics metrics gathering class
+   * @return singleton instance of the {@link HeartbeatRecordAction}
+   */
+  public synchronized HeartbeatRecordAction heartbeatRecordAction(ChangeStreamMetrics metrics) {
+    if (heartbeatRecordActionInstance == null) {
+      heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics);
+    }
+    return heartbeatRecordActionInstance;
+  }
+
+  /**
+   * Creates and returns a singleton instance of an action class capable of processing {@link
+   * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord}s. This method is
+   * thread safe.
+   *
+   * @param partitionMetadataDao DAO class to access the Connector's metadata tables
+   * @param metrics metrics gathering class
+   * @return singleton instance of the {@link ChildPartitionsRecordAction}
+   */
+  public synchronized ChildPartitionsRecordAction childPartitionsRecordAction(
+      PartitionMetadataDao partitionMetadataDao, ChangeStreamMetrics metrics) {
+    if (childPartitionsRecordActionInstance == null) {
+      childPartitionsRecordActionInstance =
+          new ChildPartitionsRecordAction(partitionMetadataDao, metrics);
+    }
+    return childPartitionsRecordActionInstance;
+  }
+
+  /**
+   * Creates and returns a singleton instance of an action class capable of performing a change
+   * stream query for a given partition. It uses the {@link DataChangeRecordAction}, {@link
+   * HeartbeatRecordAction} and {@link ChildPartitionsRecordAction} to dispatch the necessary
+   * processing depending on the type of record received.
+   *
+   * @param changeStreamDao DAO class to perform a change stream query
+   * @param partitionMetadataDao DAO class to access the Connector's metadata tables
+   * @param changeStreamRecordMapper mapper class to transform change stream records into the
+   *     Connector's domain models
+   * @param partitionMetadataMapper mapper class to transform partition metadata rows into the
+   *     Connector's domain models
+   * @param dataChangeRecordAction action class to process {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord}s
+   * @param heartbeatRecordAction action class to process {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord}s
+   * @param childPartitionsRecordAction action class to process {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord}s
+   * @return singleton instance of the {@link QueryChangeStreamAction}
+   */
+  public synchronized QueryChangeStreamAction queryChangeStreamAction(
+      ChangeStreamDao changeStreamDao,
+      PartitionMetadataDao partitionMetadataDao,
+      ChangeStreamRecordMapper changeStreamRecordMapper,
+      PartitionMetadataMapper partitionMetadataMapper,
+      DataChangeRecordAction dataChangeRecordAction,
+      HeartbeatRecordAction heartbeatRecordAction,
+      ChildPartitionsRecordAction childPartitionsRecordAction) {
+    if (queryChangeStreamActionInstance == null) {
+      queryChangeStreamActionInstance =
+          new QueryChangeStreamAction(
+              changeStreamDao,
+              partitionMetadataDao,
+              changeStreamRecordMapper,
+              partitionMetadataMapper,
+              dataChangeRecordAction,
+              heartbeatRecordAction,
+              childPartitionsRecordAction);
+    }
+    return queryChangeStreamActionInstance;
+  }
+}
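
A sketch of how the factory might be wired together (the metrics, DAO and
mapper variables are assumed to come from the connector's ChangeStreamMetrics,
DaoFactory and MapperFactory):

    ActionFactory actionFactory = new ActionFactory();
    DataChangeRecordAction dataAction = actionFactory.dataChangeRecordAction();
    HeartbeatRecordAction heartbeatAction = actionFactory.heartbeatRecordAction(metrics);
    ChildPartitionsRecordAction childAction =
        actionFactory.childPartitionsRecordAction(partitionMetadataDao, metrics);
    QueryChangeStreamAction queryAction =
        actionFactory.queryChangeStreamAction(
            changeStreamDao,
            partitionMetadataDao,
            changeStreamRecordMapper,
            partitionMetadataMapper,
            dataAction,
            heartbeatAction,
            childAction);

Note that the instances live in static fields, so every ActionFactory in the
same JVM hands out the same singletons.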
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
new file mode 100644
index 0000000..241597b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.CREATED;
+
+import com.google.cloud.Timestamp;
+import io.opencensus.common.Scope;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is part of the process for {@link
+ * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF. It is
+ * responsible for processing {@link ChildPartitionsRecord}s. The new child partitions will be
+ * stored in the Connector's metadata tables in order to be scheduled for future querying by the
+ * {@link org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn} SDF.
+ */
+public class ChildPartitionsRecordAction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ChildPartitionsRecordAction.class);
+  private static final Tracer TRACER = Tracing.getTracer();
+  private final PartitionMetadataDao partitionMetadataDao;
+  private final ChangeStreamMetrics metrics;
+
+  /**
+   * Constructs an action class for handling {@link ChildPartitionsRecord}s.
+   *
+   * @param partitionMetadataDao DAO class to access the Connector's metadata tables
+   * @param metrics metrics gathering class
+   */
+  ChildPartitionsRecordAction(
+      PartitionMetadataDao partitionMetadataDao, ChangeStreamMetrics metrics) {
+    this.partitionMetadataDao = partitionMetadataDao;
+    this.metrics = metrics;
+  }
+
+  /**
+   * This is the main processing function for a {@link ChildPartitionsRecord}. It returns an {@link
+   * Optional} of {@link ProcessContinuation} to indicate if the calling function should stop or
+   * not. If the {@link Optional} returned is empty, it means that the calling function can continue
+   * with the processing. If an {@link Optional} of {@link ProcessContinuation#stop()} is returned,
+   * it means that this function was unable to claim the timestamp of the {@link
+   * ChildPartitionsRecord}, so the caller should stop.
+   *
+   * <p>When processing the {@link ChildPartitionsRecord} the following procedure is applied:
+   *
+   * <ol>
+   *   <li>We try to claim the child partition record timestamp. If it is not possible, we stop here
+   *       and return.
+   *   <li>We update the watermark to the child partition record timestamp.
+   *   <li>For each child partition, we try to insert them in the metadata tables if they do not
+   *       exist.
+   *   <li>For each child partition, we check if they originate from a split or a merge and
+   *       increment the corresponding metric.
+   * </ol>
+   *
+   * Dealing with partition splits and merge cases is detailed below:
+   *
+   * <ul>
+   *   <li>Partition Splits: child partition tokens should not exist in the partition metadata
+   *       table, so new rows are just added to that table. In case of a bundle retry, we silently
+   *       ignore duplicate entries.
+   *   <li>Partition Merges: the first parent partition that receives the child token should succeed
+   *       in inserting it. The remaining parents will silently ignore and skip the insertion.
+   * </ul>
+   *
+   * @param partition the current partition being processed
+   * @param record the change stream child partition record received
+   * @param tracker the restriction tracker of the {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
+   * @param watermarkEstimator the watermark estimator of the {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
+   * @return {@link Optional#empty()} if the caller can continue processing more records. A
+   *     non-empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was
+   *     unable to claim the {@link ChildPartitionsRecord} timestamp
+   */
+  @VisibleForTesting
+  public Optional<ProcessContinuation> run(
+      PartitionMetadata partition,
+      ChildPartitionsRecord record,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      ManualWatermarkEstimator<Instant> watermarkEstimator) {
+
+    final String token = partition.getPartitionToken();
+    try (Scope scope =
+        TRACER.spanBuilder("ChildPartitionsRecordAction").setRecordEvents(true).startScopedSpan()) {
+      TRACER
+          .getCurrentSpan()
+          .putAttribute(PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(token));
+
+      LOG.debug("[" + token + "] Processing child partition record " + record);
+
+      final Timestamp startTimestamp = record.getStartTimestamp();
+      final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime());
+      final long startMicros = TimestampConverter.timestampToMicros(startTimestamp);
+      if (!tracker.tryClaim(startMicros)) {
+        LOG.debug(
+            "[" + token + "] Could not claim queryChangeStream(" + startTimestamp + "), stopping");
+        return Optional.of(ProcessContinuation.stop());
+      }
+      watermarkEstimator.setWatermark(startInstant);
+
+      for (ChildPartition childPartition : record.getChildPartitions()) {
+        processChildPartition(partition, record, childPartition);
+      }
+
+      LOG.debug("[" + token + "] Child partitions action completed 
successfully");
+      return Optional.empty();
+    }
+  }
+
+  // Unboxing of runInTransaction result will not produce a null value, we can ignore it
+  @SuppressWarnings("nullness")
+  private void processChildPartition(
+      PartitionMetadata partition, ChildPartitionsRecord record, ChildPartition childPartition) {
+
+    try (Scope scope =
+        TRACER
+            .spanBuilder("ChildPartitionsRecordAction.processChildPartition")
+            .setRecordEvents(true)
+            .startScopedSpan()) {
+      TRACER
+          .getCurrentSpan()
+          .putAttribute(
+              PARTITION_ID_ATTRIBUTE_LABEL,
+              AttributeValue.stringAttributeValue(partition.getPartitionToken()));
+
+      final String partitionToken = partition.getPartitionToken();
+      final String childPartitionToken = childPartition.getToken();
+      final boolean isSplit = isSplit(childPartition);
+      LOG.debug(
+          "["
+              + partitionToken
+              + "] Processing child partition"
+              + (isSplit ? " split" : " merge")
+              + " event");
+
+      final PartitionMetadata row =
+          toPartitionMetadata(
+              record.getStartTimestamp(),
+              partition.getEndTimestamp(),
+              partition.getHeartbeatMillis(),
+              childPartition);
+      // FIXME: Figure out what to do if this throws an exception
+      LOG.debug("[" + partitionToken + "] Inserting child partition token " + 
childPartitionToken);
+      final Boolean insertedRow =
+          partitionMetadataDao
+              .runInTransaction(
+                  transaction -> {
+                    if (transaction.getPartition(childPartitionToken) == null) {
+                      transaction.insert(row);
+                      return true;
+                    } else {
+                      return false;
+                    }
+                  })
+              .getResult();
+      if (insertedRow && isSplit) {
+        metrics.incPartitionRecordSplitCount();
+      } else if (insertedRow) {
+        metrics.incPartitionRecordMergeCount();
+      } else {
+        LOG.debug(
+            "["
+                + partitionToken
+                + "] Child token "
+                + childPartitionToken
+                + " already exists, skipping...");
+      }
+    }
+  }
+
+  private boolean isSplit(ChildPartition childPartition) {
+    return childPartition.getParentTokens().size() == 1;
+  }
+
+  private PartitionMetadata toPartitionMetadata(
+      Timestamp startTimestamp,
+      @Nullable Timestamp endTimestamp,
+      long heartbeatMillis,
+      ChildPartition childPartition) {
+    // FIXME: The backend only supports microsecond granularity. Remove when fixed.
+    final Timestamp truncatedStartTimestamp = TimestampConverter.truncateNanos(startTimestamp);
+    final Timestamp truncatedEndTimestamp =
+        Optional.ofNullable(endTimestamp).map(TimestampConverter::truncateNanos).orElse(null);
+    return PartitionMetadata.newBuilder()
+        .setPartitionToken(childPartition.getToken())
+        .setParentTokens(childPartition.getParentTokens())
+        .setStartTimestamp(truncatedStartTimestamp)
+        .setEndTimestamp(truncatedEndTimestamp)
+        .setHeartbeatMillis(heartbeatMillis)
+        .setState(CREATED)
+        .setWatermark(truncatedStartTimestamp)
+        .build();
+  }
+}
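
The insert-if-absent transaction above is what makes bundle retries and merge
parents safe; the pattern in isolation (a sketch, with a hypothetical
childToken and row in scope):

    final Boolean inserted =
        partitionMetadataDao
            .runInTransaction(
                transaction -> {
                  // Only the first writer of this token inserts the row; bundle
                  // retries and the other parents of a merge see it and skip.
                  if (transaction.getPartition(childToken) == null) {
                    transaction.insert(row);
                    return true;
                  }
                  return false;
                })
            .getResult();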
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
new file mode 100644
index 0000000..a898729
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
+
+import com.google.cloud.Timestamp;
+import io.opencensus.common.Scope;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is part of the process for {@link
+ * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF. It is
+ * responsible for processing {@link DataChangeRecord}s. The records will simply be emitted to the
+ * received output receiver.
+ */
+public class DataChangeRecordAction {
+  private static final Logger LOG = LoggerFactory.getLogger(DataChangeRecordAction.class);
+  private static final Tracer TRACER = Tracing.getTracer();
+
+  /**
+   * This is the main processing function for a {@link DataChangeRecord}. It returns an {@link
+   * Optional} of {@link ProcessContinuation} to indicate if the calling function should stop or
+   * not. If the {@link Optional} returned is empty, it means that the calling function can continue
+   * with the processing. If an {@link Optional} of {@link ProcessContinuation#stop()} is returned,
+   * it means that this function was unable to claim the timestamp of the {@link DataChangeRecord},
+   * so the caller should stop.
+   *
+   * <p>When processing the {@link DataChangeRecord} the following procedure is applied:
+   *
+   * <ol>
+   *   <li>We try to claim the data change record commit timestamp. If it is not possible, we stop
+   *       here and return.
+   *   <li>We emit the data change record through the {@link OutputReceiver}.
+   *   <li>We update the watermark to the data change record commit timestamp.
+   * </ol>
+   *
+   * @param partition the current partition being processed
+   * @param record the change stream data record received
+   * @param tracker the restriction tracker of the {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
+   * @param outputReceiver the output receiver of the {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
+   * @param watermarkEstimator the watermark estimator of the {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
+   * @return {@link Optional#empty()} if the caller can continue processing more records. A
+   *     non-empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was
+   *     unable to claim the {@link DataChangeRecord} timestamp
+   */
+  @VisibleForTesting
+  public Optional<ProcessContinuation> run(
+      PartitionMetadata partition,
+      DataChangeRecord record,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      OutputReceiver<DataChangeRecord> outputReceiver,
+      ManualWatermarkEstimator<Instant> watermarkEstimator) {
+
+    try (Scope scope =
+        TRACER.spanBuilder("DataChangeRecordAction").setRecordEvents(true).startScopedSpan()) {
+      TRACER
+          .getCurrentSpan()
+          .putAttribute(
+              PARTITION_ID_ATTRIBUTE_LABEL,
+              AttributeValue.stringAttributeValue(partition.getPartitionToken()));
+
+      final String token = partition.getPartitionToken();
+      LOG.debug("[" + token + "] Processing data record " + 
record.getCommitTimestamp());
+
+      final Timestamp commitTimestamp = record.getCommitTimestamp();
+      final Instant commitInstant = new 
Instant(commitTimestamp.toSqlTimestamp().getTime());
+      final long commitMicros = 
TimestampConverter.timestampToMicros(commitTimestamp);
+      if (!tracker.tryClaim(commitMicros)) {
+        LOG.debug(
+            "[" + token + "] Could not claim queryChangeStream(" + 
commitTimestamp + "), stopping");
+        return Optional.of(ProcessContinuation.stop());
+      }
+      outputReceiver.outputWithTimestamp(record, commitInstant);
+      watermarkEstimator.setWatermark(commitInstant);
+
+      LOG.debug("[" + token + "] Data record action completed successfully");
+      return Optional.empty();
+    }
+  }
+}
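
The claim-then-emit ordering above is the invariant shared by all three
actions: a record's timestamp must be claimed on the restriction before any
side effect takes place. In isolation (a sketch using the same calls):

    long micros = TimestampConverter.timestampToMicros(record.getCommitTimestamp());
    if (!tracker.tryClaim(micros)) {
      // The restriction was exhausted or split away; emit nothing.
      return Optional.of(ProcessContinuation.stop());
    }
    outputReceiver.outputWithTimestamp(record, commitInstant); // emit first
    watermarkEstimator.setWatermark(commitInstant);            // then advance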
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
new file mode 100644
index 0000000..7c7cce1
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
+
+import com.google.cloud.Timestamp;
+import io.opencensus.common.Scope;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is part of the process for {@link
+ * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF. It is
+ * responsible for processing {@link HeartbeatRecord}s. The records will be used to progress the
+ * watermark for the current element (partition).
+ */
+public class HeartbeatRecordAction {
+  private static final Logger LOG = LoggerFactory.getLogger(HeartbeatRecordAction.class);
+  private static final Tracer TRACER = Tracing.getTracer();
+  private final ChangeStreamMetrics metrics;
+
+  /**
+   * Constructs an action class for handling {@link HeartbeatRecord}s.
+   *
+   * @param metrics metrics gathering class
+   */
+  HeartbeatRecordAction(ChangeStreamMetrics metrics) {
+    this.metrics = metrics;
+  }
+
+  /**
+   * This is the main processing function for a {@link HeartbeatRecord}. It returns an {@link
+   * Optional} of {@link ProcessContinuation} to indicate if the calling function should stop or
+   * not. If the {@link Optional} returned is empty, it means that the calling function can continue
+   * with the processing. If an {@link Optional} of {@link ProcessContinuation#stop()} is returned,
+   * it means that this function was unable to claim the timestamp of the {@link HeartbeatRecord},
+   * so the caller should stop.
+   *
+   * <p>When processing the {@link HeartbeatRecord} the following procedure is applied:
+   *
+   * <ol>
+   *   <li>We try to claim the heartbeat record timestamp. If it is not possible, we stop here and
+   *       return.
+   *   <li>We update the necessary metrics.
+   *   <li>We update the watermark to the heartbeat record timestamp.
+   * </ol>
+   */
+  @VisibleForTesting
+  public Optional<ProcessContinuation> run(
+      PartitionMetadata partition,
+      HeartbeatRecord record,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      ManualWatermarkEstimator<Instant> watermarkEstimator) {
+
+    try (Scope scope =
+        TRACER.spanBuilder("HeartbeatRecordAction").setRecordEvents(true).startScopedSpan()) {
+      TRACER
+          .getCurrentSpan()
+          .putAttribute(
+              PARTITION_ID_ATTRIBUTE_LABEL,
+              AttributeValue.stringAttributeValue(partition.getPartitionToken()));
+
+      final String token = partition.getPartitionToken();
+      LOG.debug("[" + token + "] Processing heartbeat record " + record);
+
+      final Timestamp timestamp = record.getTimestamp();
+      final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime());
+      final long timestampMicros = TimestampConverter.timestampToMicros(timestamp);
+      if (!tracker.tryClaim(timestampMicros)) {
+        LOG.debug("[" + token + "] Could not claim queryChangeStream(" + timestamp + "), stopping");
+        return Optional.of(ProcessContinuation.stop());
+      }
+      metrics.incHeartbeatRecordCount();
+      watermarkEstimator.setWatermark(timestampInstant);
+
+      LOG.debug("[" + token + "] Heartbeat record action completed 
successfully");
+      return Optional.empty();
+    }
+  }
+}
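
Unlike a data record, a heartbeat produces no output; callers only observe
the returned Optional (a sketch, assuming the arguments are in scope):

    Optional<ProcessContinuation> result =
        heartbeatRecordAction.run(partition, heartbeatRecord, tracker, watermarkEstimator);
    if (result.isPresent()) {
      return result.get(); // the timestamp could not be claimed, stop the SDF
    }
    // otherwise: the watermark advanced, continue with the next record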
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
new file mode 100644
index 0000000..5360638
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.SpannerException;
+import io.opencensus.common.Scope;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main action class for querying a partition change stream. This class will perform the change
+ * stream query and depending on the record type received, it will dispatch the processing of it to
+ * one of the following: {@link ChildPartitionsRecordAction}, {@link HeartbeatRecordAction} or
+ * {@link DataChangeRecordAction}.
+ *
+ * <p>This class will also make sure to mirror the current watermark (event timestamp processed) in
+ * the Connector's metadata tables, by registering a bundle after commit action.
+ *
+ * <p>When the change stream query for the partition is finished, this class will update the state
+ * of the partition in the metadata tables as FINISHED, indicating completion.
+ */
+public class QueryChangeStreamAction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);
+  private static final Tracer TRACER = Tracing.getTracer();
+  private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5);
+  private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid";
+
+  private final ChangeStreamDao changeStreamDao;
+  private final PartitionMetadataDao partitionMetadataDao;
+  private final ChangeStreamRecordMapper changeStreamRecordMapper;
+  private final PartitionMetadataMapper partitionMetadataMapper;
+  private final DataChangeRecordAction dataChangeRecordAction;
+  private final HeartbeatRecordAction heartbeatRecordAction;
+  private final ChildPartitionsRecordAction childPartitionsRecordAction;
+
+  /**
+   * Constructs an action class for performing a change stream query for a given partition.
+   *
+   * @param changeStreamDao DAO class to perform a change stream query
+   * @param partitionMetadataDao DAO class to access the Connector's metadata tables
+   * @param changeStreamRecordMapper mapper class to transform change stream records into the
+   *     Connector's domain models
+   * @param partitionMetadataMapper mapper class to transform partition metadata rows into the
+   *     Connector's domain models
+   * @param dataChangeRecordAction action class to process {@link DataChangeRecord}s
+   * @param heartbeatRecordAction action class to process {@link HeartbeatRecord}s
+   * @param childPartitionsRecordAction action class to process {@link ChildPartitionsRecord}s
+   */
+  QueryChangeStreamAction(
+      ChangeStreamDao changeStreamDao,
+      PartitionMetadataDao partitionMetadataDao,
+      ChangeStreamRecordMapper changeStreamRecordMapper,
+      PartitionMetadataMapper partitionMetadataMapper,
+      DataChangeRecordAction dataChangeRecordAction,
+      HeartbeatRecordAction heartbeatRecordAction,
+      ChildPartitionsRecordAction childPartitionsRecordAction) {
+    this.changeStreamDao = changeStreamDao;
+    this.partitionMetadataDao = partitionMetadataDao;
+    this.changeStreamRecordMapper = changeStreamRecordMapper;
+    this.partitionMetadataMapper = partitionMetadataMapper;
+    this.dataChangeRecordAction = dataChangeRecordAction;
+    this.heartbeatRecordAction = heartbeatRecordAction;
+    this.childPartitionsRecordAction = childPartitionsRecordAction;
+  }
+
+  /**
+   * This method will dispatch a change stream query for the given partition. It delegates the
+   * processing of the records to one of the corresponding action classes registered, and it will
+   * keep the state of the partition up to date in the Connector's metadata table.
+   *
+   * <p>The algorithm is as follows:
+   *
+   * <ol>
+   *   <li>A change stream query for the partition is performed.
+   *   <li>For each record, we check the type of the record and dispatch the processing to one of
+   *       the actions registered.
+   *   <li>If an {@link Optional} with a {@link ProcessContinuation#stop()} is returned from the
+   *       actions, we stop processing and return.
+   *   <li>Before returning we register a bundle finalizer callback to update the watermark of the
+   *       partition in the metadata tables to the latest processed timestamp.
+   *   <li>When a change stream query finishes successfully (no more records) we update the
+   *       partition state to FINISHED.
+   * </ol>
+   *
+   * There might be cases where, due to a split at the exact end timestamp of a partition's change
+   * stream query, this function could process a residual with an invalid timestamp. In this case,
+   * the error is ignored and no work is done for the residual.
+   *
+   * @param partition the current partition being processed
+   * @param tracker the restriction tracker of the {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
+   * @param receiver the output receiver of the {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
+   * @param watermarkEstimator the watermark estimator of the {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
+   * @param bundleFinalizer the bundle finalizer for {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
+   *     bundles
+   * @return a {@link ProcessContinuation#stop()} if a record timestamp could not be claimed or if
+   *     the partition processing has finished
+   */
+  @SuppressWarnings("nullness")
+  @VisibleForTesting
+  public ProcessContinuation run(
+      PartitionMetadata partition,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      OutputReceiver<DataChangeRecord> receiver,
+      ManualWatermarkEstimator<Instant> watermarkEstimator,
+      BundleFinalizer bundleFinalizer) {
+    final String token = partition.getPartitionToken();
+    final Timestamp endTimestamp = partition.getEndTimestamp();
+
+    /*
+     * FIXME(b/202802422): Workaround until the backend is fixed.
+     * The change stream API returns invalid argument if we try to use a child partition start
+     * timestamp for a previously returned query. If we split at that exact time, we won't be able
+     * to obtain the child partition on the residual restriction, since it will start at the child
+     * partition start time.
+     * To circumvent this, we always start querying one microsecond before the restriction start
+     * time, and ignore any records that are before the restriction start time. This way the child
+     * partition should be returned within the query.
+     */
+    final Timestamp restrictionStartTimestamp =
+        Timestamp.ofTimeMicroseconds(tracker.currentRestriction().getFrom());
+    final Timestamp previousStartTimestamp =
+        Timestamp.ofTimeMicroseconds(
+            TimestampConverter.timestampToMicros(restrictionStartTimestamp) - 1);
+    final boolean isFirstRun =
+        restrictionStartTimestamp.compareTo(partition.getStartTimestamp()) == 0;
+    final Timestamp startTimestamp =
+        isFirstRun ? restrictionStartTimestamp : previousStartTimestamp;
+
+    try (Scope scope =
+        TRACER.spanBuilder("QueryChangeStreamAction").setRecordEvents(true).startScopedSpan()) {
+      TRACER
+          .getCurrentSpan()
+          .putAttribute(PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(token));
+
+      // TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
+      // ReadChangeStreamPartitionDoFn#processElement is called
+      final PartitionMetadata updatedPartition =
+          Optional.ofNullable(partitionMetadataDao.getPartition(token))
+              .map(partitionMetadataMapper::from)
+              .orElseThrow(
+                  () ->
+                      new IllegalStateException(
+                          "Partition " + token + " not found in metadata 
table"));
+
+      try (ChangeStreamResultSet resultSet =
+          changeStreamDao.changeStreamQuery(
+              token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) {
+
+        while (resultSet.next()) {
+          // TODO: Check what we should do if there is an error here
+          final List<ChangeStreamRecord> records =
+              changeStreamRecordMapper.toChangeStreamRecords(
+                  updatedPartition, resultSet.getCurrentRowAsStruct(), resultSet.getMetadata());
+
+          Optional<ProcessContinuation> maybeContinuation;
+          for (final ChangeStreamRecord record : records) {
+            if (record.getRecordTimestamp().compareTo(restrictionStartTimestamp) < 0) {
+              continue;
+            }
+
+            if (record instanceof DataChangeRecord) {
+              maybeContinuation =
+                  dataChangeRecordAction.run(
+                      updatedPartition,
+                      (DataChangeRecord) record,
+                      tracker,
+                      receiver,
+                      watermarkEstimator);
+            } else if (record instanceof HeartbeatRecord) {
+              maybeContinuation =
+                  heartbeatRecordAction.run(
+                      updatedPartition, (HeartbeatRecord) record, tracker, watermarkEstimator);
+            } else if (record instanceof ChildPartitionsRecord) {
+              maybeContinuation =
+                  childPartitionsRecordAction.run(
+                      updatedPartition,
+                      (ChildPartitionsRecord) record,
+                      tracker,
+                      watermarkEstimator);
+            } else {
+              LOG.error("[" + token + "] Unknown record type " + record.getClass());
+              // FIXME: Check what we should do if the record is unknown
+              throw new IllegalArgumentException("Unknown record type " + record.getClass());
+            }
+            if (maybeContinuation.isPresent()) {
+              LOG.debug("[" + token + "] Continuation present, returning " + 
maybeContinuation);
+              bundleFinalizer.afterBundleCommit(
+                  Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
+                  updateWatermarkCallback(token, watermarkEstimator));
+              return maybeContinuation.get();
+            }
+          }
+        }
+
+        bundleFinalizer.afterBundleCommit(
+            Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
+            updateWatermarkCallback(token, watermarkEstimator));
+
+      } catch (SpannerException e) {
+        if (isTimestampOutOfRange(e)) {
+          LOG.debug(
+              "["
+                  + token
+                  + "] query change stream is out of range for "
+                  + startTimestamp
+                  + " to "
+                  + endTimestamp
+                  + ", finishing stream");
+        } else {
+          throw e;
+        }
+      }
+    }
+
+    final long endMicros = TimestampConverter.timestampToMicros(endTimestamp);
+    LOG.debug("[" + token + "] change stream completed successfully");
+    if (tracker.tryClaim(endMicros)) {
+      LOG.debug("[" + token + "] Finishing partition");
+      partitionMetadataDao.updateToFinished(token);
+      LOG.info("[" + token + "] Partition finished");
+    }
+    return ProcessContinuation.stop();
+  }
+
+  private BundleFinalizer.Callback updateWatermarkCallback(
+      String token, WatermarkEstimator<Instant> watermarkEstimator) {
+    return () -> {
+      final Instant watermark = watermarkEstimator.currentWatermark();
+      LOG.debug("[" + token + "] Updating current watermark to " + watermark);
+      try {
+        partitionMetadataDao.updateWatermark(
+            token, TimestampConverter.timestampFromMillis(watermark.getMillis()));
+      } catch (SpannerException e) {
+        if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
+          LOG.debug("[" + token + "] Unable to update the current watermark, partition NOT FOUND");
+        } else {
+          LOG.error("[" + token + "] Error updating the current watermark: " + e.getMessage(), e);
+        }
+      }
+    };
+  }
+
+  private boolean isTimestampOutOfRange(SpannerException e) {
+    return (e.getErrorCode() == ErrorCode.INVALID_ARGUMENT
+            || e.getErrorCode() == ErrorCode.OUT_OF_RANGE)
+        && e.getMessage() != null
+        && e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
+  }
+}
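
The minus-one-microsecond workaround in the FIXME above reduces to the
following arithmetic (a sketch using the same fields as the run method):

    long from = tracker.currentRestriction().getFrom(); // restriction start, in micros
    boolean isFirstRun =
        Timestamp.ofTimeMicroseconds(from).compareTo(partition.getStartTimestamp()) == 0;
    // First run: query exactly at the restriction start. Resumed run: query one
    // microsecond earlier, so the child partitions record at the split point is
    // returned again; records older than the restriction start are then skipped
    // inside the result loop.
    Timestamp queryStart =
        isFirstRun
            ? Timestamp.ofTimeMicroseconds(from)
            : Timestamp.ofTimeMicroseconds(from - 1);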
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/package-info.java
new file mode 100644
index 0000000..200abac
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Action processors for each of the types of Change Stream records received. */
+@Experimental
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import org.apache.beam.sdk.annotations.Experimental;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
new file mode 100644
index 0000000..1c73ad6
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
+
+import io.opencensus.common.Scope;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+import java.io.Serializable;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ChildPartitionsRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DataChangeRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.HeartbeatRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An SDF (Splittable DoFn) class which is responsible for performing a change stream query for a
+ * given partition. A different action will be taken depending on the type of record received from
+ * the query. This component will also reflect the partition state in the partition metadata tables.
+ *
+ * <p>The processing of a partition is delegated to the {@link QueryChangeStreamAction}.
+ */
+// Allows for transient QueryChangeStreamAction
+@SuppressWarnings("initialization.fields.uninitialized")
+@UnboundedPerElement
+public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataChangeRecord>
+    implements Serializable {
+
+  private static final long serialVersionUID = -7574596218085711975L;
+  private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
+  private static final Tracer TRACER = Tracing.getTracer();
+
+  private final DaoFactory daoFactory;
+  private final MapperFactory mapperFactory;
+  private final ActionFactory actionFactory;
+  private final ChangeStreamMetrics metrics;
+
+  private transient QueryChangeStreamAction queryChangeStreamAction;
+
+  /**
+   * This class needs a {@link DaoFactory} to build DAOs to access the partition metadata tables and
+   * to perform the change streams query. It uses mappers to transform database rows into the {@link
+   * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord} model. It uses the
+   * {@link ActionFactory} to construct the action dispatchers, which will perform the change stream
+   * query and process each type of record received. It emits metrics for the partition using the
+   * {@link ChangeStreamMetrics}.
+   *
+   * @param daoFactory the {@link DaoFactory} to construct {@link PartitionMetadataDao}s and {@link
+   *     ChangeStreamDao}s
+   * @param mapperFactory the {@link MapperFactory} to construct {@link ChangeStreamRecordMapper}s
+   * @param actionFactory the {@link ActionFactory} to construct actions
+   * @param metrics the {@link ChangeStreamMetrics} to emit partition-related metrics
+   */
+  public ReadChangeStreamPartitionDoFn(
+      DaoFactory daoFactory,
+      MapperFactory mapperFactory,
+      ActionFactory actionFactory,
+      ChangeStreamMetrics metrics) {
+    this.daoFactory = daoFactory;
+    this.mapperFactory = mapperFactory;
+    this.actionFactory = actionFactory;
+    this.metrics = metrics;
+  }
+
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+    return currentElementTimestamp;
+  }
+
+  @NewWatermarkEstimator
+  public ManualWatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return new Manual(watermarkEstimatorState);
+  }
+
+  /**
+   * The restriction for a partition will be defined from the start and end timestamp to query the
+   * partition for. These timestamps are converted to microseconds. The {@link OffsetRange}
+   * restriction represents a closed-open interval, while the start / end timestamps represent a
+   * closed-closed interval, so we add 1 microsecond to the end timestamp to convert it to
+   * closed-open.
+   *
+   * <p>In this function we also update the partition state to {@link
+   * PartitionMetadata.State#RUNNING}.
+   *
+   * @param partition the partition to be queried
+   * @return the offset range from the partition start timestamp to the partition end timestamp + 1
+   *     microsecond
+   */
+  @GetInitialRestriction
+  public OffsetRange initialRestriction(@Element PartitionMetadata partition) {
+    final String token = partition.getPartitionToken();
+    final com.google.cloud.Timestamp startTimestamp = partition.getStartTimestamp();
+    final long startMicros = TimestampConverter.timestampToMicros(startTimestamp);
+    // Offset range represents closed-open interval
+    final long endMicros =
+        Optional.ofNullable(partition.getEndTimestamp())
+            .map(TimestampConverter::timestampToMicros)
+            .map(micros -> micros + 1)
+            .orElse(TimestampConverter.MAX_MICROS + 1);
+    final com.google.cloud.Timestamp partitionScheduledAt = partition.getScheduledAt();
+    final com.google.cloud.Timestamp partitionRunningAt =
+        daoFactory.getPartitionMetadataDao().updateToRunning(token);
+
+    if (partitionScheduledAt != null && partitionRunningAt != null) {
+      metrics.updatePartitionScheduledToRunning(
+          new Duration(
+              partitionScheduledAt.toSqlTimestamp().getTime(),
+              partitionRunningAt.toSqlTimestamp().getTime()));
+    }
+
+    return new OffsetRange(startMicros, endMicros);
+  }
+
+  @NewTracker
+  public ReadChangeStreamPartitionRangeTracker newTracker(
+      @Element PartitionMetadata partition, @Restriction OffsetRange offsetRange) {
+    return new ReadChangeStreamPartitionRangeTracker(partition, offsetRange);
+  }
+
+  /**
+   * Constructs instances for the {@link PartitionMetadataDao}, {@link ChangeStreamDao}, {@link
+   * ChangeStreamRecordMapper}, {@link PartitionMetadataMapper}, {@link DataChangeRecordAction},
+   * {@link HeartbeatRecordAction}, {@link ChildPartitionsRecordAction} and {@link
+   * QueryChangeStreamAction}.
+   */
+  @Setup
+  public void setup() {
+    final PartitionMetadataDao partitionMetadataDao = daoFactory.getPartitionMetadataDao();
+    final ChangeStreamDao changeStreamDao = daoFactory.getChangeStreamDao();
+    final ChangeStreamRecordMapper changeStreamRecordMapper =
+        mapperFactory.changeStreamRecordMapper();
+    final PartitionMetadataMapper partitionMetadataMapper = mapperFactory.partitionMetadataMapper();
+    final DataChangeRecordAction dataChangeRecordAction = actionFactory.dataChangeRecordAction();
+    final HeartbeatRecordAction heartbeatRecordAction =
+        actionFactory.heartbeatRecordAction(metrics);
+    final ChildPartitionsRecordAction childPartitionsRecordAction =
+        actionFactory.childPartitionsRecordAction(partitionMetadataDao, metrics);
+
+    this.queryChangeStreamAction =
+        actionFactory.queryChangeStreamAction(
+            changeStreamDao,
+            partitionMetadataDao,
+            changeStreamRecordMapper,
+            partitionMetadataMapper,
+            dataChangeRecordAction,
+            heartbeatRecordAction,
+            childPartitionsRecordAction);
+  }
+
+  /**
+   * Performs a change stream query for a given partition. A different action will be taken
+   * depending on the type of record received from the query. This component will also reflect the
+   * partition state in the partition metadata tables.
+   *
+   * <p>The processing of a partition is delegated to the {@link QueryChangeStreamAction}.
+   *
+   * @param partition the partition to be queried
+   * @param tracker an instance of {@link ReadChangeStreamPartitionRangeTracker}
+   * @param receiver a {@link DataChangeRecord} {@link OutputReceiver}
+   * @param watermarkEstimator a {@link ManualWatermarkEstimator} of {@link Instant}
+   * @param bundleFinalizer the bundle finalizer
+   * @return a {@link ProcessContinuation#stop()} if a record timestamp could not be claimed or if
+   *     the partition processing has finished
+   */
+  @ProcessElement
+  public ProcessContinuation processElement(
+      @Element PartitionMetadata partition,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      OutputReceiver<DataChangeRecord> receiver,
+      ManualWatermarkEstimator<Instant> watermarkEstimator,
+      BundleFinalizer bundleFinalizer) {
+
+    final String token = partition.getPartitionToken();
+    try (Scope scope =
+        TRACER
+            .spanBuilder("ReadChangeStreamPartitionDoFn.processElement")
+            .setRecordEvents(true)
+            .startScopedSpan()) {
+      TRACER
+          .getCurrentSpan()
+          .putAttribute(PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(token));
+
+      LOG.debug(
+          "[" + token + "] Processing element with restriction " + 
tracker.currentRestriction());
+
+      return queryChangeStreamAction.run(
+          partition, tracker, receiver, watermarkEstimator, bundleFinalizer);
+    }
+  }
+}
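The closed-closed to closed-open conversion performed by initialRestriction above can be
summarised with a standalone sketch (illustrative only; the class and method names below are
not part of this commit, and MAX_MICROS is assumed to mirror TimestampConverter.MAX_MICROS):

    import java.util.Optional;

    public class RestrictionIntervalSketch {
      // Assumed to match TimestampConverter.MAX_MICROS, i.e. the micros of
      // Timestamp.MAX_VALUE, per the expected value in TimestampConverterTest below.
      static final long MAX_MICROS = 253402300799999999L;

      /**
       * Converts a closed-closed [startMicros, endMicros] query interval (endMicros may be
       * null for an unbounded partition) into the closed-open [from, to) pair backing the
       * OffsetRange restriction.
       */
      static long[] toClosedOpen(long startMicros, Long endMicros) {
        final long to =
            Optional.ofNullable(endMicros)
                .map(micros -> micros + 1) // inclusive end -> exclusive end
                .orElse(MAX_MICROS + 1); // unbounded: every valid micro stays claimable
        return new long[] {startMicros, to};
      }

      public static void main(String[] args) {
        final long[] range = toClosedOpen(10L, 20L);
        System.out.println(range[0] + " .. " + range[1]); // prints "10 .. 21"
      }
    }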
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java
new file mode 100644
index 0000000..e783221
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+
+/**
+ * This restriction tracker is a decorator on top of the {@link OffsetRangeTracker}. It modifies the
+ * behaviour of {@link OffsetRangeTracker#tryClaim(Long)} to accept repeated claims of the same long
+ * multiple times. This is because several change stream records might have the same timestamp, thus
+ * leading to multiple claims of the same {@link Long}. Other than that, it modifies the {@link
+ * OffsetRangeTracker#trySplit(double)} method to always deny splits for the {@link
+ * InitialPartition#PARTITION_TOKEN}, since we only need to perform this query once.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ReadChangeStreamPartitionRangeTracker extends OffsetRangeTracker {
+
+  private final PartitionMetadata partition;
+
+  /**
+   * Receives the partition that will be queried using this tracker, alongside the range itself.
+   *
+   * @param partition the partition that will use the tracker
+   * @param range the closed-open interval representing the start / end times for a partition
+   */
+  public ReadChangeStreamPartitionRangeTracker(PartitionMetadata partition, OffsetRange range) {
+    super(range);
+    this.partition = partition;
+  }
+
+  /**
+   * Attempts to claim the given offset.
+   *
+   * <p>Must be equal to or larger than the last successfully claimed offset.
+   *
+   * @return {@code true} if the offset was successfully claimed, {@code false} if it is outside the
+   *     current {@link OffsetRange} of this tracker (in that case this operation is a no-op).
+   */
+  @Override
+  public boolean tryClaim(Long i) {
+    if (i.equals(lastAttemptedOffset)) {
+      return true;
+    }
+    return super.tryClaim(i);
+  }
+
+  /**
+   * If the partition token is the {@link InitialPartition#PARTITION_TOKEN}, it does not allow for
+   * splits (returns null).
+   */
+  @Override
+  public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+    if (InitialPartition.isInitialPartition(partition.getPartitionToken())) {
+      return null;
+    }
+    return super.trySplit(fractionOfRemainder);
+  }
+}
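As a usage sketch of the duplicate-claim behaviour (assuming the Beam SDK and the classes from
this commit are on the classpath; the mocked partition and its token are placeholders):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
    import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker;
    import org.apache.beam.sdk.io.range.OffsetRange;

    public class TrackerSketch {
      public static void main(String[] args) {
        final PartitionMetadata partition = mock(PartitionMetadata.class);
        // Any non-initial token: splits stay enabled, only duplicate claims are special-cased.
        when(partition.getPartitionToken()).thenReturn("partitionToken");

        final ReadChangeStreamPartitionRangeTracker tracker =
            new ReadChangeStreamPartitionRangeTracker(partition, new OffsetRange(10L, 21L));

        System.out.println(tracker.tryClaim(10L)); // true: first claim of offset 10
        System.out.println(tracker.tryClaim(10L)); // true: repeated claim short-circuits
        System.out.println(tracker.tryClaim(21L)); // false: 21 is outside [10, 21)
      }
    }

This is what lets several change stream records carrying the same commit timestamp be
processed under a single claimed offset.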
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/package-info.java
new file mode 100644
index 0000000..7868b29
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Custom restriction tracker related classes. */
+@Experimental
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import org.apache.beam.sdk.annotations.Experimental;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/TimestampConverterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/TimestampConverterTest.java
new file mode 100644
index 0000000..1cbc75a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/TimestampConverterTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import org.junit.Test;
+
+public class TimestampConverterTest {
+
+  @Test
+  public void testConvertTimestampToMicros() {
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(2_000_360L);
+
+    assertEquals(2_000_360L, TimestampConverter.timestampToMicros(timestamp));
+  }
+
+  @Test
+  public void testConvertTimestampZeroToMicros() {
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(0);
+
+    assertEquals(0L, TimestampConverter.timestampToMicros(timestamp));
+  }
+
+  @Test
+  public void testConvertTimestampMinToMicros() {
+    final Timestamp timestamp = Timestamp.MIN_VALUE;
+
+    assertEquals(-62135596800000000L, TimestampConverter.timestampToMicros(timestamp));
+  }
+
+  @Test
+  public void testConvertTimestampMaxToMicros() {
+    final Timestamp timestamp = Timestamp.MAX_VALUE;
+
+    assertEquals(253402300799999999L, TimestampConverter.timestampToMicros(timestamp));
+  }
+
+  @Test
+  public void testConvertMillisToTimestamp() {
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(1234_000L);
+
+    assertEquals(timestamp, TimestampConverter.timestampFromMillis(1234L));
+  }
+
+  @Test
+  public void testTruncateNanos() {
+    final Timestamp timestamp = Timestamp.ofTimeSecondsAndNanos(10L, 123456789);
+    final Timestamp expectedTimestamp = Timestamp.ofTimeSecondsAndNanos(10L, 123456000);
+
+    assertEquals(expectedTimestamp, TimestampConverter.truncateNanos(timestamp));
+  }
+}
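The conversions these tests pin down amount to straightforward microsecond arithmetic. A
sketch consistent with the expected values above (not the actual TimestampConverter body,
which is outside this excerpt):

    import com.google.cloud.Timestamp;

    public class TimestampMathSketch {
      // Whole seconds scaled to micros, plus nanos truncated to micro precision.
      static long toMicros(Timestamp ts) {
        return ts.getSeconds() * 1_000_000L + ts.getNanos() / 1_000L;
      }

      public static void main(String[] args) {
        final Timestamp t = Timestamp.ofTimeSecondsAndNanos(10L, 123456789);
        // 10s -> 10_000_000us; 123_456_789ns -> 123_456us (sub-microsecond nanos dropped)
        System.out.println(toMicros(t)); // prints 10123456
      }
    }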
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java
new file mode 100644
index 0000000..429a3db
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.CREATED;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao.InTransactionContext;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestTransactionAnswer;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChildPartitionsRecordActionTest {
+
+  private PartitionMetadataDao dao;
+  private InTransactionContext transaction;
+  private ChangeStreamMetrics metrics;
+  private ChildPartitionsRecordAction action;
+  private RestrictionTracker<OffsetRange, Long> tracker;
+  private ManualWatermarkEstimator<Instant> watermarkEstimator;
+
+  @Before
+  public void setUp() {
+    dao = mock(PartitionMetadataDao.class);
+    transaction = mock(InTransactionContext.class);
+    metrics = mock(ChangeStreamMetrics.class);
+    action = new ChildPartitionsRecordAction(dao, metrics);
+    tracker = mock(RestrictionTracker.class);
+    watermarkEstimator = mock(ManualWatermarkEstimator.class);
+
+    when(dao.runInTransaction(any())).thenAnswer(new TestTransactionAnswer(transaction));
+  }
+
+  @Test
+  public void testRestrictionClaimedAndIsSplitCase() {
+    final String partitionToken = "partitionToken";
+    final long heartbeat = 30L;
+    final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+    final PartitionMetadata partition = mock(PartitionMetadata.class);
+    final ChildPartitionsRecord record =
+        new ChildPartitionsRecord(
+            startTimestamp,
+            "recordSequence",
+            Arrays.asList(
+                new ChildPartition("childPartition1", partitionToken),
+                new ChildPartition("childPartition2", partitionToken)),
+            null);
+    when(partition.getEndTimestamp()).thenReturn(endTimestamp);
+    when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+    when(tracker.tryClaim(10L)).thenReturn(true);
+    when(transaction.getPartition("childPartition1")).thenReturn(null);
+    when(transaction.getPartition("childPartition2")).thenReturn(null);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        action.run(partition, record, tracker, watermarkEstimator);
+
+    assertEquals(Optional.empty(), maybeContinuation);
+    verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
+    verify(transaction)
+        .insert(
+            PartitionMetadata.newBuilder()
+                .setPartitionToken("childPartition1")
+                .setParentTokens(Sets.newHashSet(partitionToken))
+                .setStartTimestamp(startTimestamp)
+                .setEndTimestamp(endTimestamp)
+                .setHeartbeatMillis(heartbeat)
+                .setState(CREATED)
+                .setWatermark(startTimestamp)
+                .build());
+    verify(transaction)
+        .insert(
+            PartitionMetadata.newBuilder()
+                .setPartitionToken("childPartition2")
+                .setParentTokens(Sets.newHashSet(partitionToken))
+                .setStartTimestamp(startTimestamp)
+                .setEndTimestamp(endTimestamp)
+                .setHeartbeatMillis(heartbeat)
+                .setState(CREATED)
+                .setWatermark(startTimestamp)
+                .build());
+  }
+
+  @Test
+  public void testRestrictionClaimedAndIsSplitCaseAndChildExists() {
+    final String partitionToken = "partitionToken";
+    final long heartbeat = 30L;
+    final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+    final PartitionMetadata partition = mock(PartitionMetadata.class);
+    final ChildPartitionsRecord record =
+        new ChildPartitionsRecord(
+            startTimestamp,
+            "recordSequence",
+            Arrays.asList(
+                new ChildPartition("childPartition1", partitionToken),
+                new ChildPartition("childPartition2", partitionToken)),
+            null);
+    when(partition.getEndTimestamp()).thenReturn(endTimestamp);
+    when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+    when(tracker.tryClaim(10L)).thenReturn(true);
+    when(transaction.getPartition("childPartition1")).thenReturn(mock(Struct.class));
+    when(transaction.getPartition("childPartition2")).thenReturn(mock(Struct.class));
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        action.run(partition, record, tracker, watermarkEstimator);
+
+    assertEquals(Optional.empty(), maybeContinuation);
+    verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
+  }
+
+  @Test
+  public void testRestrictionClaimedAndIsMergeCaseAndChildNotExists() {
+    final String partitionToken = "partitionToken";
+    final String anotherPartitionToken = "anotherPartitionToken";
+    final String childPartitionToken = "childPartition1";
+    final HashSet<String> parentTokens = Sets.newHashSet(partitionToken, anotherPartitionToken);
+    final long heartbeat = 30L;
+    final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+    final PartitionMetadata partition = mock(PartitionMetadata.class);
+    final ChildPartitionsRecord record =
+        new ChildPartitionsRecord(
+            startTimestamp,
+            "recordSequence",
+            Collections.singletonList(new ChildPartition(childPartitionToken, parentTokens)),
+            null);
+    when(partition.getEndTimestamp()).thenReturn(endTimestamp);
+    when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+    when(tracker.tryClaim(10L)).thenReturn(true);
+    when(transaction.getPartition(childPartitionToken)).thenReturn(null);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        action.run(partition, record, tracker, watermarkEstimator);
+
+    assertEquals(Optional.empty(), maybeContinuation);
+    verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
+    verify(transaction)
+        .insert(
+            PartitionMetadata.newBuilder()
+                .setPartitionToken(childPartitionToken)
+                .setParentTokens(parentTokens)
+                .setStartTimestamp(startTimestamp)
+                .setEndTimestamp(endTimestamp)
+                .setHeartbeatMillis(heartbeat)
+                .setState(CREATED)
+                .setWatermark(startTimestamp)
+                .build());
+  }
+
+  @Test
+  public void testRestrictionClaimedAndIsMergeCaseAndChildExists() {
+    final String partitionToken = "partitionToken";
+    final String anotherPartitionToken = "anotherPartitionToken";
+    final String childPartitionToken = "childPartition1";
+    final HashSet<String> parentTokens = Sets.newHashSet(partitionToken, anotherPartitionToken);
+    final long heartbeat = 30L;
+    final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+    final PartitionMetadata partition = mock(PartitionMetadata.class);
+    final ChildPartitionsRecord record =
+        new ChildPartitionsRecord(
+            startTimestamp,
+            "recordSequence",
+            Collections.singletonList(new ChildPartition(childPartitionToken, parentTokens)),
+            null);
+    when(partition.getEndTimestamp()).thenReturn(endTimestamp);
+    when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+    when(tracker.tryClaim(10L)).thenReturn(true);
+    when(transaction.getPartition(childPartitionToken)).thenReturn(mock(Struct.class));
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        action.run(partition, record, tracker, watermarkEstimator);
+
+    assertEquals(Optional.empty(), maybeContinuation);
+    verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
+    verify(transaction, never()).insert(any());
+  }
+
+  @Test
+  public void testRestrictionNotClaimed() {
+    final String partitionToken = "partitionToken";
+    final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+    final PartitionMetadata partition = mock(PartitionMetadata.class);
+    final ChildPartitionsRecord record =
+        new ChildPartitionsRecord(
+            startTimestamp,
+            "recordSequence",
+            Arrays.asList(
+                new ChildPartition("childPartition1", partitionToken),
+                new ChildPartition("childPartition2", partitionToken)),
+            null);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+    when(tracker.tryClaim(10L)).thenReturn(false);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        action.run(partition, record, tracker, watermarkEstimator);
+
+    assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
+    verify(watermarkEstimator, never()).setWatermark(any());
+    verify(dao, never()).insert(any());
+  }
+}
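The claimed-restriction tests above encode the split/merge convention for child partition
records: several children that each carry the single parent token represent a split, while a
single child carrying several parent tokens represents a merge. Restated compactly with the
model classes from this commit (token values are placeholders):

    import com.google.cloud.Timestamp;
    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
    import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
    import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;

    public class ChildPartitionShapes {
      public static void main(String[] args) {
        final Timestamp start = Timestamp.ofTimeMicroseconds(10L);

        // Split: one parent fans out into two children, each referencing that parent.
        final ChildPartitionsRecord split =
            new ChildPartitionsRecord(
                start,
                "recordSequence",
                Arrays.asList(
                    new ChildPartition("child1", "parent"),
                    new ChildPartition("child2", "parent")),
                null);

        // Merge: two parents fan in to a single child carrying both parent tokens.
        final ChildPartitionsRecord merge =
            new ChildPartitionsRecord(
                start,
                "recordSequence",
                Collections.singletonList(
                    new ChildPartition("child3", Sets.newHashSet("parentA", "parentB"))),
                null);
      }
    }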
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
new file mode 100644
index 0000000..15789bb
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DataChangeRecordActionTest {
+
+  private DataChangeRecordAction action;
+  private PartitionMetadata partition;
+  private RestrictionTracker<OffsetRange, Long> tracker;
+  private OutputReceiver<DataChangeRecord> outputReceiver;
+  private ManualWatermarkEstimator<Instant> watermarkEstimator;
+
+  @Before
+  public void setUp() {
+    action = new DataChangeRecordAction();
+    partition = mock(PartitionMetadata.class);
+    tracker = mock(RestrictionTracker.class);
+    outputReceiver = mock(OutputReceiver.class);
+    watermarkEstimator = mock(ManualWatermarkEstimator.class);
+  }
+
+  @Test
+  public void testRestrictionClaimed() {
+    final String partitionToken = "partitionToken";
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Instant instant = new Instant(timestamp.toSqlTimestamp().getTime());
+    final DataChangeRecord record = mock(DataChangeRecord.class);
+    when(record.getCommitTimestamp()).thenReturn(timestamp);
+    when(tracker.tryClaim(10L)).thenReturn(true);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        action.run(partition, record, tracker, outputReceiver, watermarkEstimator);
+
+    assertEquals(Optional.empty(), maybeContinuation);
+    verify(outputReceiver).outputWithTimestamp(record, instant);
+    verify(watermarkEstimator).setWatermark(instant);
+  }
+
+  @Test
+  public void testRestrictionNotClaimed() {
+    final String partitionToken = "partitionToken";
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+    final DataChangeRecord record = mock(DataChangeRecord.class);
+    when(record.getCommitTimestamp()).thenReturn(timestamp);
+    when(tracker.tryClaim(10L)).thenReturn(false);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        action.run(partition, record, tracker, outputReceiver, watermarkEstimator);
+
+    assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
+    verify(outputReceiver, never()).outputWithTimestamp(any(), any());
+    verify(watermarkEstimator, never()).setWatermark(any());
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
new file mode 100644
index 0000000..c7ed34f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HeartbeatRecordActionTest {
+
+  private HeartbeatRecordAction action;
+  private PartitionMetadata partition;
+  private RestrictionTracker<OffsetRange, Long> tracker;
+  private ManualWatermarkEstimator<Instant> watermarkEstimator;
+
+  @Before
+  public void setUp() {
+    final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class);
+    action = new HeartbeatRecordAction(metrics);
+    partition = mock(PartitionMetadata.class);
+    tracker = mock(RestrictionTracker.class);
+    watermarkEstimator = mock(ManualWatermarkEstimator.class);
+  }
+
+  @Test
+  public void testRestrictionClaimed() {
+    final String partitionToken = "partitionToken";
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+
+    when(tracker.tryClaim(10L)).thenReturn(true);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator);
+
+    assertEquals(Optional.empty(), maybeContinuation);
+    verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime()));
+  }
+
+  @Test
+  public void testRestrictionNotClaimed() {
+    final String partitionToken = "partitionToken";
+    final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+
+    when(tracker.tryClaim(10L)).thenReturn(false);
+    when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+    final Optional<ProcessContinuation> maybeContinuation =
+        action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator);
+
+    assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
+    verify(watermarkEstimator, never()).setWatermark(any());
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
new file mode 100644
index 0000000..41fb1f7
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.Arrays;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueryChangeStreamActionTest {
+  private static final String PARTITION_TOKEN = "partitionToken";
+  private static final Timestamp PARTITION_START_TIMESTAMP = Timestamp.ofTimeMicroseconds(10L);
+  private static final Timestamp PARTITION_END_TIMESTAMP = Timestamp.ofTimeMicroseconds(30L);
+  private static final long PARTITION_END_MICROS = 30L;
+  private static final long PARTITION_HEARTBEAT_MILLIS = 30_000L;
+  private static final Instant WATERMARK = Instant.now();
+  private static final Timestamp WATERMARK_TIMESTAMP =
+      TimestampConverter.timestampFromMillis(WATERMARK.getMillis());
+
+  private ChangeStreamDao changeStreamDao;
+  private PartitionMetadataDao partitionMetadataDao;
+  private PartitionMetadata partition;
+  private OffsetRange restriction;
+  private RestrictionTracker<OffsetRange, Long> restrictionTracker;
+  private OutputReceiver<DataChangeRecord> outputReceiver;
+  private ChangeStreamRecordMapper changeStreamRecordMapper;
+  private PartitionMetadataMapper partitionMetadataMapper;
+  private ManualWatermarkEstimator<Instant> watermarkEstimator;
+  private BundleFinalizer bundleFinalizer;
+  private DataChangeRecordAction dataChangeRecordAction;
+  private HeartbeatRecordAction heartbeatRecordAction;
+  private ChildPartitionsRecordAction childPartitionsRecordAction;
+  private QueryChangeStreamAction action;
+
+  @Before
+  public void setUp() throws Exception {
+    changeStreamDao = mock(ChangeStreamDao.class);
+    partitionMetadataDao = mock(PartitionMetadataDao.class);
+    changeStreamRecordMapper = mock(ChangeStreamRecordMapper.class);
+    partitionMetadataMapper = mock(PartitionMetadataMapper.class);
+    dataChangeRecordAction = mock(DataChangeRecordAction.class);
+    heartbeatRecordAction = mock(HeartbeatRecordAction.class);
+    childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
+
+    action =
+        new QueryChangeStreamAction(
+            changeStreamDao,
+            partitionMetadataDao,
+            changeStreamRecordMapper,
+            partitionMetadataMapper,
+            dataChangeRecordAction,
+            heartbeatRecordAction,
+            childPartitionsRecordAction);
+    final Struct row = mock(Struct.class);
+    partition =
+        PartitionMetadata.newBuilder()
+            .setPartitionToken(PARTITION_TOKEN)
+            .setParentTokens(Sets.newHashSet("parentToken"))
+            .setStartTimestamp(PARTITION_START_TIMESTAMP)
+            .setEndTimestamp(PARTITION_END_TIMESTAMP)
+            .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS)
+            .setState(SCHEDULED)
+            .setWatermark(WATERMARK_TIMESTAMP)
+            .setScheduledAt(Timestamp.now())
+            .build();
+    restriction = mock(OffsetRange.class);
+    restrictionTracker = mock(RestrictionTracker.class);
+    outputReceiver = mock(OutputReceiver.class);
+    watermarkEstimator = mock(ManualWatermarkEstimator.class);
+    bundleFinalizer = new BundleFinalizerStub();
+
+    when(restrictionTracker.currentRestriction()).thenReturn(restriction);
+    when(restriction.getFrom()).thenReturn(10L);
+    when(partitionMetadataDao.getPartition(PARTITION_TOKEN)).thenReturn(row);
+    when(partitionMetadataMapper.from(row)).thenReturn(partition);
+  }
+
+  @Test
+  public void testQueryChangeStreamWithDataChangeRecord() {
+    final Struct rowAsStruct = mock(Struct.class);
+    final ChangeStreamResultSetMetadata resultSetMetadata =
+        mock(ChangeStreamResultSetMetadata.class);
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final DataChangeRecord record1 = mock(DataChangeRecord.class);
+    final DataChangeRecord record2 = mock(DataChangeRecord.class);
+    when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+    when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+    when(changeStreamDao.changeStreamQuery(
+            PARTITION_TOKEN,
+            PARTITION_START_TIMESTAMP,
+            PARTITION_END_TIMESTAMP,
+            PARTITION_HEARTBEAT_MILLIS))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct);
+    when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+    when(changeStreamRecordMapper.toChangeStreamRecords(partition, rowAsStruct, resultSetMetadata))
+        .thenReturn(Arrays.asList(record1, record2));
+    when(dataChangeRecordAction.run(
+            partition, record1, restrictionTracker, outputReceiver, watermarkEstimator))
+        .thenReturn(Optional.empty());
+    when(dataChangeRecordAction.run(
+            partition, record2, restrictionTracker, outputReceiver, watermarkEstimator))
+        .thenReturn(Optional.of(ProcessContinuation.stop()));
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    assertEquals(ProcessContinuation.stop(), result);
+    verify(dataChangeRecordAction)
+        .run(partition, record1, restrictionTracker, outputReceiver, watermarkEstimator);
+    verify(dataChangeRecordAction)
+        .run(partition, record2, restrictionTracker, outputReceiver, watermarkEstimator);
+    verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
+    verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
+    verify(restrictionTracker, never()).tryClaim(any());
+  }
+
+  @Test
+  public void testQueryChangeStreamWithHeartbeatRecord() {
+    final Struct rowAsStruct = mock(Struct.class);
+    final ChangeStreamResultSetMetadata resultSetMetadata =
+        mock(ChangeStreamResultSetMetadata.class);
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final HeartbeatRecord record1 = mock(HeartbeatRecord.class);
+    final HeartbeatRecord record2 = mock(HeartbeatRecord.class);
+    when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+    when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+    when(changeStreamDao.changeStreamQuery(
+            PARTITION_TOKEN,
+            PARTITION_START_TIMESTAMP,
+            PARTITION_END_TIMESTAMP,
+            PARTITION_HEARTBEAT_MILLIS))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct);
+    when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+    when(changeStreamRecordMapper.toChangeStreamRecords(partition, rowAsStruct, resultSetMetadata))
+        .thenReturn(Arrays.asList(record1, record2));
+    when(heartbeatRecordAction.run(partition, record1, restrictionTracker, watermarkEstimator))
+        .thenReturn(Optional.empty());
+    when(heartbeatRecordAction.run(partition, record2, restrictionTracker, watermarkEstimator))
+        .thenReturn(Optional.of(ProcessContinuation.stop()));
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    assertEquals(ProcessContinuation.stop(), result);
+    verify(heartbeatRecordAction).run(partition, record1, restrictionTracker, watermarkEstimator);
+    verify(heartbeatRecordAction).run(partition, record2, restrictionTracker, watermarkEstimator);
+    verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
+    verify(restrictionTracker, never()).tryClaim(any());
+  }
+
+  @Test
+  public void testQueryChangeStreamWithChildPartitionsRecord() {
+    final Struct rowAsStruct = mock(Struct.class);
+    final ChangeStreamResultSetMetadata resultSetMetadata =
+        mock(ChangeStreamResultSetMetadata.class);
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class);
+    final ChildPartitionsRecord record2 = mock(ChildPartitionsRecord.class);
+    when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+    when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+    when(changeStreamDao.changeStreamQuery(
+            PARTITION_TOKEN,
+            PARTITION_START_TIMESTAMP,
+            PARTITION_END_TIMESTAMP,
+            PARTITION_HEARTBEAT_MILLIS))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct);
+    when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+    when(changeStreamRecordMapper.toChangeStreamRecords(partition, rowAsStruct, resultSetMetadata))
+        .thenReturn(Arrays.asList(record1, record2));
+    when(childPartitionsRecordAction.run(
+            partition, record1, restrictionTracker, watermarkEstimator))
+        .thenReturn(Optional.empty());
+    when(childPartitionsRecordAction.run(
+            partition, record2, restrictionTracker, watermarkEstimator))
+        .thenReturn(Optional.of(ProcessContinuation.stop()));
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    assertEquals(ProcessContinuation.stop(), result);
+    verify(childPartitionsRecordAction)
+        .run(partition, record1, restrictionTracker, watermarkEstimator);
+    verify(childPartitionsRecordAction)
+        .run(partition, record2, restrictionTracker, watermarkEstimator);
+    verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
+    verify(restrictionTracker, never()).tryClaim(any());
+  }
+
+  @Test
+  public void testQueryChangeStreamWithRestrictionStartAfterPartitionStart() {
+    final Struct rowAsStruct = mock(Struct.class);
+    final ChangeStreamResultSetMetadata resultSetMetadata =
+        mock(ChangeStreamResultSetMetadata.class);
+    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+    final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class);
+    final ChildPartitionsRecord record2 = mock(ChildPartitionsRecord.class);
+
+    // One microsecond after partition start timestamp
+    when(restriction.getFrom()).thenReturn(11L);
+    // This record should be ignored because it is before restriction.getFrom
+    when(record1.getRecordTimestamp()).thenReturn(Timestamp.ofTimeMicroseconds(10L));
+    // This record should be included because it is at the restriction.getFrom
+    when(record2.getRecordTimestamp()).thenReturn(Timestamp.ofTimeMicroseconds(11L));
+    // We should start the query 1 microsecond before the restriction.getFrom
+    when(changeStreamDao.changeStreamQuery(
+            PARTITION_TOKEN,
+            Timestamp.ofTimeMicroseconds(10L),
+            PARTITION_END_TIMESTAMP,
+            PARTITION_HEARTBEAT_MILLIS))
+        .thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(true);
+    when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct);
+    when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+    when(changeStreamRecordMapper.toChangeStreamRecords(partition, rowAsStruct, resultSetMetadata))
+        .thenReturn(Arrays.asList(record1, record2));
+    when(childPartitionsRecordAction.run(
+            partition, record2, restrictionTracker, watermarkEstimator))
+        .thenReturn(Optional.of(ProcessContinuation.stop()));
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    assertEquals(ProcessContinuation.stop(), result);
+    verify(childPartitionsRecordAction)
+        .run(partition, record2, restrictionTracker, watermarkEstimator);
+    verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+
+    verify(childPartitionsRecordAction, never())
+        .run(partition, record1, restrictionTracker, watermarkEstimator);
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
+    verify(restrictionTracker, never()).tryClaim(any());
+  }
+
+  @Test
+  public void testQueryChangeStreamWithStreamFinished() {
+    final ChangeStreamResultSet changeStreamResultSet = mock(ChangeStreamResultSet.class);
+    when(changeStreamDao.changeStreamQuery(
+            PARTITION_TOKEN,
+            PARTITION_START_TIMESTAMP,
+            PARTITION_END_TIMESTAMP,
+            PARTITION_HEARTBEAT_MILLIS))
+        .thenReturn(changeStreamResultSet);
+    when(changeStreamResultSet.next()).thenReturn(false);
+    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+    when(restrictionTracker.tryClaim(PARTITION_END_MICROS)).thenReturn(true);
+
+    final ProcessContinuation result =
+        action.run(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    assertEquals(ProcessContinuation.stop(), result);
+    verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+    verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
+
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
+    verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
+  }
+
+  private static class BundleFinalizerStub implements BundleFinalizer {
+    @Override
+    public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
+      try {
+        callback.onBundleSuccess();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
new file mode 100644
index 0000000..f70f119
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ChildPartitionsRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DataChangeRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.HeartbeatRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ReadChangeStreamPartitionDoFnTest {
+
+  private static final String PARTITION_TOKEN = "partitionToken";
+  private static final Timestamp PARTITION_START_TIMESTAMP =
+      Timestamp.ofTimeSecondsAndNanos(10, 20);
+  private static final Timestamp PARTITION_END_TIMESTAMP = Timestamp.ofTimeSecondsAndNanos(30, 40);
+  private static final long PARTITION_HEARTBEAT_MILLIS = 30_000L;
+
+  private ReadChangeStreamPartitionDoFn doFn;
+  private PartitionMetadata partition;
+  private OffsetRange restriction;
+  private RestrictionTracker<OffsetRange, Long> restrictionTracker;
+  private OutputReceiver<DataChangeRecord> outputReceiver;
+  private ManualWatermarkEstimator<Instant> watermarkEstimator;
+  private BundleFinalizer bundleFinalizer;
+  private DataChangeRecordAction dataChangeRecordAction;
+  private HeartbeatRecordAction heartbeatRecordAction;
+  private ChildPartitionsRecordAction childPartitionsRecordAction;
+  private QueryChangeStreamAction queryChangeStreamAction;
+
+  @Before
+  public void setUp() {
+    final DaoFactory daoFactory = mock(DaoFactory.class);
+    final MapperFactory mapperFactory = mock(MapperFactory.class);
+    final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class);
+    final ActionFactory actionFactory = mock(ActionFactory.class);
+    final PartitionMetadataDao partitionMetadataDao = mock(PartitionMetadataDao.class);
+    final ChangeStreamDao changeStreamDao = mock(ChangeStreamDao.class);
+    final ChangeStreamRecordMapper changeStreamRecordMapper = mock(ChangeStreamRecordMapper.class);
+    final PartitionMetadataMapper partitionMetadataMapper = mock(PartitionMetadataMapper.class);
+    dataChangeRecordAction = mock(DataChangeRecordAction.class);
+    heartbeatRecordAction = mock(HeartbeatRecordAction.class);
+    childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
+    queryChangeStreamAction = mock(QueryChangeStreamAction.class);
+
+    doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
+
+    partition =
+        PartitionMetadata.newBuilder()
+            .setPartitionToken(PARTITION_TOKEN)
+            .setParentTokens(Sets.newHashSet("parentToken"))
+            .setStartTimestamp(PARTITION_START_TIMESTAMP)
+            .setEndTimestamp(PARTITION_END_TIMESTAMP)
+            .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS)
+            .setState(SCHEDULED)
+            .setWatermark(PARTITION_START_TIMESTAMP)
+            .setScheduledAt(Timestamp.now())
+            .build();
+    restriction = mock(OffsetRange.class);
+    restrictionTracker = mock(RestrictionTracker.class);
+    outputReceiver = mock(OutputReceiver.class);
+    watermarkEstimator = mock(ManualWatermarkEstimator.class);
+    bundleFinalizer = mock(BundleFinalizer.class);
+
+    when(restrictionTracker.currentRestriction()).thenReturn(restriction);
+    when(daoFactory.getPartitionMetadataDao()).thenReturn(partitionMetadataDao);
+    when(daoFactory.getChangeStreamDao()).thenReturn(changeStreamDao);
+    when(mapperFactory.changeStreamRecordMapper()).thenReturn(changeStreamRecordMapper);
+    when(mapperFactory.partitionMetadataMapper()).thenReturn(partitionMetadataMapper);
+
+    when(actionFactory.dataChangeRecordAction()).thenReturn(dataChangeRecordAction);
+    when(actionFactory.heartbeatRecordAction(metrics)).thenReturn(heartbeatRecordAction);
+    when(actionFactory.childPartitionsRecordAction(partitionMetadataDao, metrics))
+        .thenReturn(childPartitionsRecordAction);
+    when(actionFactory.queryChangeStreamAction(
+            changeStreamDao,
+            partitionMetadataDao,
+            changeStreamRecordMapper,
+            partitionMetadataMapper,
+            dataChangeRecordAction,
+            heartbeatRecordAction,
+            childPartitionsRecordAction))
+        .thenReturn(queryChangeStreamAction);
+
+    doFn.setup();
+  }
+
+  @Test
+  public void testQueryChangeStreamMode() {
+    when(queryChangeStreamAction.run(any(), any(), any(), any(), any()))
+        .thenReturn(ProcessContinuation.stop());
+
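+    // The DoFn should delegate processing entirely to QueryChangeStreamAction
+    // and must not touch the per-record actions or the restriction tracker.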
+    final ProcessContinuation result =
+        doFn.processElement(
+            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    assertEquals(ProcessContinuation.stop(), result);
+    verify(queryChangeStreamAction)
+        .run(partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
+    verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
+    verify(restrictionTracker, never()).tryClaim(any());
+  }
+
+  // --------------------------
+  // Sad paths not yet covered by tests (a hedged sketch of one such test
+  // follows after this list):
+
+  // Client library errors:
+  //   1. RESOURCE_EXHAUSTED error on client library
+  //   2. DEADLINE_EXCEEDED error on client library
+  //   3. INTERNAL error on client library
+  //   4. UNAVAILABLE error on client library
+  //   5. UNKNOWN error on client library (transaction outcome unknown)
+  //   6. ABORTED error on client library
+  //   7. UNAUTHORIZED error on client library
+
+  // Metadata table
+  //   - Table is deleted
+  //   - Database is deleted
+  //   - No permissions for the metadata table
+  // --------------------------
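+
+  // Hedged sketch of one possible sad-path test. Assumptions not verified by
+  // this commit: the DoFn propagates failures from QueryChangeStreamAction
+  // unchanged, and JUnit's assertThrows (4.13+) is available.
+  //
+  //   @Test
+  //   public void testQueryChangeStreamModePropagatesErrors() {
+  //     when(queryChangeStreamAction.run(any(), any(), any(), any(), any()))
+  //         .thenThrow(new RuntimeException("DEADLINE_EXCEEDED"));
+  //     assertThrows(
+  //         RuntimeException.class,
+  //         () ->
+  //             doFn.processElement(
+  //                 partition, restrictionTracker, outputReceiver,
+  //                 watermarkEstimator, bundleFinalizer));
+  //   }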
+
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java
new file mode 100644
index 0000000..4da1d22
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.junit.Test;
+
+public class ReadChangeStreamPartitionRangeTrackerTest {
+
+  @Test
+  public void testTryClaim() {
+    final PartitionMetadata partition = mock(PartitionMetadata.class);
+    final OffsetRange range = new OffsetRange(100, 200);
+    final ReadChangeStreamPartitionRangeTracker tracker =
+        new ReadChangeStreamPartitionRangeTracker(partition, range);
+    assertEquals(range, tracker.currentRestriction());
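+    // Offsets in [100, 200) can be claimed, and re-claiming the previously
+    // claimed offset is tolerated; the end offset itself is exclusive.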
+    assertTrue(tracker.tryClaim(100L));
+    assertTrue(tracker.tryClaim(100L));
+    assertTrue(tracker.tryClaim(150L));
+    assertTrue(tracker.tryClaim(199L));
+    assertFalse(tracker.tryClaim(200L));
+  }
+
+  @Test
+  public void testTrySplitReturnsNullForInitialPartition() {
+    final PartitionMetadata partition = mock(PartitionMetadata.class);
+    final OffsetRange range = new OffsetRange(100, 200);
+    final ReadChangeStreamPartitionRangeTracker tracker =
+        new ReadChangeStreamPartitionRangeTracker(partition, range);
+
+    when(partition.getPartitionToken()).thenReturn(InitialPartition.PARTITION_TOKEN);
+
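+    // For the synthetic initial partition the tracker declines to split,
+    // presumably so the initial partition query always runs as a single
+    // unit of work.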
+    assertNull(tracker.trySplit(0.0D));
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestTransactionAnswer.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestTransactionAnswer.java
new file mode 100644
index 0000000..5285183
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestTransactionAnswer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.util;
+
+import com.google.cloud.Timestamp;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao.InTransactionContext;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao.TransactionResult;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+@SuppressWarnings("rawtypes")
+public class TestTransactionAnswer implements Answer<TransactionResult> {
+
+  private final InTransactionContext transaction;
+
+  public TestTransactionAnswer(InTransactionContext transaction) {
+    this.transaction = transaction;
+  }
+
+  @Override
+  public TransactionResult answer(InvocationOnMock invocation) {
+    Function<InTransactionContext, Object> callable = invocation.getArgument(0);
+    final Object result = callable.apply(transaction);
+    return new TransactionResult(result, Timestamp.now());
+  }
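+
+  // Illustrative wiring sketch (assumed usage, not part of this commit):
+  //
+  //   when(partitionMetadataDao.runInTransaction(any()))
+  //       .thenAnswer(new TestTransactionAnswer(inTransactionContext));
+  //
+  // The function the code under test passes to runInTransaction is then
+  // applied to the stubbed InTransactionContext and its result wrapped in
+  // a TransactionResult with a fresh commit timestamp.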
+}
