This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 983d9034b85 Added MetadataSpannerConfig class for generating SpannerConfig for accessing change stream metadata database (#25193) 983d9034b85 is described below commit 983d9034b85b3e0f8dfc2e5103dda9212fe546b6 Author: Doug Judd <nuggetwh...@google.com> AuthorDate: Fri Feb 10 08:18:14 2023 -0800 Added MetadataSpannerConfig class for generating SpannerConfig for accessing change stream metadata database (#25193) --- .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 10 +- .../MetadataSpannerConfigFactory.java | 118 +++++++++++++++++++++ .../gcp/spanner/changestreams/dao/DaoFactory.java | 2 +- 3 files changed, 122 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index b4a451378ce..44cffc201ba 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -72,6 +72,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn; @@ -89,7 +90,6 @@ import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -1625,12 +1625,8 @@ public class SpannerIO { .build(); } final SpannerConfig partitionMetadataSpannerConfig = - changeStreamSpannerConfig - .toBuilder() - .setInstanceId(StaticValueProvider.of(partitionMetadataInstanceId)) - .setDatabaseId(StaticValueProvider.of(partitionMetadataDatabaseId)) - .setDatabaseRole(null) - .build(); + MetadataSpannerConfigFactory.create( + changeStreamSpannerConfig, partitionMetadataInstanceId, partitionMetadataDatabaseId); Dialect changeStreamDatabaseDialect = getDialect(changeStreamSpannerConfig); Dialect metadataDatabaseDialect = getDialect(partitionMetadataSpannerConfig); LOG.info( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java new file mode 100644 index 00000000000..83965b1bfaa --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.cloud.spanner.Options.RpcPriority; +import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.joda.time.Duration; + +/** + * This class generates a SpannerConfig for the change stream metadata database by copying only the + * necessary fields from the SpannerConfig of the primary database. + */ +public class MetadataSpannerConfigFactory { + + /** + * Generates a SpannerConfig that can be used to access the change stream metadata database by + * copying only the necessary fields from the given primary database SpannerConfig and setting the + * instance ID and database ID to the supplied metadata values. + * + * @param primaryConfig The SpannerConfig for accessing the primary database + * @param metadataInstanceId The instance ID of the metadata database + * @param metadataDatabaseId The database ID of the metadata database + * @return the metadata SpannerConfig + */ + public static SpannerConfig create( + SpannerConfig primaryConfig, String metadataInstanceId, String metadataDatabaseId) { + + checkNotNull( + metadataInstanceId, + "MetadataSpannerConfigFactory.create requires non-null metadata instance id"); + checkNotNull( + metadataDatabaseId, + "MetadataSpannerConfigFactory.create requires non-null metadata database id"); + + // NOTE: databaseRole should NOT be copied to the metadata config + + SpannerConfig config = + SpannerConfig.create() + .withInstanceId(StaticValueProvider.of(metadataInstanceId)) + .withDatabaseId(StaticValueProvider.of(metadataDatabaseId)); + + ValueProvider<String> projectId = primaryConfig.getProjectId(); + if (projectId != null) { + config = config.withProjectId(StaticValueProvider.of(projectId.get())); + } + + ValueProvider<String> host = primaryConfig.getHost(); + if (host != null) { + config = config.withHost(StaticValueProvider.of(host.get())); + } + + ValueProvider<String> emulatorHost = primaryConfig.getEmulatorHost(); + if (emulatorHost != null) { + config = config.withEmulatorHost(StaticValueProvider.of(emulatorHost.get())); + } + + ValueProvider<Boolean> isLocalChannelProvider = primaryConfig.getIsLocalChannelProvider(); + if (isLocalChannelProvider != null) { + config = + config.withIsLocalChannelProvider(StaticValueProvider.of(isLocalChannelProvider.get())); + } + + ValueProvider<Duration> commitDeadline = primaryConfig.getCommitDeadline(); + if (commitDeadline != null) { + config = config.withCommitDeadline(StaticValueProvider.of(commitDeadline.get())); + } + + ValueProvider<Duration> maxCumulativeBackoff = primaryConfig.getMaxCumulativeBackoff(); + if (maxCumulativeBackoff != null) { + config = config.withMaxCumulativeBackoff(StaticValueProvider.of(maxCumulativeBackoff.get())); + } + + RetrySettings executeStreamingSqlRetrySettings = + primaryConfig.getExecuteStreamingSqlRetrySettings(); + if (executeStreamingSqlRetrySettings != null) { + config = config.withExecuteStreamingSqlRetrySettings(executeStreamingSqlRetrySettings); + } + + RetrySettings commitRetrySettings = primaryConfig.getCommitRetrySettings(); + if (commitRetrySettings != null) { + config = config.withCommitRetrySettings(commitRetrySettings); + } + + ImmutableSet<Code> retryableCodes = primaryConfig.getRetryableCodes(); + if (retryableCodes != null) { + config = config.withRetryableCodes(retryableCodes); + } + + ValueProvider<RpcPriority> rpcPriority = primaryConfig.getRpcPriority(); + if (rpcPriority != null) { + config = config.withRpcPriority(StaticValueProvider.of(rpcPriority.get())); + } + + return config; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java index 0b40ddaccad..b9718fdb675 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java @@ -77,7 +77,7 @@ public class DaoFactory implements Serializable { } this.changeStreamSpannerConfig = changeStreamSpannerConfig; this.changeStreamName = changeStreamName; - this.metadataSpannerConfig = metadataSpannerConfig.withDatabaseRole(null); + this.metadataSpannerConfig = metadataSpannerConfig; this.partitionMetadataTableName = partitionMetadataTableName; this.rpcPriority = rpcPriority; this.jobName = jobName;