This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a2ebc8a95f1 Cleanup for GA launch of ReadChangeStream (#27249)
a2ebc8a95f1 is described below

commit a2ebc8a95f1a86d03278dd161f0d8466e4f01c4c
Author: Jack Dingilian <jackdingil...@google.com>
AuthorDate: Tue Jun 27 16:54:19 2023 -0400

    Cleanup for GA launch of ReadChangeStream (#27249)
    
    This includes a series of small changes to release the connector:
    - Remove option for users to override heartbeat duration
    - Add option to skip creating metadata table and standalone utility to create metadata table
    - Add hard timeout to mutateRow requests as workaround for hanging requests
    - Remove excessive RCSP logging for debugging during preview
    - Increase read rows timeout to better accommodate large tables
    - Remove side effects from InitializeDoFn
    - Lower ReadChangeStream deadline to closer align with checkpoint duration
    - Add release note to CHANGES.md
---
 CHANGES.md                                         |   1 +
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java       | 133 ++++++++++++++++-----
 .../changestreams/action/ChangeStreamAction.java   |  41 +------
 .../action/ReadChangeStreamPartitionAction.java    |  31 +----
 .../dao/BigtableChangeStreamAccessor.java          |  44 +++++--
 .../changestreams/dao/ChangeStreamDao.java         |  14 +--
 .../gcp/bigtable/changestreams/dao/DaoFactory.java |  15 ++-
 .../changestreams/dao/MetadataTableAdminDao.java   |   5 +
 .../changestreams/dao/MetadataTableDao.java        |  47 +++++++-
 .../changestreams/dofn/InitializeDoFn.java         |  35 +-----
 .../dofn/ReadChangeStreamPartitionDoFn.java        |  11 +-
 .../action/ChangeStreamActionTest.java             |  12 +-
 .../ReadChangeStreamPartitionActionTest.java       |  33 +++--
 .../changestreams/dao/MetadataTableDaoTest.java    |  31 +++++
 .../changestreams/dofn/InitializeDoFnTest.java     |  71 ++---------
 .../dofn/ReadChangeStreamPartitionDoFnTest.java    |   5 +-
 16 files changed, 271 insertions(+), 258 deletions(-)
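
A rough usage sketch of the connector after these changes, with metadata table creation disabled via the new withCreateOrUpdateMetadataTable option (the ids and the pipeline variable `p` are placeholders; the other builder methods are assumed from the existing ReadChangeStream API):

    // Sketch only: read the change stream without creating the metadata table,
    // assuming it was provisioned separately (see the standalone utility below).
    PCollection<KV<ByteString, ChangeStreamMutation>> changes =
        p.apply(
            BigtableIO.readChangeStream()
                .withProjectId("my-project")        // placeholder
                .withInstanceId("my-instance")      // placeholder
                .withTableId("my-table")            // placeholder
                .withAppProfileId("my-app-profile") // placeholder
                .withCreateOrUpdateMetadataTable(false));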

diff --git a/CHANGES.md b/CHANGES.md
index 6c55e276f91..aee9b96d9f4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,7 @@
 ## I/Os
 
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Support for Bigtable Change Streams added in Java `BigtableIO.ReadChangeStream` ([#27183](https://github.com/apache/beam/issues/27183))
 
 ## New Features / Improvements
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 319c3139021..06497458a66 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn;
@@ -310,7 +311,6 @@ public class BigtableIO {
    *
    * <ul>
   *   <li>{@link BigtableIO.ReadChangeStream#withStartTime} which defaults to now.
-   *   <li>{@link BigtableIO.ReadChangeStream#withHeartbeatDuration} with defaults to 1 seconds.
   *   <li>{@link BigtableIO.ReadChangeStream#withMetadataTableProjectId} which defaults to value
   *       from {@link BigtableIO.ReadChangeStream#withProjectId}
   *   <li>{@link BigtableIO.ReadChangeStream#withMetadataTableInstanceId} which defaults to value
@@ -1797,8 +1797,6 @@ public class BigtableIO {
     RESUME_OR_NEW,
     // Same as RESUME_OR_NEW except if previous pipeline doesn't exist, don't start.
     RESUME_OR_FAIL,
-    // Start a new pipeline. Overriding existing pipeline with the same name.
-    NEW,
     // This skips cleaning up previous pipeline metadata and starts a new pipeline. This should
     // only be used to skip cleanup in tests
     @VisibleForTesting
@@ -1827,8 +1825,6 @@ public class BigtableIO {
 
     abstract @Nullable Instant getEndTime();
 
-    abstract @Nullable Duration getHeartbeatDuration();
-
     abstract @Nullable String getChangeStreamName();
 
     abstract @Nullable ExistingPipelineOptions getExistingPipelineOptions();
@@ -1837,6 +1833,8 @@ public class BigtableIO {
 
     abstract @Nullable String getMetadataTableId();
 
+    abstract @Nullable Boolean getCreateOrUpdateMetadataTable();
+
     abstract ReadChangeStream.Builder toBuilder();
 
     /**
@@ -1909,16 +1907,6 @@ public class BigtableIO {
       return toBuilder().setEndTime(endTime).build();
     }
 
-    /**
-     * Returns a new {@link BigtableIO.ReadChangeStream} that will send heartbeat messages at
-     * specified interval.
-     *
-     * <p>Does not modify this object.
-     */
-    public ReadChangeStream withHeartbeatDuration(Duration interval) {
-      return toBuilder().setHeartbeatDuration(interval).build();
-    }
-
     /**
      * Returns a new {@link BigtableIO.ReadChangeStream} that uses changeStreamName as prefix for
      * the metadata table.
@@ -2000,6 +1988,19 @@ public class BigtableIO {
           .build();
     }
 
+    /**
+     * Returns a new {@link BigtableIO.ReadChangeStream} that, if set to true, will create or update
+     * metadata table before launching pipeline. Otherwise, it is expected that a metadata table
+     * with correct schema exists.
+     *
+     * <p>Optional: defaults to true
+     *
+     * <p>Does not modify this object.
+     */
+    public ReadChangeStream withCreateOrUpdateMetadataTable(boolean shouldCreate) {
+      return toBuilder().setCreateOrUpdateMetadataTable(shouldCreate).build();
+    }
+
     @Override
     public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
       checkArgument(
@@ -2040,10 +2041,6 @@ public class BigtableIO {
       if (startTime == null) {
         startTime = Instant.now();
       }
-      Duration heartbeatDuration = getHeartbeatDuration();
-      if (heartbeatDuration == null) {
-        heartbeatDuration = Duration.standardSeconds(1);
-      }
       String changeStreamName = getChangeStreamName();
       if (changeStreamName == null || changeStreamName.isEmpty()) {
         changeStreamName = UniqueIdGenerator.generateRowKeyPrefix();
@@ -2053,21 +2050,55 @@ public class BigtableIO {
         existingPipelineOptions = ExistingPipelineOptions.FAIL_IF_EXISTS;
       }
 
+      boolean shouldCreateOrUpdateMetadataTable = true;
+      if (getCreateOrUpdateMetadataTable() != null) {
+        shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable();
+      }
+
       ActionFactory actionFactory = new ActionFactory();
+      ChangeStreamMetrics metrics = new ChangeStreamMetrics();
       DaoFactory daoFactory =
           new DaoFactory(
               bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName);
-      ChangeStreamMetrics metrics = new ChangeStreamMetrics();
+
+      try {
+        MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao();
+        checkArgument(metadataTableAdminDao != null);
+        checkArgument(
+            metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(
+                metadataTableConfig.getAppProfileId().get()),
+            "App profile id '"
+                + metadataTableConfig.getAppProfileId().get()
+                + "' provided to access metadata table needs to use single-cluster routing policy"
+                + " and allow single-row transactions.");
+
+        // Only try to create or update metadata table if option is set to true. Otherwise, just
+        // check if the table exists.
+        if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) {
+          LOG.info("Created metadata table: " + metadataTableAdminDao.getTableId());
+        }
+        checkArgument(
+            metadataTableAdminDao.doesMetadataTableExist(),
+            "Metadata table does not exist: " + metadataTableAdminDao.getTableId());
+
+        try (BigtableChangeStreamAccessor bigtableChangeStreamAccessor =
+            BigtableChangeStreamAccessor.getOrCreate(bigtableConfig)) {
+          checkArgument(
+              bigtableChangeStreamAccessor.getTableAdminClient().exists(getTableId()),
+              "Change Stream table does not exist");
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      } finally {
+        daoFactory.close();
+      }
+
+
       InitializeDoFn initializeDoFn =
-          new InitializeDoFn(
-              daoFactory,
-              metadataTableConfig.getAppProfileId().get(),
-              startTime,
-              existingPipelineOptions);
+          new InitializeDoFn(daoFactory, startTime, existingPipelineOptions);
       DetectNewPartitionsDoFn detectNewPartitionsDoFn =
           new DetectNewPartitionsDoFn(getEndTime(), actionFactory, daoFactory, metrics);
       ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
-          new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, actionFactory, metrics);
+          new ReadChangeStreamPartitionDoFn(daoFactory, actionFactory, metrics);
 
       PCollection<KV<ByteString, ChangeStreamRecord>> readChangeStreamOutput =
           input
@@ -2101,14 +2132,60 @@ public class BigtableIO {
 
       abstract ReadChangeStream.Builder setEndTime(Instant endTime);
 
-      abstract ReadChangeStream.Builder setHeartbeatDuration(Duration interval);
-
       abstract ReadChangeStream.Builder setChangeStreamName(String changeStreamName);
 
       abstract ReadChangeStream.Builder setExistingPipelineOptions(
           ExistingPipelineOptions existingPipelineOptions);
 
+      abstract ReadChangeStream.Builder setCreateOrUpdateMetadataTable(boolean shouldCreate);
+
       abstract ReadChangeStream build();
     }
   }
+
+  /**
+   * Utility method to create or update Read Change Stream metadata table. This requires Bigtable
+   * table create permissions. This method is useful if the pipeline isn't granted permissions to
+   * create Bigtable tables. Run this method with correct permissions to create the metadata table,
+   * which is required to read Bigtable change streams. This method only needs to be run once, and
+   * the metadata table can be reused for all pipelines.
+   *
+   * @param projectId project id of the metadata table, usually the same as the project of the table
+   *     being streamed
+   * @param instanceId instance id of the metadata table, usually the same as the instance of the
+   *     table being streamed
+   * @param tableId name of the metadata table, leave it null or empty to use default.
+   * @return true if the table was successfully created. Otherwise, false.
+   */
+  public static boolean createOrUpdateReadChangeStreamMetadataTable(
+      String projectId, String instanceId, @Nullable String tableId) throws IOException {
+    BigtableConfig bigtableConfig =
+        BigtableConfig.builder()
+            .setValidate(true)
+            .setProjectId(StaticValueProvider.of(projectId))
+            .setInstanceId(StaticValueProvider.of(instanceId))
+            .setAppProfileId(
+                StaticValueProvider.of(
+                    "default")) // App profile is not used. It's only required 
for data API.
+            .build();
+
+    if (tableId == null || tableId.isEmpty()) {
+      tableId = MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME;
+    }
+
+    DaoFactory daoFactory = new DaoFactory(null, bigtableConfig, null, tableId, null);
+
+    try {
+      MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao();
+
+      // Only try to create or update metadata table if option is set to true. Otherwise, just
+      // check if the table exists.
+      if (metadataTableAdminDao.createMetadataTable()) {
+        LOG.info("Created metadata table: " + metadataTableAdminDao.getTableId());
+      }
+      return metadataTableAdminDao.doesMetadataTableExist();
+    } finally {
+      daoFactory.close();
+    }
+  }
 }
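
A minimal sketch of the one-time setup call using the utility above (ids are placeholders; the error handling is illustrative, not part of the connector):

    // Sketch only: run once with credentials that may create Bigtable tables.
    // A null or empty tableId selects the default metadata table name.
    boolean tableReady =
        BigtableIO.createOrUpdateReadChangeStreamMetadataTable(
            "my-project", "my-instance", null);
    if (!tableReady) {
      throw new IllegalStateException("Read Change Stream metadata table is unavailable");
    }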
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
index 3ab3b1973b9..adb817a4c08 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
@@ -28,7 +28,6 @@ import com.google.cloud.bigtable.data.v2.models.Heartbeat;
 import com.google.cloud.bigtable.data.v2.models.Range;
 import com.google.protobuf.ByteString;
 import java.util.Optional;
-import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
@@ -108,8 +107,7 @@ public class ChangeStreamAction {
       RestrictionTracker<StreamProgress, StreamProgress> tracker,
       DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
-      BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator,
-      boolean shouldDebug) {
+      BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator) {
     if (record instanceof Heartbeat) {
       Heartbeat heartbeat = (Heartbeat) record;
       final Instant watermark = toJodaTime(heartbeat.getEstimatedLowWatermark());
@@ -129,24 +127,11 @@ public class ChangeStreamAction {
               true);
       watermarkEstimator.setWatermark(watermark);
 
-      if (shouldDebug) {
-        LOG.info(
-            "RCSP {}: Heartbeat partition: {} token: {} watermark: {}",
-            formatByteStringRange(partitionRecord.getPartition()),
-            formatByteStringRange(heartbeat.getChangeStreamContinuationToken().getPartition()),
-            heartbeat.getChangeStreamContinuationToken().getToken(),
-            heartbeat.getEstimatedLowWatermark());
-      }
       // If the tracker fail to claim the streamProgress, it most likely means the runner initiated
       // a checkpoint. See {@link
       // org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker}
       // for more information regarding runner initiated checkpoints.
       if (!tracker.tryClaim(streamProgress)) {
-        if (shouldDebug) {
-          LOG.info(
-              "RCSP {}: Checkpoint heart beat tracker",
-              formatByteStringRange(partitionRecord.getPartition()));
-        }
         return Optional.of(DoFn.ProcessContinuation.stop());
       }
       metrics.incHeartbeatCount();
@@ -163,30 +148,11 @@ public class ChangeStreamAction {
       CloseStream closeStream = (CloseStream) record;
       StreamProgress streamProgress = new StreamProgress(closeStream);
 
-      if (shouldDebug) {
-        LOG.info(
-            "RCSP {}: CloseStream: {}",
-            formatByteStringRange(partitionRecord.getPartition()),
-            closeStream.getChangeStreamContinuationTokens().stream()
-                .map(
-                    c ->
-                        "{partition: "
-                            + formatByteStringRange(c.getPartition())
-                            + " token: "
-                            + c.getToken()
-                            + "}")
-                .collect(Collectors.joining(", ", "[", "]")));
-      }
       // If the tracker fail to claim the streamProgress, it most likely means the runner initiated
       // a checkpoint. See {@link
       // org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker}
       // for more information regarding runner initiated checkpoints.
       if (!tracker.tryClaim(streamProgress)) {
-        if (shouldDebug) {
-          LOG.info(
-              "RCSP {}: Checkpoint close stream tracker",
-              formatByteStringRange(partitionRecord.getPartition()));
-        }
         return Optional.of(DoFn.ProcessContinuation.stop());
       }
       metrics.incClosestreamCount();
@@ -217,11 +183,6 @@ public class ChangeStreamAction {
       // a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
       // runner initiated checkpoints.
       if (!tracker.tryClaim(streamProgress)) {
-        if (shouldDebug) {
-          LOG.info(
-              "RCSP {}: Checkpoint data change tracker",
-              formatByteStringRange(partitionRecord.getPartition()));
-        }
         return Optional.of(DoFn.ProcessContinuation.stop());
       }
       if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
index 87273c9894d..8638339ac5c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
@@ -138,23 +138,8 @@ public class ReadChangeStreamPartitionAction {
       OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator)
       throws IOException {
-    // Watermark being delayed beyond 5 minutes signals a possible problem.
-    boolean shouldDebug =
-        watermarkEstimator.getState().plus(Duration.standardMinutes(5)).isBeforeNow();
     BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator =
         new BytesThroughputEstimator<>(sizeEstimator, Instant.now());
-
-    if (shouldDebug) {
-      LOG.info(
-          "RCSP {}: Partition: "
-              + partitionRecord
-              + "\n Watermark: "
-              + watermarkEstimator.getState()
-              + "\n RestrictionTracker: "
-              + tracker.currentRestriction(),
-          formatByteStringRange(partitionRecord.getPartition()));
-    }
-
     // Lock the partition
     if (tracker.currentRestriction().isEmpty()) {
       boolean lockedPartition = metadataTableDao.lockAndRecordPartition(partitionRecord);
@@ -266,12 +251,10 @@ public class ReadChangeStreamPartitionAction {
             new NewPartition(
                 childPartition, Collections.singletonList(token), watermarkEstimator.getState()));
       }
-      if (shouldDebug) {
-        LOG.info(
-            "RCSP {}: Split/Merge into {}",
-            formatByteStringRange(partitionRecord.getPartition()),
-            partitionsToString(childPartitions));
-      }
+      LOG.info(
+          "RCSP {}: Split/Merge into {}",
+          formatByteStringRange(partitionRecord.getPartition()),
+          partitionsToString(childPartitions));
       if (!coverSameKeySpace(tokenPartitions, partitionRecord.getPartition())) {
         LOG.warn(
             "RCSP {}: CloseStream has tokens {} that don't cover the entire keyspace",
@@ -299,8 +282,7 @@ public class ReadChangeStreamPartitionAction {
               partitionRecord,
               tracker.currentRestriction(),
               partitionRecord.getEndTime(),
-              heartbeatDuration,
-              shouldDebug);
+              heartbeatDuration);
       for (ChangeStreamRecord record : stream) {
         Optional<ProcessContinuation> result =
             changeStreamAction.run(
@@ -309,8 +291,7 @@ public class ReadChangeStreamPartitionAction {
                 tracker,
                 receiver,
                 watermarkEstimator,
-                throughputEstimator,
-                shouldDebug);
+                throughputEstimator);
         // changeStreamAction will usually return Optional.empty() except for when a checkpoint
         // (either runner or pipeline initiated) is required.
         if (result.isPresent()) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java
index 69c6efba1de..ecf9a703959 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java
@@ -47,7 +47,8 @@ import org.threeten.bp.Duration;
  * backend and the jobs on the same machine shares the same sets of connections.
  */
 @Internal
-class BigtableChangeStreamAccessor {
+public class BigtableChangeStreamAccessor implements AutoCloseable {
+  static final Duration MUTATE_ROW_DEADLINE = Duration.ofSeconds(30);
   // Create one bigtable data/admin client per bigtable config (project/instance/table/app profile)
   private static final ConcurrentHashMap<BigtableConfig, BigtableChangeStreamAccessor>
       bigtableAccessors = new ConcurrentHashMap<>();
@@ -55,14 +56,31 @@ class BigtableChangeStreamAccessor {
   private final BigtableDataClient dataClient;
   private final BigtableTableAdminClient tableAdminClient;
   private final BigtableInstanceAdminClient instanceAdminClient;
+  private final BigtableConfig bigtableConfig;
 
   private BigtableChangeStreamAccessor(
       BigtableDataClient dataClient,
       BigtableTableAdminClient tableAdminClient,
-      BigtableInstanceAdminClient instanceAdminClient) {
+      BigtableInstanceAdminClient instanceAdminClient,
+      BigtableConfig bigtableConfig) {
     this.dataClient = dataClient;
     this.tableAdminClient = tableAdminClient;
     this.instanceAdminClient = instanceAdminClient;
+    this.bigtableConfig = bigtableConfig;
+  }
+
+  @Override
+  public synchronized void close() {
+    if (dataClient != null) {
+      dataClient.close();
+    }
+    if (tableAdminClient != null) {
+      tableAdminClient.close();
+    }
+    if (instanceAdminClient != null) {
+      instanceAdminClient.close();
+    }
+    bigtableAccessors.remove(bigtableConfig);
   }
 
   /**
@@ -146,9 +164,10 @@ class BigtableChangeStreamAccessor {
         .readRowsSettings()
         .setRetrySettings(
             readRowsRetrySettings
-                .setInitialRpcTimeout(Duration.ofSeconds(30))
-                .setTotalTimeout(Duration.ofSeconds(30))
-                .setMaxRpcTimeout(Duration.ofSeconds(30))
+                // metadata table scans can get quite large, so use a higher deadline
+                .setInitialRpcTimeout(Duration.ofMinutes(3))
+                .setTotalTimeout(Duration.ofMinutes(3))
+                .setMaxRpcTimeout(Duration.ofMinutes(3))
                 .setMaxAttempts(10)
                 .build());
 
@@ -159,9 +178,9 @@ class BigtableChangeStreamAccessor {
         .mutateRowSettings()
         .setRetrySettings(
             mutateRowRetrySettings
-                .setInitialRpcTimeout(Duration.ofSeconds(30))
-                .setTotalTimeout(Duration.ofSeconds(30))
-                .setMaxRpcTimeout(Duration.ofSeconds(30))
+                .setInitialRpcTimeout(MUTATE_ROW_DEADLINE)
+                .setTotalTimeout(MUTATE_ROW_DEADLINE)
+                .setMaxRpcTimeout(MUTATE_ROW_DEADLINE)
                 .setMaxAttempts(10)
                 .build());
 
@@ -185,9 +204,9 @@ class BigtableChangeStreamAccessor {
         .readChangeStreamSettings()
         .setRetrySettings(
             readChangeStreamRetrySettings
-                .setInitialRpcTimeout(Duration.ofSeconds(60))
-                .setTotalTimeout(Duration.ofSeconds(60))
-                .setMaxRpcTimeout(Duration.ofSeconds(60))
+                .setInitialRpcTimeout(Duration.ofSeconds(15))
+                .setTotalTimeout(Duration.ofSeconds(15))
+                .setMaxRpcTimeout(Duration.ofSeconds(15))
                 .setMaxAttempts(10)
                 .build());
 
@@ -196,7 +215,8 @@ class BigtableChangeStreamAccessor {
         BigtableTableAdminClient.create(tableAdminSettingsBuilder.build());
     BigtableInstanceAdminClient instanceAdminClient =
         BigtableInstanceAdminClient.create(instanceAdminSettingsBuilder.build());
-    return new BigtableChangeStreamAccessor(dataClient, tableAdminClient, instanceAdminClient);
+    return new BigtableChangeStreamAccessor(
+        dataClient, tableAdminClient, instanceAdminClient, bigtableConfig);
   }
 
   public BigtableDataClient getDataClient() {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
index 1c24e09db7d..5974af22f86 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
 
-import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.formatByteStringRange;
 import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter.toThreetenInstant;
 
 import com.google.api.gax.rpc.ServerStream;
@@ -36,14 +35,10 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** Data access object to list and read stream partitions of a table. */
 @Internal
 public class ChangeStreamDao {
-  private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamDao.class);
-
   private final BigtableDataClient dataClient;
   private final String tableId;
 
@@ -75,8 +70,7 @@ public class ChangeStreamDao {
       PartitionRecord partition,
       StreamProgress streamProgress,
       @Nullable Instant endTime,
-      Duration heartbeatDuration,
-      boolean shouldDebug)
+      Duration heartbeatDuration)
       throws IOException {
     ReadChangeStreamQuery query =
         ReadChangeStreamQuery.create(tableId).streamPartition(partition.getPartition());
@@ -98,12 +92,6 @@ public class ChangeStreamDao {
       query.endTime(TimestampConverter.toThreetenInstant(endTime));
     }
     query.heartbeatDuration(org.threeten.bp.Duration.ofMillis(heartbeatDuration.getMillis()));
-    if (shouldDebug) {
-      LOG.info(
-          "RCSP {} ReadChangeStreamRequest: {}",
-          formatByteStringRange(partition.getPartition()),
-          query);
-    }
     return dataClient.readChangeStream(query);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
index acecc22b891..35ac8ed29c3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig;
 // Allows transient fields to be intialized later
 @SuppressWarnings("initialization.fields.uninitialized")
 @Internal
-public class DaoFactory implements Serializable {
+public class DaoFactory implements Serializable, AutoCloseable {
   private static final long serialVersionUID = 3732208768248394205L;
 
   private transient ChangeStreamDao changeStreamDao;
@@ -58,6 +58,19 @@ public class DaoFactory implements Serializable {
     this.metadataTableId = metadataTableId;
   }
 
+  @Override
+  public void close() {
+    try {
+      if (metadataTableAdminDao != null || metadataTableDao != null) {
+        BigtableChangeStreamAccessor.getOrCreate(metadataTableConfig).close();
+      }
+      if (changeStreamDao != null) {
+        BigtableChangeStreamAccessor.getOrCreate(changeStreamConfig).close();
+      }
+    } catch (Exception ignored) {
+    }
+  }
+
   public String getChangeStreamName() {
     return changeStreamName;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java
index 5194ca49d62..73bfbe5865b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java
@@ -128,6 +128,11 @@ public class MetadataTableAdminDao {
     return false;
   }
 
+  /** @return true if metadata table exists, otherwise false. */
+  public boolean doesMetadataTableExist() {
+    return tableAdminClient.exists(tableId);
+  }
+
   /**
   * Create the metadata table if it does not exist yet. If the table does exist, verify all the
   * column families exists, if not add those column families. This table only need to be created
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
index d4938e6bef2..418c29d1de7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
@@ -30,6 +30,7 @@ import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.Metadata
 import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder.parseWatermarkFromRow;
 import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder.parseWatermarkLastUpdatedFromRow;
 
+import com.google.api.core.ApiFuture;
 import com.google.api.gax.rpc.ServerStream;
 import com.google.cloud.bigtable.data.v2.BigtableDataClient;
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
@@ -50,6 +51,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationException;
@@ -60,6 +64,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.DetectNewPartitio
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.StreamPartitionWithWatermark;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -319,7 +324,7 @@ public class MetadataTableDao {
             .deleteCells(
                 MetadataTableAdminDao.CF_SHOULD_DELETE,
                 ByteStringRange.serializeToByteString(parentPartition));
-    dataClient.mutateRow(rowMutation);
+    mutateRowWithHardTimeout(rowMutation);
   }
 
   /**
@@ -352,7 +357,7 @@ public class MetadataTableDao {
           ByteStringRange.serializeToByteString(token.getPartition()),
           1);
     }
-    dataClient.mutateRow(rowMutation);
+    mutateRowWithHardTimeout(rowMutation);
   }
 
   /**
@@ -511,7 +516,7 @@ public class MetadataTableDao {
           MetadataTableAdminDao.QUALIFIER_DEFAULT,
           currentToken.getToken());
     }
-    dataClient.mutateRow(rowMutation);
+    mutateRowWithHardTimeout(rowMutation);
   }
 
   /**
@@ -705,6 +710,10 @@ public class MetadataTableDao {
 
     boolean lockAcquired = !dataClient.checkAndMutateRow(rowMutation);
     if (lockAcquired) {
+      LOG.info(
+          "RCSP: {} acquired lock for uid: {}",
+          formatByteStringRange(partitionRecord.getPartition()),
+          partitionRecord.getUuid());
       return true;
     } else {
       // If the lock is already held we need to check if it was acquired by a duplicate
@@ -724,7 +733,7 @@ public class MetadataTableDao {
                 MetadataTableAdminDao.CF_VERSION,
                 MetadataTableAdminDao.QUALIFIER_DEFAULT,
                 MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION);
-    dataClient.mutateRow(rowMutation);
+    mutateRowWithHardTimeout(rowMutation);
   }
 
   /**
@@ -778,6 +787,34 @@ public class MetadataTableDao {
                 MetadataTableAdminDao.CF_MISSING_PARTITIONS,
                 ByteString.copyFromUtf8(MetadataTableAdminDao.QUALIFIER_DEFAULT),
                 ByteString.copyFrom(serializedMissingPartition));
-    dataClient.mutateRow(rowMutation);
+    mutateRowWithHardTimeout(rowMutation);
+  }
+
+  /**
+   * This adds a hard timeout of 40 seconds to mutate row futures. These requests already have a
+   * 30-second deadline. This is a workaround for an extremely rare issue we see with requests not
+   * respecting their deadlines. This can be removed once we've pinpointed the cause.
+   *
+   * @param rowMutation Bigtable RowMutation to apply
+   */
+  @VisibleForTesting
+  void mutateRowWithHardTimeout(RowMutation rowMutation) {
+    ApiFuture<Void> mutateRowFuture = dataClient.mutateRowAsync(rowMutation);
+    try {
+      mutateRowFuture.get(
+          BigtableChangeStreamAccessor.MUTATE_ROW_DEADLINE.getSeconds() + 10, TimeUnit.SECONDS);
+    } catch (TimeoutException timeoutException) {
+      mutateRowFuture.cancel(true);
+      throw new RuntimeException(
+          "Cancelled mutateRow request after exceeding deadline", timeoutException);
+    } catch (ExecutionException executionException) {
+      if (executionException.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) executionException.getCause();
+      }
+      throw new RuntimeException(executionException);
+    } catch (InterruptedException interruptedException) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(interruptedException);
+    }
   }
 }
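
The same hard-timeout workaround, distilled into a self-contained sketch using plain java.util.concurrent types (the helper name and the 10-second grace period mirror the code above but are otherwise illustrative):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    final class HardTimeout {
      // Wait slightly longer than the request's own RPC deadline, then cancel
      // locally so a hung request cannot stall the caller indefinitely.
      static <T> T await(CompletableFuture<T> future, long rpcDeadlineSeconds) {
        try {
          return future.get(rpcDeadlineSeconds + 10, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
          future.cancel(true);
          throw new RuntimeException("Cancelled request after exceeding deadline", e);
        } catch (ExecutionException e) {
          throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
    }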
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java
index 07826ee1db7..c8ee12772cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java
@@ -36,20 +36,14 @@ import org.slf4j.LoggerFactory;
 @Internal
 public class InitializeDoFn extends DoFn<byte[], InitialPipelineState> implements Serializable {
   private static final long serialVersionUID = 1868189906451252363L;
-
   private static final Logger LOG = LoggerFactory.getLogger(InitializeDoFn.class);
   private final DaoFactory daoFactory;
-  private final String metadataTableAppProfileId;
   private Instant startTime;
   private final ExistingPipelineOptions existingPipelineOptions;
 
   public InitializeDoFn(
-      DaoFactory daoFactory,
-      String metadataTableAppProfileId,
-      Instant startTime,
-      ExistingPipelineOptions existingPipelineOptions) {
+      DaoFactory daoFactory, Instant startTime, ExistingPipelineOptions existingPipelineOptions) {
     this.daoFactory = daoFactory;
-    this.metadataTableAppProfileId = metadataTableAppProfileId;
     this.startTime = startTime;
     this.existingPipelineOptions = existingPipelineOptions;
   }
@@ -59,35 +53,12 @@ public class InitializeDoFn extends DoFn<byte[], InitialPipelineState> implement
     LOG.info(daoFactory.getStreamTableDebugString());
     LOG.info(daoFactory.getMetadataTableDebugString());
     LOG.info("ChangeStreamName: " + daoFactory.getChangeStreamName());
-    if (!daoFactory
-        .getMetadataTableAdminDao()
-        .isAppProfileSingleClusterAndTransactional(this.metadataTableAppProfileId)) {
-      LOG.error(
-          "App profile id '"
-              + metadataTableAppProfileId
-              + "' provided to access metadata table needs to use 
single-cluster routing policy"
-              + " and allow single-row transactions.");
-      // Terminate this pipeline now.
-      return;
-    }
-    if (daoFactory.getMetadataTableAdminDao().createMetadataTable()) {
-      LOG.info("Created metadata table: " + 
daoFactory.getMetadataTableAdminDao().getTableId());
-    } else {
-      LOG.info(
-          "Reusing existing metadata table: " + 
daoFactory.getMetadataTableAdminDao().getTableId());
-    }
 
     boolean resume = false;
     DetectNewPartitionsState detectNewPartitionsState =
         daoFactory.getMetadataTableDao().readDetectNewPartitionsState();
 
     switch (existingPipelineOptions) {
-      case NEW:
-        // clean up table
-        LOG.info(
-            "Cleaning up an old pipeline with the same change stream name to 
start a new pipeline with the same name.");
-        daoFactory.getMetadataTableAdminDao().cleanUpPrefix();
-        break;
       case RESUME_OR_NEW:
         // perform resumption.
         if (detectNewPartitionsState != null) {
@@ -98,7 +69,6 @@ public class InitializeDoFn extends DoFn<byte[], InitialPipelineState> implement
           LOG.info(
               "Attempted to resume, but previous watermark does not exist, 
starting at {}",
               startTime);
-          daoFactory.getMetadataTableAdminDao().cleanUpPrefix();
         }
         break;
       case RESUME_OR_FAIL:
@@ -118,9 +88,6 @@ public class InitializeDoFn extends DoFn<byte[], InitialPipelineState> implement
               "A previous pipeline exists with the same change stream name and existingPipelineOption is set to FAIL_IF_EXISTS.");
           return;
         }
-        // We still want to clean up any existing prefixes in case there are lingering metadata that
-        // would interfere with the new run.
-        daoFactory.getMetadataTableAdminDao().cleanUpPrefix();
         break;
       case SKIP_CLEANUP:
         if (detectNewPartitionsState != null) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index 91210caa597..e4c911f926a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -58,9 +58,8 @@ public class ReadChangeStreamPartitionDoFn
   private static final long serialVersionUID = 4418739381635104479L;
   private static final BigDecimal MAX_DOUBLE = BigDecimal.valueOf(Double.MAX_VALUE);
   private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
-  public static final int THROUGHPUT_ESTIMATION_WINDOW_SECONDS = 10;
+  private static final Duration HEARTBEAT_DURATION = Duration.standardSeconds(1);
 
-  private final Duration heartbeatDuration;
   private final DaoFactory daoFactory;
   private final ChangeStreamMetrics metrics;
   private final ActionFactory actionFactory;
@@ -68,11 +67,7 @@ public class ReadChangeStreamPartitionDoFn
   private ReadChangeStreamPartitionAction readChangeStreamPartitionAction;
 
   public ReadChangeStreamPartitionDoFn(
-      Duration heartbeatDuration,
-      DaoFactory daoFactory,
-      ActionFactory actionFactory,
-      ChangeStreamMetrics metrics) {
-    this.heartbeatDuration = heartbeatDuration;
+      DaoFactory daoFactory, ActionFactory actionFactory, ChangeStreamMetrics metrics) {
     this.daoFactory = daoFactory;
     this.metrics = metrics;
     this.actionFactory = actionFactory;
@@ -171,7 +166,7 @@ public class ReadChangeStreamPartitionDoFn
             changeStreamDao,
             metrics,
             changeStreamAction,
-            heartbeatDuration,
+            HEARTBEAT_DURATION,
             sizeEstimator);
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
index 2e340c2b30b..b2c9d6ac131 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
@@ -103,8 +103,7 @@ public class ChangeStreamActionTest {
             tracker,
             receiver,
             watermarkEstimator,
-            throughputEstimator,
-            false);
+            throughputEstimator);
 
     assertFalse(result.isPresent());
     verify(metrics).incHeartbeatCount();
@@ -152,8 +151,7 @@ public class ChangeStreamActionTest {
             tracker,
             receiver,
             watermarkEstimator,
-            throughputEstimator,
-            false);
+            throughputEstimator);
 
     assertTrue(result.isPresent());
     assertEquals(DoFn.ProcessContinuation.resume(), result.get());
@@ -187,8 +185,7 @@ public class ChangeStreamActionTest {
             tracker,
             receiver,
             watermarkEstimator,
-            throughputEstimator,
-            false);
+            throughputEstimator);
 
     assertFalse(result.isPresent());
     verify(metrics).incChangeStreamMutationUserCounter();
@@ -243,8 +240,7 @@ public class ChangeStreamActionTest {
             tracker,
             receiver,
             watermarkEstimator,
-            throughputEstimator,
-            false);
+            throughputEstimator);
 
     assertFalse(result.isPresent());
     verify(metrics).incChangeStreamMutationGcCounter();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java
index 305be156885..43419b7147f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamContinuationTokenHelper.getTokenWithCorrectPartition;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -128,10 +127,10 @@ public class ReadChangeStreamPartitionActionTest {
     Heartbeat mockHeartBeat = Mockito.mock(Heartbeat.class);
     when(responseIterator.next()).thenReturn(mockHeartBeat);
     when(responseIterator.hasNext()).thenReturn(true);
-    when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any(), anyBoolean()))
+    when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any()))
         .thenReturn(responses);
 
-    when(changeStreamAction.run(any(), any(), any(), any(), any(), any(), anyBoolean()))
+    when(changeStreamAction.run(any(), any(), any(), any(), any(), any()))
         .thenReturn(Optional.of(DoFn.ProcessContinuation.stop()));
 
     final DoFn.ProcessContinuation result =
@@ -140,7 +139,7 @@ public class ReadChangeStreamPartitionActionTest {
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Verify that on successful lock, we don't tryClaim on the tracker.
     verify(tracker, never()).tryClaim(any());
-    verify(changeStreamAction).run(any(), any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction).run(any(), any(), any(), any(), any(), any());
   }
 
   @Test
@@ -157,7 +156,7 @@ public class ReadChangeStreamPartitionActionTest {
     StreamProgress streamProgress = new StreamProgress();
     streamProgress.setFailToLock(true);
     verify(tracker).tryClaim(streamProgress);
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any());
   }
 
   @Test
@@ -174,10 +173,10 @@ public class ReadChangeStreamPartitionActionTest {
     Heartbeat mockHeartBeat = Mockito.mock(Heartbeat.class);
     when(responseIterator.next()).thenReturn(mockHeartBeat);
     when(responseIterator.hasNext()).thenReturn(true);
-    when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any(), anyBoolean()))
+    when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any()))
         .thenReturn(responses);
 
-    when(changeStreamAction.run(any(), any(), any(), any(), any(), any(), anyBoolean()))
+    when(changeStreamAction.run(any(), any(), any(), any(), any(), any()))
         .thenReturn(Optional.of(DoFn.ProcessContinuation.stop()));
 
     final DoFn.ProcessContinuation result =
@@ -186,7 +185,7 @@ public class ReadChangeStreamPartitionActionTest {
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Verify that on successful lock, we don't tryClaim on the tracker.
     verify(tracker, never()).tryClaim(any());
-    verify(changeStreamAction).run(any(), any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction).run(any(), any(), any(), any(), any(), any());
   }
 
   @Test
@@ -205,7 +204,7 @@ public class ReadChangeStreamPartitionActionTest {
     StreamProgress streamProgress = new StreamProgress();
     streamProgress.setFailToLock(true);
     verify(tracker).tryClaim(streamProgress);
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any());
   }
 
   @Test
@@ -221,16 +220,16 @@ public class ReadChangeStreamPartitionActionTest {
     Heartbeat mockHeartBeat = Mockito.mock(Heartbeat.class);
     when(responseIterator.next()).thenReturn(mockHeartBeat);
     when(responseIterator.hasNext()).thenReturn(true);
-    when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any(), anyBoolean()))
+    when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any()))
         .thenReturn(responses);
 
-    when(changeStreamAction.run(any(), any(), any(), any(), any(), any(), anyBoolean()))
+    when(changeStreamAction.run(any(), any(), any(), any(), any(), any()))
         .thenReturn(Optional.of(DoFn.ProcessContinuation.stop()));
 
     final DoFn.ProcessContinuation result =
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
-    verify(changeStreamAction).run(any(), any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction).run(any(), any(), any(), any(), any(), any());
   }
 
   @Test
@@ -247,7 +246,7 @@ public class ReadChangeStreamPartitionActionTest {
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Should terminate before reaching processing stream partition responses.
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any());
     // Should not try claim any restriction when processing CloseStream
     verify(tracker, (never())).tryClaim(any());
     // Should decrement the metric on termination.
@@ -273,7 +272,7 @@ public class ReadChangeStreamPartitionActionTest {
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Should terminate before reaching processing stream partition responses.
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any());
     // Should not try claim any restriction when processing CloseStream
     verify(tracker, (never())).tryClaim(any());
     // Should decrement the metric on termination.
@@ -308,7 +307,7 @@ public class ReadChangeStreamPartitionActionTest {
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Should terminate before reaching processing stream partition responses.
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any());
     // Should not try claim any restriction when processing CloseStream
     verify(tracker, (never())).tryClaim(any());
     // Should decrement the metric on termination.
@@ -360,7 +359,7 @@ public class ReadChangeStreamPartitionActionTest {
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Should terminate before reaching processing stream partition responses.
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any());
     // Should not try claim any restriction when processing CloseStream
     verify(tracker, (never())).tryClaim(any());
     // Should decrement the metric on termination.
@@ -406,7 +405,7 @@ public class ReadChangeStreamPartitionActionTest {
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Should terminate before reaching processing stream partition responses.
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any());
     // Should not try claim any restriction when processing CloseStream
     verify(tracker, (never())).tryClaim(any());
     // Should decrement the metric on termination.
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java
index c82e3eac299..867117b4d39 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java
@@ -23,10 +23,14 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.api.core.ApiFuture;
 import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
 import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
 import com.google.cloud.bigtable.data.v2.BigtableDataClient;
@@ -45,6 +49,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
@@ -743,4 +750,28 @@ public class MetadataTableDaoTest {
             .get(0)
             .getValue());
   }
+
+  @Test
+  public void mutateRowWithHardTimeoutErrorHandling()
+      throws ExecutionException, InterruptedException, TimeoutException {
+    BigtableDataClient mockClient = Mockito.mock(BigtableDataClient.class);
+    MetadataTableDao daoWithMock =
+        new MetadataTableDao(mockClient, "test-table", ByteString.copyFromUtf8("test"));
+    ApiFuture<Void> mockFuture = mock(ApiFuture.class);
+    when(mockClient.mutateRowAsync(any())).thenReturn(mockFuture);
+
+    when(mockFuture.get(40, TimeUnit.SECONDS))
+        .thenThrow(TimeoutException.class)
+        .thenThrow(InterruptedException.class)
+        .thenThrow(ExecutionException.class);
+    assertThrows(
+        RuntimeException.class,
+        () -> daoWithMock.mutateRowWithHardTimeout(RowMutation.create("test", "test").deleteRow()));
+    assertThrows(
+        RuntimeException.class,
+        () -> daoWithMock.mutateRowWithHardTimeout(RowMutation.create("test", "test").deleteRow()));
+    assertThrows(
+        RuntimeException.class,
+        () -> daoWithMock.mutateRowWithHardTimeout(RowMutation.create("test", "test").deleteRow()));
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.java
index a9fa54d4c49..f0337504282 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFnTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -88,8 +87,6 @@ public class InitializeDoFnTest {
     metadataTableAdminDao =
         spy(new MetadataTableAdminDao(adminClient, null, changeStreamName, tableId));
     metadataTableAdminDao.createMetadataTable();
-    doReturn(true).when(metadataTableAdminDao).isAppProfileSingleClusterAndTransactional(any());
-    when(daoFactory.getMetadataTableAdminDao()).thenReturn(metadataTableAdminDao);
     metadataTableDao =
         new MetadataTableDao(
             dataClient, tableId, metadataTableAdminDao.getChangeStreamNamePrefix());
@@ -102,10 +99,7 @@ public class InitializeDoFnTest {
     Instant startTime = Instant.now();
     InitializeDoFn initializeDoFn =
         new InitializeDoFn(
-            daoFactory,
-            "app-profile",
-            startTime,
-            BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS);
+            daoFactory, startTime, BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS);
     initializeDoFn.processElement(outputReceiver);
     verify(outputReceiver, times(1)).output(new InitialPipelineState(startTime, false));
   }
@@ -116,10 +110,7 @@ public class InitializeDoFnTest {
     Instant startTime = Instant.now();
     InitializeDoFn initializeDoFn =
         new InitializeDoFn(
-            daoFactory,
-            "app-profile",
-            startTime,
-            BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS);
+            daoFactory, startTime, BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS);
     initializeDoFn.processElement(outputReceiver);
     verify(outputReceiver, never()).output(any());
   }
@@ -139,51 +130,7 @@ public class InitializeDoFnTest {
     Instant startTime = Instant.now();
     InitializeDoFn initializeDoFn =
         new InitializeDoFn(
-            daoFactory,
-            "app-profile",
-            startTime,
-            BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS);
-    initializeDoFn.processElement(outputReceiver);
-    verify(outputReceiver, times(1)).output(new InitialPipelineState(startTime, false));
-    assertNull(dataClient.readRow(tableId, metadataTableAdminDao.getChangeStreamNamePrefix()));
-  }
-
-  @Test
-  public void testInitializeNewWithoutDNP() throws IOException {
-    // DNP row doesn't exist, so we output with provided start time and we clean up any existing
-    // rows under the prefix.
-    dataClient.mutateRow(
-        RowMutation.create(
-                tableId,
-                metadataTableAdminDao
-                    .getChangeStreamNamePrefix()
-                    .concat(ByteString.copyFromUtf8("existing_row")))
-            .setCell(
-                MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT, 123));
-    Instant startTime = Instant.now();
-    InitializeDoFn initializeDoFn =
-        new InitializeDoFn(
-            daoFactory, "app-profile", startTime, BigtableIO.ExistingPipelineOptions.NEW);
-    initializeDoFn.processElement(outputReceiver);
-    verify(outputReceiver, times(1)).output(new InitialPipelineState(startTime, false));
-    assertNull(dataClient.readRow(tableId, metadataTableAdminDao.getChangeStreamNamePrefix()));
-  }
-
-  @Test
-  public void testInitializeNewWithDNP() throws IOException {
-    metadataTableDao.updateDetectNewPartitionWatermark(Instant.now());
-    dataClient.mutateRow(
-        RowMutation.create(
-                tableId,
-                metadataTableAdminDao
-                    .getChangeStreamNamePrefix()
-                    .concat(ByteString.copyFromUtf8("existing_row")))
-            .setCell(
-                MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT, 123));
-    Instant startTime = Instant.now();
-    InitializeDoFn initializeDoFn =
-        new InitializeDoFn(
-            daoFactory, "app-profile", startTime, BigtableIO.ExistingPipelineOptions.NEW);
+            daoFactory, startTime, BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS);
     initializeDoFn.processElement(outputReceiver);
     verify(outputReceiver, times(1)).output(new InitialPipelineState(startTime, false));
     assertNull(dataClient.readRow(tableId, metadataTableAdminDao.getChangeStreamNamePrefix()));
@@ -201,8 +148,7 @@ public class InitializeDoFnTest {
                 MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT, 123));
     Instant startTime = Instant.now();
     InitializeDoFn initializeDoFn =
-        new InitializeDoFn(
-            daoFactory, "app-profile", startTime, BigtableIO.ExistingPipelineOptions.RESUME_OR_NEW);
+        new InitializeDoFn(daoFactory, startTime, BigtableIO.ExistingPipelineOptions.RESUME_OR_NEW);
     initializeDoFn.processElement(outputReceiver);
     // We want to resume but there's no DNP row, so we resume from the startTime provided.
     verify(outputReceiver, times(1)).output(new InitialPipelineState(startTime, false));
@@ -222,8 +168,7 @@ public class InitializeDoFnTest {
                 MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT, 123));
     Instant startTime = Instant.now();
     InitializeDoFn initializeDoFn =
-        new InitializeDoFn(
-            daoFactory, "app-profile", startTime, BigtableIO.ExistingPipelineOptions.RESUME_OR_NEW);
+        new InitializeDoFn(daoFactory, startTime, BigtableIO.ExistingPipelineOptions.RESUME_OR_NEW);
     initializeDoFn.processElement(outputReceiver);
     verify(outputReceiver, times(1)).output(new InitialPipelineState(resumeTime, true));
     assertNull(dataClient.readRow(tableId, metadataTableAdminDao.getChangeStreamNamePrefix()));
@@ -241,8 +186,7 @@ public class InitializeDoFnTest {
                 MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT, 123));
     Instant startTime = Instant.now();
     InitializeDoFn initializeDoFn =
-        new InitializeDoFn(
-            daoFactory, "app-profile", startTime, ExistingPipelineOptions.SKIP_CLEANUP);
+        new InitializeDoFn(daoFactory, startTime, ExistingPipelineOptions.SKIP_CLEANUP);
     initializeDoFn.processElement(outputReceiver);
     // Skip cleanup will always resume from startTime
     verify(outputReceiver, times(1)).output(new InitialPipelineState(startTime, false));
@@ -264,8 +208,7 @@ public class InitializeDoFnTest {
                 MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT, 123));
     Instant startTime = Instant.now();
     InitializeDoFn initializeDoFn =
-        new InitializeDoFn(
-            daoFactory, "app-profile", startTime, ExistingPipelineOptions.SKIP_CLEANUP);
+        new InitializeDoFn(daoFactory, startTime, ExistingPipelineOptions.SKIP_CLEANUP);
     initializeDoFn.processElement(outputReceiver);
     // We don't want the pipeline to resume to avoid duplicates
     verify(outputReceiver, never()).output(any());
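
Taken together, the InitializeDoFnTest changes track the constructor
simplification: InitializeDoFn no longer takes the app-profile string, and the
app-profile check no longer needs to be stubbed in setup. A call site now reads
as follows (a usage sketch based solely on the updated tests; variable names
are illustrative):

    // New three-argument form, per the updated tests:
    InitializeDoFn initializeDoFn =
        new InitializeDoFn(daoFactory, startTime, BigtableIO.ExistingPipelineOptions.RESUME_OR_NEW);
    initializeDoFn.processElement(outputReceiver);
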
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index cb82fc6d19c..b89b2bf15aa 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
 import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter.toThreetenInstant;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -94,7 +93,7 @@ public class ReadChangeStreamPartitionDoFnTest {
             sizeEstimator))
         .thenReturn(readChangeStreamPartitionAction);
 
-    doFn = new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, actionFactory, metrics);
+    doFn = new ReadChangeStreamPartitionDoFn(daoFactory, actionFactory, metrics);
     doFn.setSizeEstimator(sizeEstimator);
   }
 
@@ -136,7 +135,7 @@ public class ReadChangeStreamPartitionDoFnTest {
     when(mockResponses.hasNext()).thenReturn(true, true, true);
     when(mockResponses.next()).thenReturn(mockMutation, mockMutation, mockMutation);
     when(mockStream.iterator()).thenReturn(mockResponses);
-    when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any(), anyBoolean()))
+    when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any()))
         .thenReturn(mockStream);
 
     when(watermarkEstimator.getState()).thenReturn(tenSecondsAgo);
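
The net effect in this file: the heartbeat duration is no longer threaded
through the DoFn constructor or the change stream read. A sketch of the updated
wiring (only the removed parameters are certain from the diff; the remaining
argument names are illustrative):

    // DoFn construction without a caller-supplied heartbeat duration...
    ReadChangeStreamPartitionDoFn doFn =
        new ReadChangeStreamPartitionDoFn(daoFactory, actionFactory, metrics);
    // ...and the DAO stub without the former trailing boolean flag.
    when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any()))
        .thenReturn(mockStream);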
