piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r501676146
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##########
@@ -678,6 +703,42 @@ public Read withPartitionOptions(PartitionOptions
partitionOptions) {
.withTransaction(getTransaction());
return input.apply(Create.of(getReadOperation())).apply("Execute query",
readAll);
}
+
+ SerializableFunction<Struct, Row> getFormatFn() {
+ return (SerializableFunction<Struct, Row>)
+ input ->
+ Row.withSchema(Schema.builder().addInt64Field("Key").build())
+ .withFieldValue("Key", 3L)
+ .build();
+ }
+ }
+
+ public static class ReadRows extends PTransform<PBegin, PCollection<Row>> {
+ Read read;
+ Schema schema;
+
+ public ReadRows(Read read, Schema schema) {
+ super("Read rows");
+ this.read = read;
+ this.schema = schema;
Review comment:
Thank you @nielm ! I thought about the LIMIT approach but then I found
the same arguments not to do that.
It appears there exist a jdbc client for Spanner:
https://cloud.google.com/spanner/docs/jdbc-drivers . I'll try to figure out if
I can use it.
There is ResultSetMetadata in Spanner's REST API which extends json object.
https://cloud.google.com/spanner/docs/reference/rest/v1/ResultSetMetadata but
at the end of the day it requires at least partially to fetch the data.
But I would leave it for another PR as it supposedly require to move
SchemaUtils from io/jdbc to some more general place (extensions/sql?). As I can
see Struct type is represented as String as is mentiones here:
```
The Cloud Spanner STRUCT data type is mapped to a SQL VARCHAR data type,
accessible through this driver as String types. All other types have
appropriate mappings.
```
So it may not be the best option.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##########
@@ -678,6 +703,42 @@ public Read withPartitionOptions(PartitionOptions
partitionOptions) {
.withTransaction(getTransaction());
return input.apply(Create.of(getReadOperation())).apply("Execute query",
readAll);
}
+
+ SerializableFunction<Struct, Row> getFormatFn() {
+ return (SerializableFunction<Struct, Row>)
+ input ->
+ Row.withSchema(Schema.builder().addInt64Field("Key").build())
+ .withFieldValue("Key", 3L)
+ .build();
+ }
+ }
+
+ public static class ReadRows extends PTransform<PBegin, PCollection<Row>> {
+ Read read;
+ Schema schema;
+
+ public ReadRows(Read read, Schema schema) {
+ super("Read rows");
+ this.read = read;
+ this.schema = schema;
Review comment:
Thank you @nielm ! I thought about the LIMIT approach but then I found
the same arguments not to do that.
It appears there exist a jdbc client for Spanner:
https://cloud.google.com/spanner/docs/jdbc-drivers . I'll try to figure out if
I can use it.
There is ResultSetMetadata in Spanner's REST API which extends json object.
https://cloud.google.com/spanner/docs/reference/rest/v1/ResultSetMetadata but
at the end of the day it requires at least partially to fetch the data.
But I would leave it for another PR as it supposedly require to move
SchemaUtils from io/jdbc to some more general place (extensions/sql?). As I can
see Struct type is represented as String as is mentiones here:
```
The Cloud Spanner STRUCT data type is mapped to a SQL VARCHAR data type,
accessible through
this driver as String types. All other types have appropriate mappings.
```
So it may not be the best option.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##########
@@ -678,6 +703,42 @@ public Read withPartitionOptions(PartitionOptions
partitionOptions) {
.withTransaction(getTransaction());
return input.apply(Create.of(getReadOperation())).apply("Execute query",
readAll);
}
+
+ SerializableFunction<Struct, Row> getFormatFn() {
+ return (SerializableFunction<Struct, Row>)
+ input ->
+ Row.withSchema(Schema.builder().addInt64Field("Key").build())
+ .withFieldValue("Key", 3L)
+ .build();
+ }
+ }
+
+ public static class ReadRows extends PTransform<PBegin, PCollection<Row>> {
+ Read read;
+ Schema schema;
+
+ public ReadRows(Read read, Schema schema) {
+ super("Read rows");
+ this.read = read;
+ this.schema = schema;
Review comment:
Thank you @nielm ! I thought about the LIMIT approach but then I found
the same arguments not to do that.
It appears there exist a jdbc client for Spanner:
https://cloud.google.com/spanner/docs/jdbc-drivers . I'll try to figure out if
I can use it.
There is ResultSetMetadata in Spanner's REST API which extends json object.
https://cloud.google.com/spanner/docs/reference/rest/v1/ResultSetMetadata but
at the end of the day it requires at least partially to fetch the data.
But I would leave it for another PR as it supposedly require to move
SchemaUtils from io/jdbc to some more general place (extensions/sql?). As I can
see Struct type is mapped to String/Varchar as is mentioned in the FAQ, so it
may not be the best option
```
The Cloud Spanner STRUCT data type is mapped to a SQL VARCHAR data type,
accessible through
this driver as String types. All other types have appropriate mappings.
```
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##########
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+final class StructUtils {
+ public static Row translateStructToRow(Struct struct, Schema schema) {
+ checkForSchemasEquality(schema.getFields(),
struct.getType().getStructFields(), false);
+
+ List<Schema.Field> fields = schema.getFields();
+ Row.FieldValueBuilder valueBuilder = null;
+ // TODO: Remove this null-checking once nullable fields are supported in
cross-language
+ int count = 0;
+ while (valueBuilder == null && count < fields.size()) {
+ valueBuilder = getFirstStructValue(struct, fields.get(count), schema);
+ ++count;
+ }
+ for (int i = count; i < fields.size(); ++i) {
+ valueBuilder = getStructValue(valueBuilder, struct, fields.get(i));
+ }
+ return valueBuilder != null ? valueBuilder.build() :
Row.withSchema(schema).build();
+ }
+
+ public static Struct translateRowToStruct(Row row) {
+ Struct.Builder structBuilder = Struct.newBuilder();
+ List<Schema.Field> fields = row.getSchema().getFields();
+ fields.forEach(
+ field -> {
+ String column = field.getName();
+ switch (field.getType().getTypeName()) {
+ case ROW:
+ structBuilder
+ .set(column)
+ .to(
+ beamTypeToSpannerType(field.getType()),
+ translateRowToStruct(row.getRow(column)));
+ break;
+ case ARRAY:
+ addArrayToStruct(structBuilder, row, field);
+ break;
+ case ITERABLE:
+ addIterableToStruct(structBuilder, row, field);
+ break;
+ case FLOAT:
+ structBuilder.set(column).to(row.getFloat(column).doubleValue());
+ break;
+ case DOUBLE:
+ structBuilder.set(column).to(row.getDouble(column));
+ break;
+ case DECIMAL:
+
structBuilder.set(column).to(row.getDecimal(column).doubleValue());
Review comment:
Great, I think it wasn't available when I wrote that code. Thanks!
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##########
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+final class StructUtils {
+ public static Row translateStructToRow(Struct struct, Schema schema) {
+ checkForSchemasEquality(schema.getFields(),
struct.getType().getStructFields(), false);
+
+ List<Schema.Field> fields = schema.getFields();
+ Row.FieldValueBuilder valueBuilder = null;
+ // TODO: Remove this null-checking once nullable fields are supported in
cross-language
+ int count = 0;
+ while (valueBuilder == null && count < fields.size()) {
+ valueBuilder = getFirstStructValue(struct, fields.get(count), schema);
+ ++count;
+ }
+ for (int i = count; i < fields.size(); ++i) {
+ valueBuilder = getStructValue(valueBuilder, struct, fields.get(i));
+ }
+ return valueBuilder != null ? valueBuilder.build() :
Row.withSchema(schema).build();
+ }
+
+ public static Struct translateRowToStruct(Row row) {
+ Struct.Builder structBuilder = Struct.newBuilder();
+ List<Schema.Field> fields = row.getSchema().getFields();
+ fields.forEach(
+ field -> {
+ String column = field.getName();
+ switch (field.getType().getTypeName()) {
+ case ROW:
+ structBuilder
+ .set(column)
+ .to(
+ beamTypeToSpannerType(field.getType()),
+ translateRowToStruct(row.getRow(column)));
+ break;
+ case ARRAY:
+ addArrayToStruct(structBuilder, row, field);
+ break;
+ case ITERABLE:
+ addIterableToStruct(structBuilder, row, field);
+ break;
+ case FLOAT:
+ structBuilder.set(column).to(row.getFloat(column).doubleValue());
+ break;
+ case DOUBLE:
+ structBuilder.set(column).to(row.getDouble(column));
+ break;
+ case DECIMAL:
+
structBuilder.set(column).to(row.getDecimal(column).doubleValue());
Review comment:
Great, quite a new thing in Spanner as I can see! Thanks!
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##########
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+final class StructUtils {
+ public static Row translateStructToRow(Struct struct, Schema schema) {
+ checkForSchemasEquality(schema.getFields(),
struct.getType().getStructFields(), false);
+
+ List<Schema.Field> fields = schema.getFields();
+ Row.FieldValueBuilder valueBuilder = null;
+ // TODO: Remove this null-checking once nullable fields are supported in
cross-language
+ int count = 0;
+ while (valueBuilder == null && count < fields.size()) {
+ valueBuilder = getFirstStructValue(struct, fields.get(count), schema);
+ ++count;
+ }
+ for (int i = count; i < fields.size(); ++i) {
+ valueBuilder = getStructValue(valueBuilder, struct, fields.get(i));
+ }
+ return valueBuilder != null ? valueBuilder.build() :
Row.withSchema(schema).build();
+ }
+
+ public static Struct translateRowToStruct(Row row) {
+ Struct.Builder structBuilder = Struct.newBuilder();
+ List<Schema.Field> fields = row.getSchema().getFields();
+ fields.forEach(
+ field -> {
+ String column = field.getName();
+ switch (field.getType().getTypeName()) {
+ case ROW:
+ structBuilder
+ .set(column)
+ .to(
+ beamTypeToSpannerType(field.getType()),
+ translateRowToStruct(row.getRow(column)));
+ break;
+ case ARRAY:
+ addArrayToStruct(structBuilder, row, field);
+ break;
+ case ITERABLE:
+ addIterableToStruct(structBuilder, row, field);
+ break;
+ case FLOAT:
+ structBuilder.set(column).to(row.getFloat(column).doubleValue());
+ break;
+ case DOUBLE:
+ structBuilder.set(column).to(row.getDouble(column));
+ break;
+ case DECIMAL:
+
structBuilder.set(column).to(row.getDecimal(column).doubleValue());
Review comment:
Great, quite a new thing in Spanner as I can see! Thanks! Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]