[ https://issues.apache.org/jira/browse/BEAM-10139?focusedWorklogId=476891&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-476891 ]
ASF GitHub Bot logged work on BEAM-10139: ----------------------------------------- Author: ASF GitHub Bot Created on: 31/Aug/20 22:50 Start Date: 31/Aug/20 22:50 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on a change in pull request #12611: URL: https://github.com/apache/beam/pull/12611#discussion_r477614300 ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java ########## @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static java.util.stream.Collectors.toList; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.ByteArray; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.Type; +import java.math.BigDecimal; +import java.util.List; +import java.util.stream.StreamSupport; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.joda.time.DateTime; +import org.joda.time.Instant; + +final class StructUtils { + public static Row translateStructToRow(Struct struct, Schema schema) { + checkForSchemasEquality(schema.getFields(), struct.getType().getStructFields(), false); + + List<Schema.Field> fields = schema.getFields(); + Row.FieldValueBuilder valueBuilder = null; + // TODO: Remove this null-checking once nullable fields are supported in cross-language Review comment: What is the issue here? Nullable fields should be supported in cross-language ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ########## @@ -678,6 +703,42 @@ public Read withPartitionOptions(PartitionOptions partitionOptions) { .withTransaction(getTransaction()); return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll); } + + SerializableFunction<Struct, Row> getFormatFn() { + return (SerializableFunction<Struct, Row>) + input -> + Row.withSchema(Schema.builder().addInt64Field("Key").build()) + .withFieldValue("Key", 3L) + .build(); + } + } + + public static class ReadRows extends PTransform<PBegin, PCollection<Row>> { + Read read; + Schema schema; + + public ReadRows(Read read, Schema schema) { + super("Read rows"); + this.read = read; + this.schema = schema; + } + + @Override + public PCollection<Row> expand(PBegin input) { + return input + .apply(read) + .apply( + MapElements.into(TypeDescriptor.of(Row.class)) + .via( + new SerializableFunction<Struct, Row>() { + @Override + public Row apply(Struct struct) { + return StructUtils.translateStructToRow(struct, schema); + } + })) + .setRowSchema(schema) + .setCoder(RowCoder.of(schema)); Review comment: ```suggestion .setRowSchema(schema); ``` `setCoder(RowCoder.of(schema))` is what `setRowSchema` does ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java ########## @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static java.util.stream.Collectors.toList; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.ByteArray; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.Type; +import java.math.BigDecimal; +import java.util.List; +import java.util.stream.StreamSupport; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.joda.time.DateTime; +import org.joda.time.Instant; + +final class StructUtils { + public static Row translateStructToRow(Struct struct, Schema schema) { + checkForSchemasEquality(schema.getFields(), struct.getType().getStructFields(), false); Review comment: Another reason to get the schema eagerly at pipeline construction time, this is an expensive operation to be doing for every Struct that we read. ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ########## @@ -678,6 +703,42 @@ public Read withPartitionOptions(PartitionOptions partitionOptions) { .withTransaction(getTransaction()); return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll); } + + SerializableFunction<Struct, Row> getFormatFn() { + return (SerializableFunction<Struct, Row>) + input -> + Row.withSchema(Schema.builder().addInt64Field("Key").build()) + .withFieldValue("Key", 3L) + .build(); + } + } + + public static class ReadRows extends PTransform<PBegin, PCollection<Row>> { + Read read; + Schema schema; + + public ReadRows(Read read, Schema schema) { + super("Read rows"); + this.read = read; + this.schema = schema; Review comment: We could also punt on this question and file a jira with a TODO here. I recognize this is a little out of scope for BEAM-10139, BEAM-10140. ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ########## @@ -678,6 +703,42 @@ public Read withPartitionOptions(PartitionOptions partitionOptions) { .withTransaction(getTransaction()); return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll); } + + SerializableFunction<Struct, Row> getFormatFn() { + return (SerializableFunction<Struct, Row>) + input -> + Row.withSchema(Schema.builder().addInt64Field("Key").build()) + .withFieldValue("Key", 3L) + .build(); + } + } + + public static class ReadRows extends PTransform<PBegin, PCollection<Row>> { + Read read; + Schema schema; + + public ReadRows(Read read, Schema schema) { + super("Read rows"); + this.read = read; + this.schema = schema; Review comment: It would be really great if `SpannerIO.ReadRows` could determine the schema at pipeline construction time so the user doesn't have to specify it. In `SpannerIO.Read#expand` we require the user to specify either a query or a list of columns: https://github.com/apache/beam/blob/2872e37d801b489ecbb2c0d6a2a70430d8ba91e9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java#L656-L671 In both case we're very close to a schema. We just need to analyze the query and/or get the output types for the projected columns. I looked into it a little bit, but I'm not quite sure the best way to use the spanner client to look up the schema. The only thing I could figure out was to start a read and look at the type of `ResultSet#getCurrentRowAsStruct` which seems less than ideal. CC @nielm who's done some work with SpannerIO recently - do you have any suggestions for a way to determine the types of the Structs that SpannerIO.Read will produce? ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java ########## @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static java.util.stream.Collectors.toList; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.ByteArray; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.Type; +import java.math.BigDecimal; +import java.util.List; +import java.util.stream.StreamSupport; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.joda.time.DateTime; +import org.joda.time.Instant; + +final class StructUtils { Review comment: woo this is a hefty class for type conversions! It does seem like there's a lot of duplicated logic, what's preventing us from combining more of it? ########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java ########## @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static java.util.stream.Collectors.toList; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.ByteArray; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.Type; +import java.math.BigDecimal; +import java.util.List; +import java.util.stream.StreamSupport; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.joda.time.DateTime; +import org.joda.time.Instant; + +final class StructUtils { + public static Row translateStructToRow(Struct struct, Schema schema) { + checkForSchemasEquality(schema.getFields(), struct.getType().getStructFields(), false); + + List<Schema.Field> fields = schema.getFields(); + Row.FieldValueBuilder valueBuilder = null; + // TODO: Remove this null-checking once nullable fields are supported in cross-language + int count = 0; + while (valueBuilder == null && count < fields.size()) { + valueBuilder = getFirstStructValue(struct, fields.get(count), schema); + ++count; + } + for (int i = count; i < fields.size(); ++i) { + valueBuilder = getStructValue(valueBuilder, struct, fields.get(i)); + } + return valueBuilder != null ? valueBuilder.build() : Row.withSchema(schema).build(); + } + + public static Struct translateRowToStruct(Row row) { + Struct.Builder structBuilder = Struct.newBuilder(); + List<Schema.Field> fields = row.getSchema().getFields(); + fields.forEach( + field -> { + String column = field.getName(); + switch (field.getType().getTypeName()) { + case ROW: + structBuilder + .set(column) + .to( + beamTypeToSpannerType(field.getType()), + translateRowToStruct(row.getRow(column))); + break; + case ARRAY: + addArrayToStruct(structBuilder, row, field); + break; + case ITERABLE: + addIterableToStruct(structBuilder, row, field); + break; + case FLOAT: + structBuilder.set(column).to(row.getFloat(column).doubleValue()); + break; + case DOUBLE: + structBuilder.set(column).to(row.getDouble(column)); + break; + case DECIMAL: + structBuilder.set(column).to(row.getDecimal(column).doubleValue()); Review comment: This is lossy isn't it? I think we should just refuse to convert DECIMAL since Spanner doesn't have a corresponding type: https://cloud.google.com/spanner/docs/data-types#allowable_types ########## File path: sdks/java/io/google-cloud-platform/expansion-service/build.gradle ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'org.apache.beam.module' +apply plugin: 'application' +mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService" + +applyJavaNature( + enableChecker: true, + automaticModuleName: 'org.apache.beam.sdk.io.gcp.expansion.service', + exportJavadoc: false, + validateShadowJar: false, + shadowClosure: {}, +) + +task runService(type: Exec) { + dependsOn shadowJar + executable 'sh' + args '-c', 'java -jar /Users/piotr/beam/sdks/java/io/google-cloud-platform/expansion-service/build/libs/beam-sdks-java-io-google-cloud-platform-expansion-service-2.24.0-SNAPSHOT.jar 8097' +} Review comment: looks like this was just there for testing? ########## File path: sdks/python/apache_beam/io/gcp/spanner.py ########## @@ -0,0 +1,504 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PTransforms for supporting Spanner in Python pipelines. + + These transforms are currently supported by Beam portable + Flink and Spark runners. + + **Setup** + + Transforms provided in this module are cross-language transforms + implemented in the Beam Java SDK. During the pipeline construction, Python SDK + will connect to a Java expansion service to expand these transforms. + To facilitate this, a small amount of setup is needed before using these + transforms in a Beam Python pipeline. + + There are several ways to setup cross-language Spanner transforms. + + * Option 1: use the default expansion service + * Option 2: specify a custom expansion service + + See below for details regarding each of these options. + + *Option 1: Use the default expansion service* + + This is the recommended and easiest setup option for using Python Spanner + transforms. This option is only available for Beam 2.25.0 and later. + + This option requires following pre-requisites before running the Beam + pipeline. + + * Install Java runtime in the computer from where the pipeline is constructed + and make sure that 'java' command is available. + + In this option, Python SDK will either download (for released Beam version) or + build (when running from a Beam Git clone) a expansion service jar and use + that to expand transforms. Currently Spanner transforms use the + 'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this + purpose. + + *Option 2: specify a custom expansion service* + + In this option, you startup your own expansion service and provide that as + a parameter when using the transforms provided in this module. + + This option requires following pre-requisites before running the Beam + pipeline. + + * Startup your own expansion service. + * Update your pipeline to provide the expansion service address when + initiating Spanner transforms provided in this module. + + Flink Users can use the built-in Expansion Service of the Flink Runner's + Job Server. If you start Flink's Job Server, the expansion service will be + started on port 8097. For a different address, please set the + expansion_service parameter. + + **More information** + + For more information regarding cross-language transforms see: + - https://beam.apache.org/roadmap/portability/ + + For more information specific to Flink runner see: + - https://beam.apache.org/documentation/runners/flink/ +""" + +# pytype: skip-file + +from __future__ import absolute_import + +import typing +import uuid +from typing import List +from typing import NamedTuple +from typing import Optional + +from past.builtins import unicode + +from apache_beam import coders +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder +from apache_beam.typehints.schemas import named_tuple_to_schema + +__all__ = [ + 'WriteToSpanner', + 'ReadFromSpanner', + 'MutationCreator', + 'TimestampBoundMode', + 'TimeUnit', +] + + +def default_io_expansion_service(): + return BeamJarExpansionService( + 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar') + + +WriteToSpannerSchema = typing.NamedTuple( + 'WriteToSpannerSchema', + [ + ('instance_id', unicode), + ('database_id', unicode), + ('project_id', Optional[unicode]), + ('batch_size_bytes', Optional[int]), + ('max_num_mutations', Optional[int]), + ('max_num_rows', Optional[int]), + ('grouping_factor', Optional[int]), + ('host', Optional[unicode]), + ('emulator_host', Optional[unicode]), + ('commit_deadline', Optional[int]), + ('max_cumulative_backoff', Optional[int]), + ], +) + + +class WriteToSpanner(ExternalTransform): Review comment: It looks like there's already a native SpannerIO in the Python SDK in [apache_beam/io/gcp/experimental/spannerio.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/experimental/spannerio.py). Are we planning on removing that one? Should the API for this one be compliant with that one? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 476891) Time Spent: 1.5h (was: 1h 20m) > Add cross-language wrapper for Java's SpannerIO Write > ----------------------------------------------------- > > Key: BEAM-10139 > URL: https://issues.apache.org/jira/browse/BEAM-10139 > Project: Beam > Issue Type: Sub-task > Components: cross-language, io-java-gcp, io-py-gcp > Affects Versions: Not applicable > Reporter: Piotr Szuberski > Assignee: Piotr Szuberski > Priority: P2 > Labels: portability > Fix For: Not applicable > > Time Spent: 1.5h > Remaining Estimate: 0h > > Add cross-language wrapper for Java's SpannerIO Write -- This message was sent by Atlassian Jira (v8.3.4#803005)