This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 983d9034b85 Added MetadataSpannerConfig class for generating 
SpannerConfig for accessing change stream metadata database (#25193)
983d9034b85 is described below

commit 983d9034b85b3e0f8dfc2e5103dda9212fe546b6
Author: Doug Judd <nuggetwh...@google.com>
AuthorDate: Fri Feb 10 08:18:14 2023 -0800

    Added MetadataSpannerConfig class for generating SpannerConfig for 
accessing change stream metadata database (#25193)
---
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java  |  10 +-
 .../MetadataSpannerConfigFactory.java              | 118 +++++++++++++++++++++
 .../gcp/spanner/changestreams/dao/DaoFactory.java  |   2 +-
 3 files changed, 122 insertions(+), 8 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index b4a451378ce..44cffc201ba 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -72,6 +72,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
@@ -89,7 +90,6 @@ import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -1625,12 +1625,8 @@ public class SpannerIO {
                 .build();
       }
       final SpannerConfig partitionMetadataSpannerConfig =
-          changeStreamSpannerConfig
-              .toBuilder()
-              
.setInstanceId(StaticValueProvider.of(partitionMetadataInstanceId))
-              
.setDatabaseId(StaticValueProvider.of(partitionMetadataDatabaseId))
-              .setDatabaseRole(null)
-              .build();
+          MetadataSpannerConfigFactory.create(
+              changeStreamSpannerConfig, partitionMetadataInstanceId, 
partitionMetadataDatabaseId);
       Dialect changeStreamDatabaseDialect = 
getDialect(changeStreamSpannerConfig);
       Dialect metadataDatabaseDialect = 
getDialect(partitionMetadataSpannerConfig);
       LOG.info(
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
new file mode 100644
index 00000000000..83965b1bfaa
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.spanner.Options.RpcPriority;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.joda.time.Duration;
+
+/**
+ * This class generates a SpannerConfig for the change stream metadata 
database by copying only the
+ * necessary fields from the SpannerConfig of the primary database.
+ */
+public class MetadataSpannerConfigFactory {
+
+  /**
+   * Generates a SpannerConfig that can be used to access the change stream 
metadata database by
+   * copying only the necessary fields from the given primary database 
SpannerConfig and setting the
+   * instance ID and database ID to the supplied metadata values.
+   *
+   * @param primaryConfig The SpannerConfig for accessing the primary database
+   * @param metadataInstanceId The instance ID of the metadata database
+   * @param metadataDatabaseId The database ID of the metadata database
+   * @return the metadata SpannerConfig
+   */
+  public static SpannerConfig create(
+      SpannerConfig primaryConfig, String metadataInstanceId, String 
metadataDatabaseId) {
+
+    checkNotNull(
+        metadataInstanceId,
+        "MetadataSpannerConfigFactory.create requires non-null metadata 
instance id");
+    checkNotNull(
+        metadataDatabaseId,
+        "MetadataSpannerConfigFactory.create requires non-null metadata 
database id");
+
+    // NOTE: databaseRole should NOT be copied to the metadata config
+
+    SpannerConfig config =
+        SpannerConfig.create()
+            .withInstanceId(StaticValueProvider.of(metadataInstanceId))
+            .withDatabaseId(StaticValueProvider.of(metadataDatabaseId));
+
+    ValueProvider<String> projectId = primaryConfig.getProjectId();
+    if (projectId != null) {
+      config = config.withProjectId(StaticValueProvider.of(projectId.get()));
+    }
+
+    ValueProvider<String> host = primaryConfig.getHost();
+    if (host != null) {
+      config = config.withHost(StaticValueProvider.of(host.get()));
+    }
+
+    ValueProvider<String> emulatorHost = primaryConfig.getEmulatorHost();
+    if (emulatorHost != null) {
+      config = 
config.withEmulatorHost(StaticValueProvider.of(emulatorHost.get()));
+    }
+
+    ValueProvider<Boolean> isLocalChannelProvider = 
primaryConfig.getIsLocalChannelProvider();
+    if (isLocalChannelProvider != null) {
+      config =
+          
config.withIsLocalChannelProvider(StaticValueProvider.of(isLocalChannelProvider.get()));
+    }
+
+    ValueProvider<Duration> commitDeadline = primaryConfig.getCommitDeadline();
+    if (commitDeadline != null) {
+      config = 
config.withCommitDeadline(StaticValueProvider.of(commitDeadline.get()));
+    }
+
+    ValueProvider<Duration> maxCumulativeBackoff = 
primaryConfig.getMaxCumulativeBackoff();
+    if (maxCumulativeBackoff != null) {
+      config = 
config.withMaxCumulativeBackoff(StaticValueProvider.of(maxCumulativeBackoff.get()));
+    }
+
+    RetrySettings executeStreamingSqlRetrySettings =
+        primaryConfig.getExecuteStreamingSqlRetrySettings();
+    if (executeStreamingSqlRetrySettings != null) {
+      config = 
config.withExecuteStreamingSqlRetrySettings(executeStreamingSqlRetrySettings);
+    }
+
+    RetrySettings commitRetrySettings = primaryConfig.getCommitRetrySettings();
+    if (commitRetrySettings != null) {
+      config = config.withCommitRetrySettings(commitRetrySettings);
+    }
+
+    ImmutableSet<Code> retryableCodes = primaryConfig.getRetryableCodes();
+    if (retryableCodes != null) {
+      config = config.withRetryableCodes(retryableCodes);
+    }
+
+    ValueProvider<RpcPriority> rpcPriority = primaryConfig.getRpcPriority();
+    if (rpcPriority != null) {
+      config = 
config.withRpcPriority(StaticValueProvider.of(rpcPriority.get()));
+    }
+
+    return config;
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
index 0b40ddaccad..b9718fdb675 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
@@ -77,7 +77,7 @@ public class DaoFactory implements Serializable {
     }
     this.changeStreamSpannerConfig = changeStreamSpannerConfig;
     this.changeStreamName = changeStreamName;
-    this.metadataSpannerConfig = metadataSpannerConfig.withDatabaseRole(null);
+    this.metadataSpannerConfig = metadataSpannerConfig;
     this.partitionMetadataTableName = partitionMetadataTableName;
     this.rpcPriority = rpcPriority;
     this.jobName = jobName;

Reply via email to