This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ac57b8709d3 Optimize change stream connector with more efficient batching and blind writes, and add transaction/query tags (#25718) ac57b8709d3 is described below commit ac57b8709d3f2db597a5d9e6339545b0f2036a3a Author: ChangyuLi28 <35211213+changyul...@users.noreply.github.com> AuthorDate: Fri Mar 17 15:13:27 2023 -0700 Optimize change stream connector with more efficient batching and blind writes, and add transaction/query tags (#25718) * Optimize change stream connector with more efficient batching and blind writes, and add transaction/query tags * Optimize change stream connector with more efficient batching and blind writes, and add transaction/query tags * Apply commit deadline only for metadata table writes --------- Co-authored-by: Changyu Li <changy...@google.com> --- .../MetadataSpannerConfigFactory.java | 2 +- .../action/ChildPartitionsRecordAction.java | 3 +- .../action/DetectNewPartitionsAction.java | 19 ++++-- .../changestreams/dao/PartitionMetadataDao.java | 68 +++++++++++++++------- .../action/ChildPartitionsRecordActionTest.java | 4 +- .../dao/PartitionMetadataDaoTest.java | 29 +++------ 6 files changed, 75 insertions(+), 50 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java index 83965b1bfaa..56c67f3194f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java @@ -84,7 +84,7 @@ public class MetadataSpannerConfigFactory { ValueProvider<Duration> commitDeadline = primaryConfig.getCommitDeadline(); if (commitDeadline != 
null) { - config = config.withCommitDeadline(StaticValueProvider.of(commitDeadline.get())); + config = config.withCommitDeadline(StaticValueProvider.of(Duration.standardSeconds(60))); } ValueProvider<Duration> maxCumulativeBackoff = primaryConfig.getMaxCumulativeBackoff(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java index 70286b41778..7fb69d0e7ab 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java @@ -155,7 +155,8 @@ public class ChildPartitionsRecordAction { } else { return false; } - }) + }, + "InsertChildPartition") .getResult(); if (insertedRow && isSplit) { metrics.incPartitionRecordSplitCount(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java index 934210250f5..73967d2a2a7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java @@ -146,15 +146,22 @@ public class DetectNewPartitionsAction { OutputReceiver<PartitionMetadata> receiver, Timestamp minWatermark, TreeMap<Timestamp, List<PartitionMetadata>> batches) { + List<PartitionMetadata> batchPartitionsDifferentCreatedAt = new ArrayList<>(); + int 
numTimestampsHandledSofar = 0; for (Map.Entry<Timestamp, List<PartitionMetadata>> batch : batches.entrySet()) { + numTimestampsHandledSofar++; final Timestamp batchCreatedAt = batch.getKey(); - final List<PartitionMetadata> batchPartitions = batch.getValue(); - - final Timestamp scheduledAt = updateBatchToScheduled(batchPartitions); - if (!tracker.tryClaim(batchCreatedAt)) { - return ProcessContinuation.stop(); + final List<PartitionMetadata> batchPartitionsSameCreatedAt = batch.getValue(); + batchPartitionsDifferentCreatedAt.addAll(batchPartitionsSameCreatedAt); + if (batchPartitionsDifferentCreatedAt.size() >= 200 + || numTimestampsHandledSofar == batches.size()) { + final Timestamp scheduledAt = updateBatchToScheduled(batchPartitionsDifferentCreatedAt); + if (!tracker.tryClaim(batchCreatedAt)) { + return ProcessContinuation.stop(); + } + outputBatch(receiver, minWatermark, batchPartitionsDifferentCreatedAt, scheduledAt); + batchPartitionsDifferentCreatedAt = new ArrayList<>(); } - outputBatch(receiver, minWatermark, batchPartitions, scheduledAt); } return ProcessContinuation.resume().withResumeDelay(resumeDuration); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java index 6dc0e7a580d..bde04bb1dec 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java @@ -33,6 +33,7 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.Dialect; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Options; import com.google.cloud.spanner.ResultSet; import 
com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; @@ -89,7 +90,8 @@ public class PartitionMetadataDao { try (ResultSet queryResultSet = databaseClient .singleUseReadOnlyTransaction() - .executeQuery(Statement.of(checkTableExistsStmt))) { + .executeQuery( + Statement.of(checkTableExistsStmt), Options.tag("query=checkTableExists"))) { return queryResultSet.next(); } } @@ -126,7 +128,8 @@ public class PartitionMetadataDao { .to(partitionToken) .build(); } - try (ResultSet resultSet = databaseClient.singleUse().executeQuery(statement)) { + try (ResultSet resultSet = + databaseClient.singleUse().executeQuery(statement, Options.tag("query=getPartition"))) { if (resultSet.next()) { return resultSet.getCurrentRowAsStruct(); } @@ -175,7 +178,10 @@ public class PartitionMetadataDao { .to(State.FINISHED.name()) .build(); } - try (ResultSet resultSet = databaseClient.singleUse().executeQuery(statement)) { + try (ResultSet resultSet = + databaseClient + .singleUse() + .executeQuery(statement, Options.tag("query=getUnfinishedMinWatermark"))) { if (resultSet.next()) { return resultSet.getTimestamp(COLUMN_WATERMARK); } @@ -226,7 +232,9 @@ public class PartitionMetadataDao { .to(timestamp) .build(); } - return databaseClient.singleUse().executeQuery(statement); + return databaseClient + .singleUse() + .executeQuery(statement, Options.tag("query=getAllPartitionsCreatedAfter")); } /** @@ -259,7 +267,10 @@ public class PartitionMetadataDao { .build(); } - try (ResultSet resultSet = databaseClient.singleUse().executeQuery(statement)) { + try (ResultSet resultSet = + databaseClient + .singleUse() + .executeQuery(statement, Options.tag("query=countPartitionsCreatedAfter"))) { if (resultSet.next()) { return resultSet.getLong("count"); } else { @@ -280,7 +291,7 @@ public class PartitionMetadataDao { */ public Timestamp insert(PartitionMetadata row) { final TransactionResult<Void> transactionResult = - runInTransaction(transaction -> transaction.insert(row)); + 
runInTransaction(transaction -> transaction.insert(row), "InsertsPartitionMetadata"); return transactionResult.getCommitTimestamp(); } @@ -292,7 +303,8 @@ public class PartitionMetadataDao { */ public Timestamp updateToScheduled(List<String> partitionTokens) { final TransactionResult<Void> transactionResult = - runInTransaction(transaction -> transaction.updateToScheduled(partitionTokens)); + runInTransaction( + transaction -> transaction.updateToScheduled(partitionTokens), "updateToScheduled"); return transactionResult.getCommitTimestamp(); } @@ -304,7 +316,8 @@ public class PartitionMetadataDao { */ public Timestamp updateToRunning(String partitionToken) { final TransactionResult<Void> transactionResult = - runInTransaction(transaction -> transaction.updateToRunning(partitionToken)); + runInTransaction( + transaction -> transaction.updateToRunning(partitionToken), "updateToRunning"); return transactionResult.getCommitTimestamp(); } @@ -316,7 +329,8 @@ public class PartitionMetadataDao { */ public Timestamp updateToFinished(String partitionToken) { final TransactionResult<Void> transactionResult = - runInTransaction(transaction -> transaction.updateToFinished(partitionToken)); + runInTransaction( + transaction -> transaction.updateToFinished(partitionToken), "updateToFinished"); return transactionResult.getCommitTimestamp(); } @@ -327,7 +341,8 @@ public class PartitionMetadataDao { * @param watermark the new partition watermark */ public void updateWatermark(String partitionToken, Timestamp watermark) { - runInTransaction(transaction -> transaction.updateWatermark(partitionToken, watermark)); + runInTransaction( + transaction -> transaction.updateWatermark(partitionToken, watermark), "updateWatermark"); } /** @@ -352,6 +367,20 @@ public class PartitionMetadataDao { return new TransactionResult<>(result, readWriteTransaction.getCommitTimestamp()); } + public <T> TransactionResult<T> runInTransaction( + Function<InTransactionContext, T> callable, String tagName) { + 
final TransactionRunner readWriteTransaction = + databaseClient.readWriteTransaction(Options.tag(tagName)); + final T result = + readWriteTransaction.run( + transaction -> { + final InTransactionContext transactionContext = + new InTransactionContext(metadataTableName, transaction, this.dialect); + return callable.apply(transactionContext); + }); + return new TransactionResult<>(result, readWriteTransaction.getCommitTimestamp()); + } + /** Represents the execution of a read / write transaction in Cloud Spanner. */ public static class InTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(InTransactionContext.class); @@ -398,7 +427,8 @@ public class PartitionMetadataDao { public Void updateToScheduled(List<String> partitionTokens) { HashSet<String> tokens = new HashSet<>(); Statement statement = getPartitionsMatchingState(partitionTokens, State.CREATED); - try (ResultSet resultSet = transaction.executeQuery(statement)) { + try (ResultSet resultSet = + transaction.executeQuery(statement, Options.tag("getPartitionsMatchingState=CREATED"))) { while (resultSet.next()) { tokens.add(resultSet.getString(COLUMN_PARTITION_TOKEN)); } @@ -427,7 +457,9 @@ public class PartitionMetadataDao { Statement statement = getPartitionsMatchingState(Collections.singletonList(partitionToken), State.SCHEDULED); - try (ResultSet resultSet = transaction.executeQuery(statement)) { + try (ResultSet resultSet = + transaction.executeQuery( + statement, Options.tag("getPartitionsMatchingState=SCHEDULED"))) { if (!resultSet.next()) { LOG.info("[{}] Did not update to be RUNNING", partitionToken); return null; @@ -445,14 +477,6 @@ public class PartitionMetadataDao { * @param partitionToken the partition unique identifier */ public Void updateToFinished(String partitionToken) { - Statement statement = - getPartitionsMatchingState(Collections.singletonList(partitionToken), State.RUNNING); - try (ResultSet resultSet = transaction.executeQuery(statement)) { - if 
(!resultSet.next()) { - LOG.info("[{}] Did not update to be FINISHED", partitionToken); - return null; - } - } LOG.info("[{}] Successfully updating to be FINISHED", partitionToken); transaction.buffer( ImmutableList.of(createUpdateMetadataStateMutationFrom(partitionToken, State.FINISHED))); @@ -504,7 +528,9 @@ public class PartitionMetadataDao { .to(partitionToken) .build(); } - try (ResultSet resultSet = transaction.executeQuery(statement)) { + try (ResultSet resultSet = + transaction.executeQuery( + statement, Options.tag("getPartitionMetadataRowForGivenPartitionToken"))) { if (resultSet.next()) { return resultSet.getCurrentRowAsStruct(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java index 78b10368fa6..63efaca0c82 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.action; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.CREATED; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -65,7 +66,8 @@ public class ChildPartitionsRecordActionTest { tracker = mock(RestrictionTracker.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); - when(dao.runInTransaction(any())).thenAnswer(new TestTransactionAnswer(transaction)); + 
when(dao.runInTransaction(any(), anyObject())) + .thenAnswer(new TestTransactionAnswer(transaction)); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java index c286ab84aeb..907fdfb3653 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDaoTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -96,12 +97,13 @@ public class PartitionMetadataDaoTest { @Test public void testInsert() { + when(databaseClient.readWriteTransaction(anyObject())).thenReturn(readWriteTransactionRunner); when(databaseClient.readWriteTransaction()).thenReturn(readWriteTransactionRunner); when(readWriteTransactionRunner.run(any())).thenReturn(null); when(readWriteTransactionRunner.getCommitTimestamp()) .thenReturn(Timestamp.ofTimeMicroseconds(1L)); Timestamp commitTimestamp = partitionMetadataDao.insert(ROW); - verify(databaseClient, times(1)).readWriteTransaction(); + verify(databaseClient, times(1)).readWriteTransaction(anyObject()); verify(readWriteTransactionRunner, times(1)).run(any()); verify(readWriteTransactionRunner, times(1)).getCommitTimestamp(); assertEquals(Timestamp.ofTimeMicroseconds(1L), commitTimestamp); @@ -143,7 +145,7 @@ public class PartitionMetadataDaoTest { 
ArgumentCaptor<ImmutableList<Mutation>> mutations = ArgumentCaptor.forClass(ImmutableList.class); ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any())).thenReturn(resultSet); + when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); when(resultSet.next()).thenReturn(false); doNothing().when(transaction).buffer(mutations.capture()); @@ -155,7 +157,7 @@ public class PartitionMetadataDaoTest { @Test public void testInTransactionContextUpdateToRunning() { ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any())).thenReturn(resultSet); + when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); when(resultSet.next()).thenReturn(true); when(resultSet.getString(any())).thenReturn(State.SCHEDULED.toString()); when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build()); @@ -178,7 +180,7 @@ public class PartitionMetadataDaoTest { public void testInTransactionContextCannotUpdateToScheduled() { System.out.println("Cannot update to scheduled"); ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any())).thenReturn(resultSet); + when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); when(resultSet.next()).thenReturn(false); ArgumentCaptor<ImmutableList<Mutation>> mutations = @@ -192,7 +194,7 @@ public class PartitionMetadataDaoTest { public void testInTransactionContextUpdateToScheduled() { System.out.println(" update to scheduled"); ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any())).thenReturn(resultSet); + when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); when(resultSet.next()).thenReturn(true).thenReturn(false); when(resultSet.getString(any())).thenReturn(PARTITION_TOKEN); when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build()); @@ -211,19 +213,6 @@ public class PartitionMetadataDaoTest { 
mutationValueMap.get(PartitionMetadataAdminDao.COLUMN_STATE).getString()); } - @Test - public void testInTransactionContextCannotUpdateToFinished() { - System.out.println("Cannot update to finished"); - ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any())).thenReturn(resultSet); - when(resultSet.next()).thenReturn(false); - - ArgumentCaptor<ImmutableList<Mutation>> mutations = - ArgumentCaptor.forClass(ImmutableList.class); - assertNull(inTransactionContext.updateToFinished(PARTITION_TOKEN)); - verify(transaction, times(0)).buffer(mutations.capture()); - } - @Test public void testInTransactionContextUpdateToFinished() { System.out.println("update to scheduled"); @@ -263,7 +252,7 @@ public class PartitionMetadataDaoTest { @Test public void testInTransactionContextGetPartitionWithNoPartitions() { ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any())).thenReturn(resultSet); + when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); when(resultSet.next()).thenReturn(false); assertNull(inTransactionContext.getPartition(PARTITION_TOKEN)); } @@ -271,7 +260,7 @@ public class PartitionMetadataDaoTest { @Test public void testInTransactionContextGetPartitionWithPartitions() { ResultSet resultSet = mock(ResultSet.class); - when(transaction.executeQuery(any())).thenReturn(resultSet); + when(transaction.executeQuery(any(), anyObject())).thenReturn(resultSet); when(resultSet.next()).thenReturn(true); when(resultSet.getCurrentRowAsStruct()).thenReturn(Struct.newBuilder().build()); assertNotNull(inTransactionContext.getPartition(PARTITION_TOKEN));