Polber commented on code in PR #31987: URL: https://github.com/apache/beam/pull/31987#discussion_r1729450818
########## sdks/python/apache_beam/io/gcp/spanner_wrapper.py: ########## @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the 'License'); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import uuid +import apache_beam as beam + +try: + from google.cloud import spanner + +except ImportError: + spanner = None + +_LOGGER = logging.getLogger(__name__) Review Comment: ```suggestion _LOGGER = logging.getLogger(__name__) MAX_RETRIES = 3 ``` ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java: ########## @@ -0,0 +1,211 @@ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import 
org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.io.gcp.spanner.StructUtils; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.Type; +import javax.annotation.Nullable; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +/** + * A provider for reading from Cloud Spanner using a Schema Transform Provider. + * + * <p>This provider enables reading from Cloud Spanner using a specified SQL query or by + * directly accessing a table and its columns. It supports configuration through the + * {@link SpannerReadSchemaTransformConfiguration} class, allowing users to specify + * project, instance, database, table, query, and columns. + * + * <p>The transformation leverages the {@link SpannerIO} to perform the read operation + * and maps the results to Beam rows, preserving the schema. 
+ * + * <p>Example usage in a YAML pipeline using query: + * + * <pre>{@code + * pipeline: + * transforms: + * - type: ReadFromSpanner + * name: ReadShipments + * # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email + * config: + * project_id: 'apache-beam-testing' + * instance_id: 'shipment-test' + * database_id: 'shipment' + * query: 'SELECT * FROM shipments' + * }</pre> + * + * <p>Example usage in a YAML pipeline using a table and columns: + * + * <pre>{@code + * pipeline: + * transforms: + * - type: ReadFromSpanner + * name: ReadShipments + * # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email + * config: + * project_id: 'apache-beam-testing' + * instance_id: 'shipment-test' + * database_id: 'shipment' + * table: 'shipments' + * columns: ['customer_id', 'customer_name'] + * }</pre> + */ + +@AutoService(SchemaTransformProvider.class) +public class SpannerReadSchemaTransformProvider + extends TypedSchemaTransformProvider< + SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> { + + static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable { + private final SpannerReadSchemaTransformConfiguration configuration; + + SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null."); + SpannerIO.Read read = SpannerIO + .readWithSchema() + .withProjectId(configuration.getProjectId()) + .withInstanceId(configuration.getInstanceId()) + .withDatabaseId(configuration.getDatabaseId()); + + if (!Strings.isNullOrEmpty(configuration.getQuery())) { + read = read.withQuery(configuration.getQuery()); + } + else { + read = read.withTable(configuration.getTableId()) + 
.withColumns(configuration.getColumns()); + } + PCollection<Struct> spannerRows = input.getPipeline().apply(read); + Schema schema = spannerRows.getSchema(); + PCollection<Row> rows = spannerRows.apply(MapElements.into(TypeDescriptor.of(Row.class)) + .via((Struct struct) -> StructUtils.structToBeamRow(struct, schema))); + + return PCollectionRowTuple.of("output", rows.setRowSchema(schema)); + } + } + + @Override + public @UnknownKeyFor @NonNull @Initialized String identifier() { + return "beam:schematransform:org.apache.beam:spanner_read:v1"; + } + + @Override + public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> + inputCollectionNames() { + return Collections.emptyList(); + } + + @Override + public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> + outputCollectionNames() { + return Collections.singletonList("output"); + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class SpannerReadSchemaTransformConfiguration implements Serializable { + @AutoValue.Builder + @Nullable + public abstract static class Builder { + public abstract Builder setProjectId(String projectId); + public abstract Builder setInstanceId(String instanceId); + public abstract Builder setDatabaseId(String databaseId); + public abstract Builder setTableId(String tableId); + public abstract Builder setQuery(String query); + public abstract Builder setColumns(List<String> columns); + public abstract SpannerReadSchemaTransformConfiguration build(); + } + + public void validate() { + String invalidConfigMessage = "Invalid Cloud Spanner Read configuration: "; + checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified for SQL query."); + checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified for SQL query."); + checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified for SQL query."); + + if 
(Strings.isNullOrEmpty(this.getQuery())) { + checkNotNull(this.getTableId(), invalidConfigMessage + "Table name must be specified for table read."); + checkNotNull(this.getColumns(), invalidConfigMessage + "Columns must be specified for table read."); + } + else { + checkNotNull(this.getQuery(), invalidConfigMessage + "Query must be specified for query read."); + checkArgument( + this.getTableId() == null, Review Comment: Let's use `isNullOrEmpty` to be safe ```suggestion Strings.isNullOrEmpty(this.getTableId()), ``` and the same below for columns ########## sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java: ########## Review Comment: I don't think this file needs to be modified ########## sdks/python/apache_beam/io/gcp/spanner_wrapper.py: ########## @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the 'License'); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +import logging +import uuid +import apache_beam as beam + +try: + from google.cloud import spanner + +except ImportError: + spanner = None + +_LOGGER = logging.getLogger(__name__) + +class SpannerWrapper(object): + TEST_DATABASE = None + TEMP_DATABASE_PREFIX = 'temp-' + _SPANNER_CLIENT = None + _SPANNER_INSTANCE = None + + def __init__(self, project_id = "apache-beam-testing", temp_database_id = None): + self._SPANNER_CLIENT = spanner.Client(project = project_id) + self._SPANNER_INSTANCE = self._SPANNER_CLIENT.instance("beam-test") + self.TEST_DATABASE = None Review Comment: Let's rename these to snake_case instance attributes and refactor their usages in the rest of the class accordingly. ```suggestion self._spanner_client = spanner.Client(project = project_id) self._spanner_instance = self._spanner_client.instance("beam-test") self._test_database = None ``` ########## sdks/python/apache_beam/yaml/integration_tests.py: ########## @@ -37,14 +37,33 @@ from apache_beam.utils import python_callable from apache_beam.yaml import yaml_provider from apache_beam.yaml import yaml_transform - +from apache_beam.io.gcp.spanner_wrapper import SpannerWrapper @contextlib.contextmanager def gcs_temp_dir(bucket): gcs_tempdir = bucket + '/yaml-' + str(uuid.uuid4()) yield gcs_tempdir filesystems.FileSystems.delete([gcs_tempdir]) +import contextlib +import uuid +import logging +from google.cloud import spanner + +@contextlib.contextmanager +def temp_spanner_table(project, prefix = 'temp_spanner_db_'): + spanner_client = SpannerWrapper(project) + spanner_client._create_database() + instance = 'beam-test' + database = spanner_client.TEST_DATABASE + table = 'Users' + columns = ['UserId', 'Key'] Review Comment: Can we make this configurable and define columns in the test fixture on the YAML file?
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java: ########## @@ -351,4 +357,59 @@ private static void addIterableToMutationBuilder( beamIterableType.getTypeName())); } } + public static Row createRowFromMutation(Schema schema, Mutation mutation) { + HashMap <String, Object> mutationHashMap = new HashMap<String, Object>(); + mutation.asMap().forEach( + (column, value) -> mutationHashMap.put(column, convertValueToBeamFieldType(value))); + Map<String, Object> mutationMap = new HashMap<String, Object>(mutationHashMap); + return Row.withSchema(schema).withFieldValues(mutationMap).build(); Review Comment: No need to construct map twice ```suggestion Map<String, Object> mutationHashMap = new HashMap<>(); mutation.asMap().forEach( (column, value) -> mutationHashMap.put(column, convertValueToBeamFieldType(value))); return Row.withSchema(schema).withFieldValues(mutationHashMap).build(); ``` ########## sdks/python/apache_beam/io/gcp/spanner_wrapper.py: ########## @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the 'License'); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import logging +import uuid +import apache_beam as beam + +try: + from google.cloud import spanner + +except ImportError: + spanner = None + +_LOGGER = logging.getLogger(__name__) + +class SpannerWrapper(object): + TEST_DATABASE = None + TEMP_DATABASE_PREFIX = 'temp-' + _SPANNER_CLIENT = None + _SPANNER_INSTANCE = None + + def __init__(self, project_id = "apache-beam-testing", temp_database_id = None): Review Comment: Let's not default this value ```suggestion def __init__(self, project_id, temp_database_id = None): ``` ########## sdks/python/apache_beam/io/gcp/spanner_wrapper.py: ########## @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the 'License'); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import logging +import uuid +import apache_beam as beam + +try: + from google.cloud import spanner + +except ImportError: + spanner = None + +_LOGGER = logging.getLogger(__name__) + +class SpannerWrapper(object): + TEST_DATABASE = None + TEMP_DATABASE_PREFIX = 'temp-' + _SPANNER_CLIENT = None + _SPANNER_INSTANCE = None Review Comment: I have no idea why they wrote the BQ wrapper this way, but these don't need to be class variables as they are state variables (except `TEMP_DATABASE_PREFIX`, I suppose). I would remove at least the state variables and move them to the constructor as instance variables. ```suggestion TEMP_DATABASE_PREFIX = 'temp-' ``` ########## sdks/python/apache_beam/yaml/integration_tests.py: ########## @@ -37,14 +37,33 @@ from apache_beam.utils import python_callable from apache_beam.yaml import yaml_provider from apache_beam.yaml import yaml_transform - +from apache_beam.io.gcp.spanner_wrapper import SpannerWrapper @contextlib.contextmanager def gcs_temp_dir(bucket): gcs_tempdir = bucket + '/yaml-' + str(uuid.uuid4()) yield gcs_tempdir filesystems.FileSystems.delete([gcs_tempdir]) +import contextlib +import uuid +import logging +from google.cloud import spanner + +@contextlib.contextmanager +def temp_spanner_table(project, prefix = 'temp_spanner_db_'): + spanner_client = SpannerWrapper(project) + spanner_client._create_database() + instance = 'beam-test' + database = spanner_client.TEST_DATABASE + table = 'Users' Review Comment: Let's stay consistent with BQ and name this `tmp_table` ########## sdks/python/apache_beam/yaml/integration_tests.py: ########## @@ -37,14 +37,33 @@ from apache_beam.utils import python_callable from apache_beam.yaml import yaml_provider from apache_beam.yaml import yaml_transform - +from apache_beam.io.gcp.spanner_wrapper import SpannerWrapper @contextlib.contextmanager def gcs_temp_dir(bucket): gcs_tempdir = bucket + '/yaml-' + str(uuid.uuid4()) yield gcs_tempdir filesystems.FileSystems.delete([gcs_tempdir]) +import contextlib +import
uuid +import logging +from google.cloud import spanner + +@contextlib.contextmanager +def temp_spanner_table(project, prefix = 'temp_spanner_db_'): + spanner_client = SpannerWrapper(project) + spanner_client._create_database() + instance = 'beam-test' Review Comment: This could lead to a collision if multiple PRs are running this test. Let's add a UUID to avoid naming collisions ```suggestion instance = '%s_%s' % (prefix, uuid.uuid4().hex) ``` (I don't think the prefix needs the trailing underscore in the method definition, but I'm not sure why BigQuery's context manager has it either) ########## sdks/python/apache_beam/io/gcp/spanner_wrapper.py: ########## @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the 'License'); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +import logging +import uuid +import apache_beam as beam + +try: + from google.cloud import spanner + +except ImportError: + spanner = None + +_LOGGER = logging.getLogger(__name__) + +class SpannerWrapper(object): + TEST_DATABASE = None + TEMP_DATABASE_PREFIX = 'temp-' + _SPANNER_CLIENT = None + _SPANNER_INSTANCE = None + + def __init__(self, project_id = "apache-beam-testing", temp_database_id = None): + self._SPANNER_CLIENT = spanner.Client(project = project_id) + self._SPANNER_INSTANCE = self._SPANNER_CLIENT.instance("beam-test") + self.TEST_DATABASE = None + + if temp_database_id and temp_database_id.startswith(self.TEMP_DATABASE_PREFIX): + raise ValueError( + 'User provided temp database ID cannot start with %r' % + self.TEMP_DATABASE_PREFIX) + + if temp_database_id is not None: + self.TEST_DATABASE = temp_database_id + else: + self.TEST_DATABASE = self._get_temp_database() + + def _get_temp_database(self): + uniq_id = uuid.uuid4().hex[:10] + return f'{self.TEMP_DATABASE_PREFIX}{uniq_id}' + + def _create_database(self): Review Comment: ```suggestion @retry.with_exponential_backoff( num_retries=MAX_RETRIES, retry_filter=retry.retry_on_server_errors_and_timeout_filter) def _create_database(self): ``` ########## sdks/python/apache_beam/io/gcp/spanner_wrapper.py: ########## @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the 'License'); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import uuid +import apache_beam as beam + +try: + from google.cloud import spanner + +except ImportError: + spanner = None + +_LOGGER = logging.getLogger(__name__) + +class SpannerWrapper(object): + TEST_DATABASE = None + TEMP_DATABASE_PREFIX = 'temp-' + _SPANNER_CLIENT = None + _SPANNER_INSTANCE = None + + def __init__(self, project_id = "apache-beam-testing", temp_database_id = None): + self._SPANNER_CLIENT = spanner.Client(project = project_id) + self._SPANNER_INSTANCE = self._SPANNER_CLIENT.instance("beam-test") + self.TEST_DATABASE = None + + if temp_database_id and temp_database_id.startswith(self.TEMP_DATABASE_PREFIX): + raise ValueError( + 'User provided temp database ID cannot start with %r' % + self.TEMP_DATABASE_PREFIX) + + if temp_database_id is not None: + self.TEST_DATABASE = temp_database_id + else: + self.TEST_DATABASE = self._get_temp_database() + + def _get_temp_database(self): + uniq_id = uuid.uuid4().hex[:10] + return f'{self.TEMP_DATABASE_PREFIX}{uniq_id}' + + def _create_database(self): + _LOGGER.info('Creating test database: %s' % self.TEST_DATABASE) + instance = self._SPANNER_INSTANCE + database = instance.database( + self.TEST_DATABASE, + ddl_statements = [ + '''CREATE TABLE Users ( + UserId STRING(256) NOT NULL, + Key STRING(1024) + ) PRIMARY KEY (UserId)''' + ]) + operation = database.create() + _LOGGER.info('Creating database: Done! 
%s' % str(operation.result())) + + @classmethod Review Comment: This isn't truly acting as a class method, since it uses the object's instance state (class methods are useful for methods that work independently of the state of any particular instance, for example a method that returns every created table across all instances of the class) ```suggestion ``` and for future reference, a classmethod follows the standard form ``` @classmethod def some_method(cls): return cls.some_var ``` using `cls` instead of `self` to access class variables -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
