Polber commented on code in PR #31987:
URL: https://github.com/apache/beam/pull/31987#discussion_r1729450818


##########
sdks/python/apache_beam/io/gcp/spanner_wrapper.py:
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import uuid
+import apache_beam as beam
+
+try:
+  from google.cloud import spanner
+
+except ImportError:
+  spanner = None
+
+_LOGGER = logging.getLogger(__name__)

Review Comment:
   ```suggestion
   _LOGGER = logging.getLogger(__name__)
   MAX_RETRIES = 3
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,211 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.StructUtils;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import javax.annotation.Nullable;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+/**
+ * A provider for reading from Cloud Spanner using a Schema Transform Provider.
+ *
+ * <p>This provider enables reading from Cloud Spanner using a specified SQL query or by
+ * directly accessing a table and its columns. It supports configuration through the
+ * {@link SpannerReadSchemaTransformConfiguration} class, allowing users to specify
+ * project, instance, database, table, query, and columns.
+ *
+ * <p>The transformation leverages the {@link SpannerIO} to perform the read operation
+ * and maps the results to Beam rows, preserving the schema.
+ *
+ * <p>Example usage in a YAML pipeline using query:
+ *
+ * <pre>{@code
+ * pipeline:
+ *   transforms:
+ *     - type: ReadFromSpanner
+ *       name: ReadShipments
+ *       # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
+ *       config:
+ *         project_id: 'apache-beam-testing'
+ *         instance_id: 'shipment-test'
+ *         database_id: 'shipment'
+ *         query: 'SELECT * FROM shipments'
+ * }</pre>
+ * 
+ * <p>Example usage in a YAML pipeline using a table and columns:
+ *
+ * <pre>{@code
+ * pipeline:
+ *   transforms:
+ *     - type: ReadFromSpanner
+ *       name: ReadShipments
+ *       # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
+ *       config:
+ *         project_id: 'apache-beam-testing'
+ *         instance_id: 'shipment-test'
+ *         database_id: 'shipment'
+ *         table: 'shipments'
+ *         columns: ['customer_id', 'customer_name']
+ * }</pre>
+ */
+
+@AutoService(SchemaTransformProvider.class)
+public class SpannerReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {
+
+  static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
+    private final SpannerReadSchemaTransformConfiguration configuration;
+
+    SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null.");
+      SpannerIO.Read read = SpannerIO
+                            .readWithSchema()
+                            .withProjectId(configuration.getProjectId())
+                            .withInstanceId(configuration.getInstanceId())
+                            .withDatabaseId(configuration.getDatabaseId());
+
+      if (!Strings.isNullOrEmpty(configuration.getQuery())) {
+        read = read.withQuery(configuration.getQuery());
+      } 
+      else {
+        read = read.withTable(configuration.getTableId())
+                   .withColumns(configuration.getColumns());
+      }
+      PCollection<Struct> spannerRows = input.getPipeline().apply(read);
+      Schema schema = spannerRows.getSchema();
+      PCollection<Row> rows = spannerRows.apply(MapElements.into(TypeDescriptor.of(Row.class))
+          .via((Struct struct) -> StructUtils.structToBeamRow(struct, schema)));
+
+          return PCollectionRowTuple.of("output", rows.setRowSchema(schema));
+    }
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:spanner_read:v1";
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+    return Collections.singletonList("output");
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class SpannerReadSchemaTransformConfiguration implements Serializable {
+    @AutoValue.Builder
+    @Nullable
+    public abstract static class Builder {
+      public abstract Builder setProjectId(String projectId);
+      public abstract Builder setInstanceId(String instanceId);
+      public abstract Builder setDatabaseId(String databaseId);
+      public abstract Builder setTableId(String tableId);
+      public abstract Builder setQuery(String query);
+      public abstract Builder setColumns(List<String> columns);
+      public abstract SpannerReadSchemaTransformConfiguration build();
+    }
+
+    public void validate() {
+      String invalidConfigMessage = "Invalid Cloud Spanner Read configuration: ";
+      checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified for SQL query.");
+      checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified for SQL query.");
+      checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified for SQL query.");
+
+      if (Strings.isNullOrEmpty(this.getQuery())) {
+        checkNotNull(this.getTableId(), invalidConfigMessage + "Table name must be specified for table read.");
+        checkNotNull(this.getColumns(), invalidConfigMessage + "Columns must be specified for table read.");
+      }
+      else {
+        checkNotNull(this.getQuery(), invalidConfigMessage + "Query must be specified for query read.");
+        checkArgument(
+          this.getTableId() == null,

Review Comment:
   Let's use `isNullOrEmpty` to be safe
   ```suggestion
             Strings.isNullOrEmpty(this.getTableId()),
   ```
   and same below for columns



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java:
##########


Review Comment:
   I don't think this file needs to be modified



##########
sdks/python/apache_beam/io/gcp/spanner_wrapper.py:
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import uuid
+import apache_beam as beam
+
+try:
+  from google.cloud import spanner
+
+except ImportError:
+  spanner = None
+
+_LOGGER = logging.getLogger(__name__)
+
+class SpannerWrapper(object):
+  TEST_DATABASE = None
+  TEMP_DATABASE_PREFIX = 'temp-'
+  _SPANNER_CLIENT = None
+  _SPANNER_INSTANCE = None
+
+  def __init__(self, project_id = "apache-beam-testing", temp_database_id = None):
+    self._SPANNER_CLIENT = spanner.Client(project = project_id)
+    self._SPANNER_INSTANCE = self._SPANNER_CLIENT.instance("beam-test")
+    self.TEST_DATABASE = None

Review Comment:
   Let's also refactor their usages in the rest of the class accordingly:
   ```suggestion
       self._spanner_client = spanner.Client(project = project_id)
       self._spanner_instance = self._spanner_client.instance("beam-test")
       self._test_database = None
   ```
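   With that rename, the rest of the class would just read the instance attributes, e.g. `_create_database` (a sketch of the rename only, behavior unchanged):
   ```
   def _create_database(self):
     _LOGGER.info('Creating test database: %s' % self._test_database)
     database = self._spanner_instance.database(
         self._test_database,
         ddl_statements = [
             '''CREATE TABLE Users (
             UserId    STRING(256) NOT NULL,
             Key       STRING(1024)
         ) PRIMARY KEY (UserId)'''
         ])
     operation = database.create()
     _LOGGER.info('Creating database: Done! %s' % str(operation.result()))
   ```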



##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -37,14 +37,33 @@
 from apache_beam.utils import python_callable
 from apache_beam.yaml import yaml_provider
 from apache_beam.yaml import yaml_transform
-
+from apache_beam.io.gcp.spanner_wrapper import SpannerWrapper
 
 @contextlib.contextmanager
 def gcs_temp_dir(bucket):
   gcs_tempdir = bucket + '/yaml-' + str(uuid.uuid4())
   yield gcs_tempdir
   filesystems.FileSystems.delete([gcs_tempdir])
 
+import contextlib
+import uuid
+import logging
+from google.cloud import spanner
+
+@contextlib.contextmanager
+def temp_spanner_table(project, prefix = 'temp_spanner_db_'):
+    spanner_client = SpannerWrapper(project)
+    spanner_client._create_database()
+    instance = 'beam-test'
+    database = spanner_client.TEST_DATABASE
+    table = 'Users'
+    columns = ['UserId', 'Key']

Review Comment:
   Can we make this configurable and define the columns in the test fixture in the YAML file?
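   For instance, something along these lines (hypothetical signature; the exact parameter names and fixture keys are up for debate):
   ```
   @contextlib.contextmanager
   def temp_spanner_table(project, prefix='temp_spanner_db_',
                          table='Users', columns=('UserId', 'Key')):
     # table/columns would be supplied by the test fixture in the YAML file
     # instead of being hard-coded here.
     spanner_client = SpannerWrapper(project)
     spanner_client._create_database()
     ...
   ```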



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java:
##########
@@ -351,4 +357,59 @@ private static void addIterableToMutationBuilder(
                 beamIterableType.getTypeName()));
     }
   }
+  public static Row createRowFromMutation(Schema schema, Mutation mutation) {
+    HashMap <String, Object> mutationHashMap = new HashMap<String, Object>();
+    mutation.asMap().forEach(
+        (column, value) -> mutationHashMap.put(column, convertValueToBeamFieldType(value)));
+    Map<String, Object> mutationMap = new HashMap<String, Object>(mutationHashMap);
+    return Row.withSchema(schema).withFieldValues(mutationMap).build();

Review Comment:
   No need to construct the map twice
   ```suggestion
       Map<String, Object> mutationHashMap = new HashMap<>();
       mutation.asMap().forEach(
           (column, value) -> mutationHashMap.put(column, convertValueToBeamFieldType(value)));
       return Row.withSchema(schema).withFieldValues(mutationHashMap).build();
   ```



##########
sdks/python/apache_beam/io/gcp/spanner_wrapper.py:
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import uuid
+import apache_beam as beam
+
+try:
+  from google.cloud import spanner
+
+except ImportError:
+  spanner = None
+
+_LOGGER = logging.getLogger(__name__)
+
+class SpannerWrapper(object):
+  TEST_DATABASE = None
+  TEMP_DATABASE_PREFIX = 'temp-'
+  _SPANNER_CLIENT = None
+  _SPANNER_INSTANCE = None
+
+  def __init__(self, project_id = "apache-beam-testing", temp_database_id = None):

Review Comment:
   Let's not default this value
   ```suggestion
     def __init__(self, project_id, temp_database_id = None):
   ```



##########
sdks/python/apache_beam/io/gcp/spanner_wrapper.py:
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import uuid
+import apache_beam as beam
+
+try:
+  from google.cloud import spanner
+
+except ImportError:
+  spanner = None
+
+_LOGGER = logging.getLogger(__name__)
+
+class SpannerWrapper(object):
+  TEST_DATABASE = None
+  TEMP_DATABASE_PREFIX = 'temp-'
+  _SPANNER_CLIENT = None
+  _SPANNER_INSTANCE = None

Review Comment:
   I have no idea why they wrote the BQ wrapper this way, but these don't need to be class variables since they hold per-instance state (except `TEMP_DATABASE_PREFIX`, I suppose). I would remove at least the state variables and move them to the constructor as instance variables.
   ```suggestion
     TEMP_DATABASE_PREFIX = 'temp-'
   ```
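   i.e. roughly (a sketch combining this with the lower-case naming suggested above; same logic otherwise):
   ```
   class SpannerWrapper(object):
     TEMP_DATABASE_PREFIX = 'temp-'

     def __init__(self, project_id, temp_database_id=None):
       self._spanner_client = spanner.Client(project=project_id)
       self._spanner_instance = self._spanner_client.instance("beam-test")

       if temp_database_id and temp_database_id.startswith(
           self.TEMP_DATABASE_PREFIX):
         raise ValueError(
             'User provided temp database ID cannot start with %r' %
             self.TEMP_DATABASE_PREFIX)

       if temp_database_id is not None:
         self._test_database = temp_database_id
       else:
         self._test_database = self._get_temp_database()
   ```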



##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -37,14 +37,33 @@
 from apache_beam.utils import python_callable
 from apache_beam.yaml import yaml_provider
 from apache_beam.yaml import yaml_transform
-
+from apache_beam.io.gcp.spanner_wrapper import SpannerWrapper
 
 @contextlib.contextmanager
 def gcs_temp_dir(bucket):
   gcs_tempdir = bucket + '/yaml-' + str(uuid.uuid4())
   yield gcs_tempdir
   filesystems.FileSystems.delete([gcs_tempdir])
 
+import contextlib
+import uuid
+import logging
+from google.cloud import spanner
+
+@contextlib.contextmanager
+def temp_spanner_table(project, prefix = 'temp_spanner_db_'):
+    spanner_client = SpannerWrapper(project)
+    spanner_client._create_database()
+    instance = 'beam-test'
+    database = spanner_client.TEST_DATABASE
+    table = 'Users'

Review Comment:
   Let's stay consistent with BQ and name this `tmp_table`



##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -37,14 +37,33 @@
 from apache_beam.utils import python_callable
 from apache_beam.yaml import yaml_provider
 from apache_beam.yaml import yaml_transform
-
+from apache_beam.io.gcp.spanner_wrapper import SpannerWrapper
 
 @contextlib.contextmanager
 def gcs_temp_dir(bucket):
   gcs_tempdir = bucket + '/yaml-' + str(uuid.uuid4())
   yield gcs_tempdir
   filesystems.FileSystems.delete([gcs_tempdir])
 
+import contextlib
+import uuid
+import logging
+from google.cloud import spanner
+
+@contextlib.contextmanager
+def temp_spanner_table(project, prefix = 'temp_spanner_db_'):
+    spanner_client = SpannerWrapper(project)
+    spanner_client._create_database()
+    instance = 'beam-test'

Review Comment:
   This could lead to a collision if multiple PRs are running this test. Let's add a UUID to avoid naming collisions.
   ```suggestion
       instance = '%s_%s' % (prefix, uuid.uuid4().hex)
   ```
   (I don't think the prefix needs the trailing underscore in the method definition, but I'm not sure why BigQuery's context manager has it either.)



##########
sdks/python/apache_beam/io/gcp/spanner_wrapper.py:
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import uuid
+import apache_beam as beam
+
+try:
+  from google.cloud import spanner
+
+except ImportError:
+  spanner = None
+
+_LOGGER = logging.getLogger(__name__)
+
+class SpannerWrapper(object):
+  TEST_DATABASE = None
+  TEMP_DATABASE_PREFIX = 'temp-'
+  _SPANNER_CLIENT = None
+  _SPANNER_INSTANCE = None
+
+  def __init__(self, project_id = "apache-beam-testing", temp_database_id = None):
+    self._SPANNER_CLIENT = spanner.Client(project = project_id)
+    self._SPANNER_INSTANCE = self._SPANNER_CLIENT.instance("beam-test")
+    self.TEST_DATABASE = None
+
+    if temp_database_id and temp_database_id.startswith(self.TEMP_DATABASE_PREFIX):
+      raise ValueError(
+        'User provided temp database ID cannot start with %r' %
+         self.TEMP_DATABASE_PREFIX)
+
+    if temp_database_id is not None:
+      self.TEST_DATABASE = temp_database_id
+    else:
+      self.TEST_DATABASE = self._get_temp_database()
+
+  def _get_temp_database(self):
+        uniq_id = uuid.uuid4().hex[:10]
+        return f'{self.TEMP_DATABASE_PREFIX}{uniq_id}'
+
+  def _create_database(self):

Review Comment:
   ```suggestion
     @retry.with_exponential_backoff(
         num_retries=MAX_RETRIES,
         retry_filter=retry.retry_on_server_errors_and_timeout_filter)
     def _create_database(self):
   ```
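   (This assumes the `MAX_RETRIES` constant suggested above, plus the retry utilities import near the top of the module:)
   ```
   from apache_beam.utils import retry

   MAX_RETRIES = 3
   ```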



##########
sdks/python/apache_beam/io/gcp/spanner_wrapper.py:
##########
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import uuid
+import apache_beam as beam
+
+try:
+  from google.cloud import spanner
+
+except ImportError:
+  spanner = None
+
+_LOGGER = logging.getLogger(__name__)
+
+class SpannerWrapper(object):
+  TEST_DATABASE = None
+  TEMP_DATABASE_PREFIX = 'temp-'
+  _SPANNER_CLIENT = None
+  _SPANNER_INSTANCE = None
+
+  def __init__(self, project_id = "apache-beam-testing", temp_database_id = None):
+    self._SPANNER_CLIENT = spanner.Client(project = project_id)
+    self._SPANNER_INSTANCE = self._SPANNER_CLIENT.instance("beam-test")
+    self.TEST_DATABASE = None
+
+    if temp_database_id and temp_database_id.startswith(self.TEMP_DATABASE_PREFIX):
+      raise ValueError(
+        'User provided temp database ID cannot start with %r' %
+         self.TEMP_DATABASE_PREFIX)
+
+    if temp_database_id is not None:
+      self.TEST_DATABASE = temp_database_id
+    else:
+      self.TEST_DATABASE = self._get_temp_database()
+
+  def _get_temp_database(self):
+        uniq_id = uuid.uuid4().hex[:10]
+        return f'{self.TEMP_DATABASE_PREFIX}{uniq_id}'
+
+  def _create_database(self):
+    _LOGGER.info('Creating test database: %s' % self.TEST_DATABASE)
+    instance = self._SPANNER_INSTANCE
+    database = instance.database(
+        self.TEST_DATABASE,
+        ddl_statements = [
+            '''CREATE TABLE Users (
+            UserId    STRING(256) NOT NULL,
+            Key       STRING(1024)
+        ) PRIMARY KEY (UserId)'''
+        ])
+    operation = database.create()
+    _LOGGER.info('Creating database: Done! %s' % str(operation.result()))
+
+  @classmethod

Review Comment:
   This isn't truly acting as a class method, as it uses the state of the object (class methods are useful for methods that work independently of the state of a particular instance, for example a method that returns every created table across all instances of the object).
   ```suggestion
   ```
   
   and for future reference, a classmethod follows the standard pattern
   ```
   @classmethod
   def some_method(cls):
     return cls.some_var
   ```
   using `cls` instead of `self` to access class variables
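   For comparison, a method that touches per-object state like this one should just stay a regular instance method (placeholder name):
   ```
   def some_method(self):
     return self.TEST_DATABASE  # per-instance state, accessed through self
   ```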



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
