This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ac57b8709d3 Optimize change stream connector with more efficient batching and blind writes, and add transaction/query tags (#25718)
ac57b8709d3 is described below

commit ac57b8709d3f2db597a5d9e6339545b0f2036a3a
Author: ChangyuLi28 <35211213+changyul...@users.noreply.github.com>
AuthorDate: Fri Mar 17 15:13:27 2023 -0700

    Optimize change stream connector with more efficient batching and blind writes, and add transaction/query tags (#25718)
    
    * Optimize change stream connector with more efficient batching and blind writes, and add transaction/query tags
    
    * Optimize change stream connector with more efficient batching and blind writes, and add transaction/query tags
    
    * Apply commit deadline only for metadata table writes
    
    ---------
    
    Co-authored-by: Changyu Li <changy...@google.com>
---
 .../MetadataSpannerConfigFactory.java              |  2 +-
 .../action/ChildPartitionsRecordAction.java        |  3 +-
 .../action/DetectNewPartitionsAction.java          | 19 ++++--
 .../changestreams/dao/PartitionMetadataDao.java    | 68 +++++++++++++++-------
 .../action/ChildPartitionsRecordActionTest.java    |  4 +-
 .../dao/PartitionMetadataDaoTest.java              | 29 +++------
 6 files changed, 75 insertions(+), 50 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
index 83965b1bfaa..56c67f3194f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
@@ -84,7 +84,7 @@ public class MetadataSpannerConfigFactory {
 
     ValueProvider<Duration> commitDeadline = primaryConfig.getCommitDeadline();
     if (commitDeadline != null) {
-      config = 
config.withCommitDeadline(StaticValueProvider.of(commitDeadline.get()));
+      config = 
config.withCommitDeadline(StaticValueProvider.of(Duration.standardSeconds(60)));
     }
 
     ValueProvider<Duration> maxCumulativeBackoff = 
primaryConfig.getMaxCumulativeBackoff();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
index 70286b41778..7fb69d0e7ab 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
@@ -155,7 +155,8 @@ public class ChildPartitionsRecordAction {
                   } else {
                     return false;
                   }
-                })
+                },
+                "InsertChildPartition")
             .getResult();
     if (insertedRow && isSplit) {
       metrics.incPartitionRecordSplitCount();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
index 934210250f5..73967d2a2a7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
@@ -146,15 +146,22 @@ public class DetectNewPartitionsAction {
       OutputReceiver<PartitionMetadata> receiver,
       Timestamp minWatermark,
       TreeMap<Timestamp, List<PartitionMetadata>> batches) {
+    List<PartitionMetadata> batchPartitionsDifferentCreatedAt = new 
ArrayList<>();
+    int numTimestampsHandledSofar = 0;
     for (Map.Entry<Timestamp, List<PartitionMetadata>> batch : 
batches.entrySet()) {
+      numTimestampsHandledSofar++;
       final Timestamp batchCreatedAt = batch.getKey();
-      final List<PartitionMetadata> batchPartitions = batch.getValue();
-
-      final Timestamp scheduledAt = updateBatchToScheduled(batchPartitions);
-      if (!tracker.tryClaim(batchCreatedAt)) {
-        return ProcessContinuation.stop();
+      final List<PartitionMetadata> batchPartitionsSameCreatedAt = 
batch.getValue();
+      batchPartitionsDifferentCreatedAt.addAll(batchPartitionsSameCreatedAt);
+      if (batchPartitionsDifferentCreatedAt.size() >= 200
+          || numTimestampsHandledSofar == batches.size()) {
+        final Timestamp scheduledAt = 
updateBatchToScheduled(batchPartitionsDifferentCreatedAt);
+        if (!tracker.tryClaim(batchCreatedAt)) {
+          return ProcessContinuation.stop();
+        }
+        outputBatch(receiver, minWatermark, batchPartitionsDifferentCreatedAt, 
scheduledAt);
+        batchPartitionsDifferentCreatedAt = new ArrayList<>();
       }
-      outputBatch(receiver, minWatermark, batchPartitions, scheduledAt);
     }
 
     return ProcessContinuation.resume().withResumeDelay(resumeDuration);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index 6dc0e7a580d..bde04bb1dec 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -33,6 +33,7 @@ import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.DatabaseClient;
 import com.google.cloud.spanner.Dialect;
 import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Options;
 import com.google.cloud.spanner.ResultSet;
 import com.google.cloud.spanner.Statement;
 import com.google.cloud.spanner.Struct;
@@ -89,7 +90,8 @@ public class PartitionMetadataDao {
     try (ResultSet queryResultSet =
         databaseClient
             .singleUseReadOnlyTransaction()
-            .executeQuery(Statement.of(checkTableExistsStmt))) {
+            .executeQuery(
+                Statement.of(checkTableExistsStmt), 
Options.tag("query=checkTableExists"))) {
       return queryResultSet.next();
     }
   }
@@ -126,7 +128,8 @@ public class PartitionMetadataDao {
               .to(partitionToken)
               .build();
     }
-    try (ResultSet resultSet = 
databaseClient.singleUse().executeQuery(statement)) {
+    try (ResultSet resultSet =
+        databaseClient.singleUse().executeQuery(statement, 
Options.tag("query=getPartition"))) {
       if (resultSet.next()) {
         return resultSet.getCurrentRowAsStruct();
       }
@@ -175,7 +178,10 @@ public class PartitionMetadataDao {
               .to(State.FINISHED.name())
               .build();
     }
-    try (ResultSet resultSet = 
databaseClient.singleUse().executeQuery(statement)) {
+    try (ResultSet resultSet =
+        databaseClient
+            .singleUse()
+            .executeQuery(statement, 
Options.tag("query=getUnfinishedMinWatermark"))) {
       if (resultSet.next()) {
         return resultSet.getTimestamp(COLUMN_WATERMARK);
       }
@@ -226,7 +232,9 @@ public class PartitionMetadataDao {
               .to(timestamp)
               .build();
     }
-    return databaseClient.singleUse().executeQuery(statement);
+    return databaseClient
+        .singleUse()
+        .executeQuery(statement, 
Options.tag("query=getAllPartitionsCreatedAfter"));
   }
 
   /**
@@ -259,7 +267,10 @@ public class PartitionMetadataDao {
               .build();
     }
 
-    try (ResultSet resultSet = 
databaseClient.singleUse().executeQuery(statement)) {
+    try (ResultSet resultSet =
+        databaseClient
+            .singleUse()
+            .executeQuery(statement, 
Options.tag("query=countPartitionsCreatedAfter"))) {
       if (resultSet.next()) {
         return resultSet.getLong("count");
       } else {
@@ -280,7 +291,7 @@ public class PartitionMetadataDao {
    */
   public Timestamp insert(PartitionMetadata row) {
     final TransactionResult<Void> transactionResult =
-        runInTransaction(transaction -> transaction.insert(row));
+        runInTransaction(transaction -> transaction.insert(row), 
"InsertsPartitionMetadata");
     return transactionResult.getCommitTimestamp();
   }
 
@@ -292,7 +303,8 @@ public class PartitionMetadataDao {
    */
   public Timestamp updateToScheduled(List<String> partitionTokens) {
     final TransactionResult<Void> transactionResult =
-        runInTransaction(transaction -> 
transaction.updateToScheduled(partitionTokens));
+        runInTransaction(
+            transaction -> transaction.updateToScheduled(partitionTokens), 
"updateToScheduled");
     return transactionResult.getCommitTimestamp();
   }
 
@@ -304,7 +316,8 @@ public class PartitionMetadataDao {
    */
   public Timestamp updateToRunning(String partitionToken) {
     final TransactionResult<Void> transactionResult =
-        runInTransaction(transaction -> 
transaction.updateToRunning(partitionToken));
+        runInTransaction(
+            transaction -> transaction.updateToRunning(partitionToken), 
"updateToRunning");
     return transactionResult.getCommitTimestamp();
   }
 
@@ -316,7 +329,8 @@ public class PartitionMetadataDao {
    */
   public Timestamp updateToFinished(String partitionToken) {
     final TransactionResult<Void> transactionResult =
-        runInTransaction(transaction -> 
transaction.updateToFinished(partitionToken));
+        runInTransaction(
+            transaction -> transaction.updateToFinished(partitionToken), 
"updateToFinished");
     return transactionResult.getCommitTimestamp();
   }
 
@@ -327,7 +341,8 @@ public class PartitionMetadataDao {
    * @param watermark the new partition watermark
    */
   public void updateWatermark(String partitionToken, Timestamp watermark) {
-    runInTransaction(transaction -> 
transaction.updateWatermark(partitionToken, watermark));
+    runInTransaction(
+        transaction -> transaction.updateWatermark(partitionToken, watermark), 
"updateWatermark");
   }
 
   /**
@@ -352,6 +367,20 @@ public class PartitionMetadataDao {
     return new TransactionResult<>(result, 
readWriteTransaction.getCommitTimestamp());
   }
 
+  public <T> TransactionResult<T> runInTransaction(
+      Function<InTransactionContext, T> callable, String tagName) {
+    final TransactionRunner readWriteTransaction =
+        databaseClient.readWriteTransaction(Options.tag(tagName));
+    final T result =
+        readWriteTransaction.run(
+            transaction -> {
+              final InTransactionContext transactionContext =
+                  new InTransactionContext(metadataTableName, transaction, 
this.dialect);
+              return callable.apply(transactionContext);
+            });
+    return new TransactionResult<>(result, 
readWriteTransaction.getCommitTimestamp());
+  }
+
   /** Represents the execution of a read / write transaction in Cloud Spanner. 
*/
   public static class InTransactionContext {
     private static final Logger LOG = 
LoggerFactory.getLogger(InTransactionContext.class);
@@ -398,7 +427,8 @@ public class PartitionMetadataDao {
     public Void updateToScheduled(List<String> partitionTokens) {
       HashSet<String> tokens = new HashSet<>();
       Statement statement = getPartitionsMatchingState(partitionTokens, 
State.CREATED);
-      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+      try (ResultSet resultSet =
+          transaction.executeQuery(statement, 
Options.tag("getPartitionsMatchingState=CREATED"))) {
         while (resultSet.next()) {
           tokens.add(resultSet.getString(COLUMN_PARTITION_TOKEN));
         }
@@ -427,7 +457,9 @@ public class PartitionMetadataDao {
       Statement statement =
           
getPartitionsMatchingState(Collections.singletonList(partitionToken), 
State.SCHEDULED);
 
-      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+      try (ResultSet resultSet =
+          transaction.executeQuery(
+              statement, Options.tag("getPartitionsMatchingState=SCHEDULED"))) 
{
         if (!resultSet.next()) {
           LOG.info("[{}] Did not update to be RUNNING", partitionToken);
           return null;
@@ -445,14 +477,6 @@ public class PartitionMetadataDao {
      * @param partitionToken the partition unique identifier
      */
     public Void updateToFinished(String partitionToken) {
-      Statement statement =
-          
getPartitionsMatchingState(Collections.singletonList(partitionToken), 
State.RUNNING);
-      try (ResultSet resultSet = transaction.executeQuery(statement)) {
-        if (!resultSet.next()) {
-          LOG.info("[{}] Did not update to be FINISHED", partitionToken);
-          return null;
-        }
-      }
       LOG.info("[{}] Successfully updating to be FINISHED", partitionToken);
       transaction.buffer(
           
ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken, 
State.FINISHED)));
@@ -504,7 +528,9 @@ public class PartitionMetadataDao {
                 .to(partitionToken)
                 .build();
       }
-      try (ResultSet resultSet = transaction.executeQuery(statement)) {
+      try (ResultSet resultSet =
+          transaction.executeQuery(
+              statement, 
Options.tag("getPartitionMetadataRowForGivenPartitionToken"))) {
         if (resultSet.next()) {
           return resultSet.getCurrentRowAsStruct();
         }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java
index 78b10368fa6..63efaca0c82 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.CREATED;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -65,7 +66,8 @@ public class ChildPartitionsRecordActionTest {
     tracker = mock(RestrictionTracker.class);
     watermarkEstimator = mock(ManualWatermarkEstimator.class);
 
-    when(dao.runInTransaction(any())).thenAnswer(new 
TestTransactionAnswer(transaction));
+    when(dao.runInTransaction(any(), anyObject()))
+        .thenAnswer(new TestTransactionAnswer(transaction));
   }
 
   @Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
index c286ab84aeb..907fdfb3653 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -96,12 +97,13 @@ public class PartitionMetadataDaoTest {
 
   @Test
   public void testInsert() {
+    
when(databaseClient.readWriteTransaction(anyObject())).thenReturn(readWriteTransactionRunner);
     
when(databaseClient.readWriteTransaction()).thenReturn(readWriteTransactionRunner);
     when(readWriteTransactionRunner.run(any())).thenReturn(null);
     when(readWriteTransactionRunner.getCommitTimestamp())
         .thenReturn(Timestamp.ofTimeMicroseconds(1L));
     Timestamp commitTimestamp = partitionMetadataDao.insert(ROW);
-    verify(databaseClient, times(1)).readWriteTransaction();
+    verify(databaseClient, times(1)).readWriteTransaction(anyObject());
     verify(readWriteTransactionRunner, times(1)).run(any());
     verify(readWriteTransactionRunner, times(1)).getCommitTimestamp();
     assertEquals(Timestamp.ofTimeMicroseconds(1L), commitTimestamp);
@@ -143,7 +145,7 @@ public class PartitionMetadataDaoTest {
     ArgumentCaptor<ImmutableList<Mutation>> mutations =
         ArgumentCaptor.forClass(ImmutableList.class);
     ResultSet resultSet = mock(ResultSet.class);
-    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
     when(resultSet.next()).thenReturn(false);
 
     doNothing().when(transaction).buffer(mutations.capture());
@@ -155,7 +157,7 @@ public class PartitionMetadataDaoTest {
   @Test
   public void testInTransactionContextUpdateToRunning() {
     ResultSet resultSet = mock(ResultSet.class);
-    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
     when(resultSet.next()).thenReturn(true);
     when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString());
     
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
@@ -178,7 +180,7 @@ public class PartitionMetadataDaoTest {
   public void testInTransactionContextCannotUpdateToScheduled() {
     System.out.println("Cannot update to scheduled");
     ResultSet resultSet = mock(ResultSet.class);
-    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
     when(resultSet.next()).thenReturn(false);
 
     ArgumentCaptor<ImmutableList<Mutation>> mutations =
@@ -192,7 +194,7 @@ public class PartitionMetadataDaoTest {
   public void testInTransactionContextUpdateToScheduled() {
     System.out.println(" update to scheduled");
     ResultSet resultSet = mock(ResultSet.class);
-    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
     when(resultSet.next()).thenReturn(true).thenReturn(false);
     when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN);
     
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
@@ -211,19 +213,6 @@ public class PartitionMetadataDaoTest {
         
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString());
   }
 
-  @Test
-  public void testInTransactionContextCannotUpdateToFinished() {
-    System.out.println("Cannot update to finished");
-    ResultSet resultSet = mock(ResultSet.class);
-    when(transaction.executeQuery(any())).thenReturn(resultSet);
-    when(resultSet.next()).thenReturn(false);
-
-    ArgumentCaptor<ImmutableList<Mutation>> mutations =
-        ArgumentCaptor.forClass(ImmutableList.class);
-    assertNull(inTransactionContext.updateToFinished(PARTITION_TOKEN));
-    verify(transaction, times(0)).buffer(mutations.capture());
-  }
-
   @Test
   public void testInTransactionContextUpdateToFinished() {
     System.out.println("update to scheduled");
@@ -263,7 +252,7 @@ public class PartitionMetadataDaoTest {
   @Test
   public void testInTransactionContextGetPartitionWithNoPartitions() {
     ResultSet resultSet = mock(ResultSet.class);
-    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
     when(resultSet.next()).thenReturn(false);
     assertNull(inTransactionContext.getPartition(PARTITION_TOKEN));
   }
@@ -271,7 +260,7 @@ public class PartitionMetadataDaoTest {
   @Test
   public void testInTransactionContextGetPartitionWithPartitions() {
     ResultSet resultSet = mock(ResultSet.class);
-    when(transaction.executeQuery(any())).thenReturn(resultSet);
+    when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet);
     when(resultSet.next()).thenReturn(true);
     
when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build());
     assertNotNull(inTransactionContext.getPartition(PARTITION_TOKEN));

Reply via email to