[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-11-13 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r523132998



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.ReadableDateTime;
+
+final class StructUtils {
+  public static Row structToBeamRow(Struct struct, Schema schema) {
+    Map<String, Object> structValues =
+        schema.getFields().stream()
+            .collect(
+                HashMap::new,
+                (map, field) -> {
+                  @Nullable Object structValue = getStructValue(struct, field);
+                  if (structValue == null) {
+                    throw new NullPointerException(
+                        "Null struct value at field " + field.getName());
+                  }
+                  map.put(field.getName(), structValue);
+                },
+                Map::putAll);
+    return Row.withSchema(schema).withFieldValues(structValues).build();
+  }
+
+  public static Struct beamRowToStruct(Row row) {
+    Struct.Builder structBuilder = Struct.newBuilder();
+    List<Schema.Field> fields = row.getSchema().getFields();
+    fields.forEach(
+        field -> {
+          String column = field.getName();
+          switch (field.getType().getTypeName()) {
+            case ROW:
+              @Nullable Row subRow = row.getRow(column);
+              if (subRow == null) {
+                throw new NullPointerException(
+                    String.format("Null subRow at '%s' column", column));
+              }

Review comment:
   Guava's checkNotNull doesn't work: the checker doesn't consider values checked this way to be non-null. The compiler also doesn't know that the call always throws on null, so I get "missing return statement" errors in the switch statements. So for now I'd leave it throwing an NPE explicitly.
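   A minimal Java sketch of the pattern described above, assuming plain `javac` (the Checker Framework itself is not run when this compiles). `ExplicitNullCheck`, `Kind` and `convert` are hypothetical names, and a `Map` lookup stands in for the `@Nullable` Row getters. The null check uses an explicit `throw`, and every switch branch ends in either `return` or `throw`, which is what javac's definite-return analysis requires.

```java
import java.util.Map;

/** Hypothetical example, not Beam code: each switch branch either returns or throws. */
public class ExplicitNullCheck {
  /** Hypothetical stand-in for a schema field type. */
  enum Kind {
    STRING,
    LENGTH
  }

  static String convert(Kind kind, Map<String, String> row, String column) {
    // Map.get may return null; it plays the role of a @Nullable Row getter here.
    String value = row.get(column);
    if (value == null) {
      // Explicit throw: after this block a nullness checker can treat `value` as non-null.
      throw new NullPointerException(String.format("Null value at '%s' column", column));
    }
    switch (kind) {
      case STRING:
        return value;
      case LENGTH:
        return Integer.toString(value.length());
      default:
        // javac accepts a `throw` as the end of a branch that returns nothing. A call such as
        // checkArgument(false, ...) here would not satisfy it, because javac cannot see that
        // the call never returns, which leads to a "missing return statement" error.
        throw new IllegalArgumentException("Unsupported kind: " + kind);
    }
  }
}
```

   For example, `convert(Kind.LENGTH, Map.of("name", "beam"), "name")` returns `"4"`, while asking for a missing column raises the explicit NullPointerException.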





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-11-13 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r523024788



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})

Review comment:
   Done. It was quite painful, as all of the Row getters return a @Nullable value. Especially since checkNotNull doesn't work with the checker, and there is no way to do the null check inside a helper function: only `if (var == null) { throw new NullPointerException("Null var"); }` seems to work.
   
   It doesn't even work with chained calls, as in this example:
   ```
   @Nullable Object var = new Object();
   if (var != null) {
     someObject.doSth().doChained(var); // checker doesn't understand that var is checked for nullness
   }
   ```
   So it's quite unfriendly. In general I'm really excited about tackling the NPE problem, but for now it adds a lot of complexity and makes contributing harder. Still, I guess it's worth it, especially once the checker gets smarter and works with the Guava checks and chained calls (if that's even possible).
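   One possible explanation for the chained-call warning, offered as an assumption: if the checked value is actually a non-final field (or a getter result) rather than a plain local, the Checker Framework discards its refinement after any call it cannot prove side-effect free, and in `someObject.doSth().doChained(value)` the argument is evaluated only after `doSth()` has run. The hypothetical sketch below copies the value into a local before the null check, since a local's refinement survives intervening calls. `Holder`, `Builder` and `render` are illustrative names, not Beam API.

```java
/** Hypothetical example of the "copy into a local before checking" workaround. */
public class LocalCopyExample {
  /** Holder whose field would be declared @Nullable under the nullness checker. */
  static class Holder {
    String name; // may be null
  }

  /** Fluent builder standing in for someObject.doSth().doChained(...). */
  static class Builder {
    private final StringBuilder sb = new StringBuilder();

    Builder doSth() {
      return this;
    }

    Builder doChained(String value) {
      sb.append(value);
      return this;
    }

    String result() {
      return sb.toString();
    }
  }

  static String render(Holder holder, Builder builder) {
    // Read the field once into a local. The `local != null` refinement is kept across the
    // doSth() call, whereas a refinement of `holder.name` may be dropped after that call.
    String local = holder.name;
    if (local != null) {
      builder.doSth().doChained(local);
    }
    return builder.result();
  }
}
```

   With `holder.name` unset, `render` leaves the builder empty; after setting it to `"spanner"`, it returns `"spanner"`.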









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-11-13 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r523017744



##
File path: sdks/python/apache_beam/io/gcp/spanner.py
##
@@ -0,0 +1,635 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PTransforms for supporting Spanner in Python pipelines.
+
+  These transforms are currently supported by Beam portable
+  Flink and Spark runners.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During pipeline construction, the Python
+  SDK will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to set up cross-language Spanner transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Spanner
+  transforms. This option is only available for Beam 2.26.0 and later.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Install a Java runtime on the computer from which the pipeline is
+    constructed and make sure that the 'java' command is available.
+
+  In this option, the Python SDK will either download (for a released Beam
+  version) or build (when running from a Beam Git clone) an expansion service
+  jar and use that to expand transforms. Currently Spanner transforms use the
+  'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this
+  purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you start up your own expansion service and provide that as
+  a parameter when using the transforms provided in this module.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Start up your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+    initiating Spanner transforms provided in this module.
+
+  Flink users can use the built-in Expansion Service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/

Review comment:
   I agree. It applies to all the existing xlang transforms, though, so maybe it should be done in another PR?









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-11-12 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r522054029



##
File path: sdks/python/apache_beam/io/gcp/spanner.py
##
@@ -0,0 +1,662 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PTransforms for supporting Spanner in Python pipelines.
+
+  These transforms are currently supported by Beam portable
+  Flink and Spark runners.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During pipeline construction, the Python
+  SDK will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to set up cross-language Spanner transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Spanner
+  transforms. This option is only available for Beam 2.26.0 and later.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Install a Java runtime on the computer from which the pipeline is
+    constructed and make sure that the 'java' command is available.
+
+  In this option, the Python SDK will either download (for a released Beam
+  version) or build (when running from a Beam Git clone) an expansion service
+  jar and use that to expand transforms. Currently Spanner transforms use the
+  'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this
+  purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you start up your own expansion service and provide that as
+  a parameter when using the transforms provided in this module.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Start up your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+    initiating Spanner transforms provided in this module.
+
+  Flink users can use the built-in Expansion Service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import uuid
+from enum import Enum
+from enum import auto
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from past.builtins import unicode
+
+from apache_beam import Map
+from apache_beam import PTransform
+from apache_beam import coders
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+from apache_beam.typehints.schemas import named_tuple_from_schema
+from apache_beam.typehints.schemas import named_tuple_to_schema
+from apache_beam.typehints.schemas import schema_from_element_type
+
+__all__ = [
+    'ReadFromSpanner',
+    'SpannerDelete',
+    'SpannerInsert',
+    'SpannerInsertOrUpdate',
+    'SpannerReplace',
+    'SpannerUpdate',
+    'TimestampBoundMode',
+    'TimeUnit',
+]
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService(
+  'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')
+
+
+_READ_URN = 'beam:external:java:spanner:read:v1'
+_WRITE_URN = 'beam:external:java:spanner:write:v1'
+
+
+class TimeUnit(Enum):
+  NANOSECONDS = auto()
+  MICROSECONDS = auto()
+  MILLISECONDS = auto()
+  SECONDS = auto()
+  HOURS = auto()
+  DAYS = auto()
+
+
+class TimestampBoundMode(Enum):
+  MAX_STALENESS = auto()
+  EXACT_STALENESS = auto()
+  READ_TIMESTAMP = auto()
+  MIN_READ_TIMESTA


[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-11-12 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r522053018



##
File path: sdks/python/apache_beam/io/gcp/spanner.py
##
@@ -0,0 +1,662 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PTransforms for supporting Spanner in Python pipelines.
+
+  These transforms are currently supported by Beam portable
+  Flink and Spark runners.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During pipeline construction, the Python
+  SDK will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to set up cross-language Spanner transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Spanner
+  transforms. This option is only available for Beam 2.26.0 and later.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Install a Java runtime on the computer from which the pipeline is
+    constructed and make sure that the 'java' command is available.
+
+  In this option, the Python SDK will either download (for a released Beam
+  version) or build (when running from a Beam Git clone) an expansion service
+  jar and use that to expand transforms. Currently Spanner transforms use the
+  'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this
+  purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you start up your own expansion service and provide that as
+  a parameter when using the transforms provided in this module.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Start up your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+    initiating Spanner transforms provided in this module.
+
+  Flink users can use the built-in Expansion Service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import uuid
+from enum import Enum
+from enum import auto
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from past.builtins import unicode
+
+from apache_beam import Map
+from apache_beam import PTransform
+from apache_beam import coders
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+from apache_beam.typehints.schemas import named_tuple_from_schema
+from apache_beam.typehints.schemas import named_tuple_to_schema
+from apache_beam.typehints.schemas import schema_from_element_type
+
+__all__ = [
+    'ReadFromSpanner',
+    'SpannerDelete',
+    'SpannerInsert',
+    'SpannerInsertOrUpdate',
+    'SpannerReplace',
+    'SpannerUpdate',
+    'TimestampBoundMode',
+    'TimeUnit',
+]
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService(
+  'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')
+
+
+_READ_URN = 'beam:external:java:spanner:read:v1'
+_WRITE_URN = 'beam:external:java:spanner:write:v1'
+
+
+class TimeUnit(Enum):
+  NANOSECONDS = auto()
+  MICROSECONDS = auto()
+  MILLISECONDS = auto()
+  SECONDS = auto()
+  HOURS = auto()
+  DAYS = auto()
+
+
+class TimestampBoundMode(Enum):
+  MAX_STALENESS = auto()
+  EXACT_STALENESS = auto()
+  READ_TIMESTAMP = auto()
+  MIN_READ_TIMESTA

[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-11-12 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r522052915



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.TimestampBound;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * Exposes {@link SpannerIO.WriteRows} and {@link SpannerIO.ReadRows} as an external transform for
+ * cross-language usage.
+ */
+@Experimental(Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
+  public static final String WRITE_URN = "beam:external:java:spanner:write:v1";
+  public static final String READ_URN = "beam:external:java:spanner:read:v1";
+
+  @Override
+  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
+    return ImmutableMap.of(WRITE_URN, new WriteBuilder(), READ_URN, new ReadBuilder());

Review comment:
   That makes sense. Done.









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-15 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r505693635



##
File path: sdks/python/apache_beam/io/gcp/spanner.py
##
@@ -0,0 +1,483 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PTransforms for supporting Spanner in Python pipelines.
+
+  These transforms are currently supported by Beam portable
+  Flink and Spark runners.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During pipeline construction, the Python
+  SDK will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to set up cross-language Spanner transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Spanner
+  transforms. This option is only available for Beam 2.25.0 and later.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Install a Java runtime on the computer from which the pipeline is
+    constructed and make sure that the 'java' command is available.
+
+  In this option, the Python SDK will either download (for a released Beam
+  version) or build (when running from a Beam Git clone) an expansion service
+  jar and use that to expand transforms. Currently Spanner transforms use the
+  'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this
+  purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you start up your own expansion service and provide that as
+  a parameter when using the transforms provided in this module.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Start up your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+    initiating Spanner transforms provided in this module.
+
+  Flink users can use the built-in Expansion Service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+import uuid
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from past.builtins import unicode
+
+from apache_beam import Map
+from apache_beam import PTransform
+from apache_beam import coders
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = [
+    'WriteToSpanner',
+    'ReadFromSpanner',
+    'TimestampBoundMode',
+    'TimeUnit',
+]
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')
+
+
+WriteToSpannerSchema = typing.NamedTuple(
+    'WriteToSpannerSchema',
+    [
+        ('instance_id', unicode),
+        ('database_id', unicode),
+        ('project_id', Optional[unicode]),
+        ('max_batch_size_bytes', Optional[int]),
+        ('max_number_mutations', Optional[int]),
+        ('max_number_rows', Optional[int]),
+        ('grouping_factor', Optional[int]),
+        ('host', Optional[unicode]),
+        ('emulator_host', Optional[unicode]),
+        ('commit_deadline', Optional[int]),
+        ('max_cumulative_backoff', Optional[int]),
+    ],
+)
+
+
+class WriteToSpanner:
+  """
+  A PTransform which writes mutations to the s

[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-15 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r505693055



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
##
@@ -34,4 +49,237 @@ public static boolean isPointDelete(Mutation m) {
 && Iterables.isEmpty(m.getKeySet().getRanges())
 && Iterables.size(m.getKeySet().getKeys()) == 1;
   }
+
+  /**
+   * Utility function to convert row to mutation.
+   *
+   * @return function that can convert row to mutation
+   */
+  public static SerializableFunction beamRowToMutationFn() {

Review comment:
   Done. I hope it's not too brief.
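   The Javadoc above says only that `beamRowToMutationFn` returns a function that converts a row to a mutation. The following is a minimal Python sketch of that factory pattern. It assumes a row is a plain column-to-value dict. `Mutation`, `row_to_mutation_fn`, and the `INSERT_OR_UPDATE` default are hypothetical stand-ins; they are neither the Spanner client classes nor the PR's Java API.

```python
from typing import Any, Callable, Dict, NamedTuple, Tuple


class Mutation(NamedTuple):
    """Hypothetical stand-in for a Spanner mutation, not the real client class."""
    operation: str
    table: str
    columns: Tuple[str, ...]
    values: Tuple[Any, ...]


def row_to_mutation_fn(
        table: str, operation: str = "INSERT_OR_UPDATE"
) -> Callable[[Dict[str, Any]], Mutation]:
    """Returns a function that turns one row (column -> value) into a Mutation."""

    def convert(row: Dict[str, Any]) -> Mutation:
        # Keep the row's own column order; dicts preserve insertion order.
        columns = tuple(row.keys())
        values = tuple(row[c] for c in columns)
        return Mutation(operation, table, columns, values)

    return convert


to_mutation = row_to_mutation_fn("Users")
print(to_mutation({"id": 1, "name": "alice"}))
# Mutation(operation='INSERT_OR_UPDATE', table='Users', columns=('id', 'name'), values=(1, 'alice'))
```

   Returning a function instead of converting rows directly means the converter is built once and then applied per element, for example inside a Map step.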





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-15 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r505658098



##
File path: sdks/java/io/google-cloud-platform/expansion-service/build.gradle
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'org.apache.beam.module'
+apply plugin: 'application'
+mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
+
+applyJavaNature(
+  enableChecker: true,
+  automaticModuleName: 'org.apache.beam.sdk.io.gcp.expansion.service',
+  exportJavadoc: false,
+  validateShadowJar: false,
+  shadowClosure: {},
+)
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Google Cloud Platform :: Expansion Service"
+ext.summary = "Expansion service serving Spanner Java IO"

Review comment:
   Done.
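   The module configured above builds a runnable jar whose main class is `ExpansionService`. That is the kind of service Option 2 in the spanner.py docstring asks users to start and then pass through the `expansion_service` parameter, for example the Flink Job Server's service on port 8097. The following hypothetical Python helper validates such a `host:port` address before it is handed to a transform. The function name and error messages are illustrative only and are not part of Beam.

```python
from typing import Tuple


def parse_expansion_address(address: str) -> Tuple[str, int]:
    """Splits a 'host:port' expansion service address (hypothetical helper)."""
    host, sep, port = address.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected 'host:port', got {address!r}")
    port_number = int(port)
    if not 0 < port_number < 65536:
        raise ValueError(f"port out of range: {port_number}")
    return host, port_number


print(parse_expansion_address("localhost:8097"))  # ('localhost', 8097)
```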









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-15 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r505657264



##
File path: sdks/python/apache_beam/io/gcp/spanner.py
##
@@ -0,0 +1,483 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PTransforms for supporting Spanner in Python pipelines.
+
+  These transforms are currently supported by the Beam portable
+  Flink and Spark runners.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During pipeline construction, the Python
+  SDK connects to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to set up cross-language Spanner transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Spanner
+  transforms. This option is only available for Beam 2.25.0 and later.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Install a Java runtime on the computer where the pipeline is constructed
+    and make sure that the 'java' command is available.
+
+  In this option, the Python SDK will either download (for a released Beam
+  version) or build (when running from a Beam Git clone) an expansion service
+  jar and use it to expand transforms. Currently Spanner transforms use the
+  'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this
+  purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you start up your own expansion service and provide it as
+  a parameter when using the transforms provided in this module.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Start up your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+    instantiating the Spanner transforms provided in this module.
+
+  Flink users can use the built-in Expansion Service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+import uuid
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from past.builtins import unicode
+
+from apache_beam import Map
+from apache_beam import PTransform
+from apache_beam import coders
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = [
+    'WriteToSpanner',
+    'ReadFromSpanner',
+    'TimestampBoundMode',
+    'TimeUnit',
+]
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')
+
+
+WriteToSpannerSchema = typing.NamedTuple(
+    'WriteToSpannerSchema',
+    [
+        ('instance_id', unicode),
+        ('database_id', unicode),
+        ('project_id', Optional[unicode]),
+        ('max_batch_size_bytes', Optional[int]),
+        ('max_number_mutations', Optional[int]),
+        ('max_number_rows', Optional[int]),
+        ('grouping_factor', Optional[int]),
+        ('host', Optional[unicode]),
+        ('emulator_host', Optional[unicode]),
+        ('commit_deadline', Optional[int]),
+        ('max_cumulative_backoff', Optional[int]),
+    ],
+)

Review comment:
   Great!
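   `WriteToSpannerSchema` declares `instance_id` and `database_id` as required and every tuning knob as `Optional`. The same shape can be sketched with the class-based `typing.NamedTuple` syntax, where `None` defaults let callers omit the optional fields.

   Some caveats about this sketch:
   - `WriteConfig` and `non_default_params` are hypothetical names.
   - It uses Python 3 `str` instead of `past.builtins.unicode`.
   - It does not reproduce how the PR actually serializes the tuple for the Java side.

```python
from typing import Any, Dict, NamedTuple, Optional


class WriteConfig(NamedTuple):
    """Hypothetical subset of WriteToSpannerSchema's fields."""
    instance_id: str
    database_id: str
    project_id: Optional[str] = None
    max_batch_size_bytes: Optional[int] = None
    max_number_mutations: Optional[int] = None
    commit_deadline: Optional[int] = None


def non_default_params(config: WriteConfig) -> Dict[str, Any]:
    """Returns only the fields that were explicitly set (i.e. are not None)."""
    return {k: v for k, v in config._asdict().items() if v is not None}


config = WriteConfig("my-instance", "my-db", max_number_mutations=500)
print(non_default_params(config))
# {'instance_id': 'my-instance', 'database_id': 'my-db', 'max_number_mutations': 500}
```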




--

[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-15 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r505657026



##
File path: sdks/python/apache_beam/io/gcp/spanner.py
##

[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-15 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r505611080



##
File path: sdks/python/apache_beam/io/gcp/spanner.py
##

[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-09 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r501676146



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##
@@ -678,6 +703,42 @@ public Read withPartitionOptions(PartitionOptions partitionOptions) {
           .withTransaction(getTransaction());
       return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll);
     }
+
+    SerializableFunction getFormatFn() {
+      return (SerializableFunction)
+          input ->
+              Row.withSchema(Schema.builder().addInt64Field("Key").build())
+                  .withFieldValue("Key", 3L)
+                  .build();
+    }
+  }
+
+  public static class ReadRows extends PTransform> {
+    Read read;
+    Schema schema;
+
+    public ReadRows(Read read, Schema schema) {
+      super("Read rows");
+      this.read = read;
+      this.schema = schema;
Review comment:
   Thank you @nielm! I also thought about the LIMIT approach, but then I found the same arguments against it.
   
   It appears there is a JDBC driver for Spanner: https://cloud.google.com/spanner/docs/jdbc-drivers. I'll try to figure out whether I can use it.
   
   Spanner's REST API also has ResultSetMetadata, which is a JSON object: https://cloud.google.com/spanner/docs/reference/rest/v1/ResultSetMetadata. But at the end of the day it still requires fetching at least part of the data.
   
   I would leave this for another PR, though, as it would presumably require moving SchemaUtils from io/jdbc to some more general place (extensions/sql?). Also, as far as I can see, the STRUCT type is represented as a String, as mentioned here:
   ```
   The Cloud Spanner STRUCT data type is mapped to a SQL VARCHAR data type, accessible through this driver as String types. All other types have appropriate mappings.
   ```
   So it may not be the best option.
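   As the comment notes, ResultSetMetadata only arrives after a query runs, so the following sketch covers just the parsing step. It assumes the REST JSON shape has already been loaded into a dict: `rowType.fields[].name` and `type.code`, with `arrayElementType` and `structType` for nested types.

   The Python type mapping is an illustrative choice. STRUCT is kept as a nested field list rather than flattened to a string, as the JDBC driver does.

```python
import datetime
import decimal
from typing import Any, Dict, List, Tuple

# Illustrative mapping from Spanner TypeCode names to Python types.
SCALAR_TYPES = {
    "BOOL": bool,
    "INT64": int,
    "FLOAT64": float,
    "NUMERIC": decimal.Decimal,
    "STRING": str,
    "BYTES": bytes,
    "DATE": datetime.date,
    "TIMESTAMP": datetime.datetime,
}


def to_field_type(spanner_type: Dict[str, Any]) -> Any:
    """Maps one Spanner Type object (as a dict) to a Python type description."""
    code = spanner_type["code"]
    if code == "ARRAY":
        return ("ARRAY", to_field_type(spanner_type["arrayElementType"]))
    if code == "STRUCT":
        # Keep nested structs as nested field lists instead of flattening to str.
        return ("STRUCT", struct_fields(spanner_type["structType"]))
    return SCALAR_TYPES[code]


def struct_fields(struct_type: Dict[str, Any]) -> List[Tuple[str, Any]]:
    """Converts a StructType dict into a list of (field name, field type)."""
    return [(f["name"], to_field_type(f["type"])) for f in struct_type["fields"]]


def infer_schema(result_set_metadata: Dict[str, Any]) -> List[Tuple[str, Any]]:
    """Reads the row schema from ResultSetMetadata.rowType."""
    return struct_fields(result_set_metadata["rowType"])


metadata = {
    "rowType": {
        "fields": [
            {"name": "id", "type": {"code": "INT64"}},
            {"name": "tags",
             "type": {"code": "ARRAY", "arrayElementType": {"code": "STRING"}}},
            {"name": "info",
             "type": {"code": "STRUCT",
                      "structType": {"fields": [
                          {"name": "score", "type": {"code": "FLOAT64"}}]}}},
        ]
    }
}
print(infer_schema(metadata))
```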


[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-08 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r501677086



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+final class StructUtils {
+  public static Row translateStructToRow(Struct struct, Schema schema) {
+    checkForSchemasEquality(schema.getFields(), struct.getType().getStructFields(), false);
+
+    List fields = schema.getFields();
+    Row.FieldValueBuilder valueBuilder = null;
+    // TODO: Remove this null-checking once nullable fields are supported in cross-language
+    int count = 0;
+    while (valueBuilder == null && count < fields.size()) {
+      valueBuilder = getFirstStructValue(struct, fields.get(count), schema);
+      ++count;
+    }
+    for (int i = count; i < fields.size(); ++i) {
+      valueBuilder = getStructValue(valueBuilder, struct, fields.get(i));
+    }
+    return valueBuilder != null ? valueBuilder.build() : Row.withSchema(schema).build();
+  }
+
+  public static Struct translateRowToStruct(Row row) {
+    Struct.Builder structBuilder = Struct.newBuilder();
+    List fields = row.getSchema().getFields();
+    fields.forEach(
+        field -> {
+          String column = field.getName();
+          switch (field.getType().getTypeName()) {
+            case ROW:
+              structBuilder
+                  .set(column)
+                  .to(
+                      beamTypeToSpannerType(field.getType()),
+                      translateRowToStruct(row.getRow(column)));
+              break;
+            case ARRAY:
+              addArrayToStruct(structBuilder, row, field);
+              break;
+            case ITERABLE:
+              addIterableToStruct(structBuilder, row, field);
+              break;
+            case FLOAT:
+              structBuilder.set(column).to(row.getFloat(column).doubleValue());
+              break;
+            case DOUBLE:
+              structBuilder.set(column).to(row.getDouble(column));
+              break;
+            case DECIMAL:
+              structBuilder.set(column).to(row.getDecimal(column).doubleValue());

Review comment:
   Great, quite a new thing in Spanner as I can see! Thanks! Done.
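   This sketch assumes the suggestion being acknowledged was to write DECIMAL fields as Spanner's NUMERIC type instead of calling `doubleValue()`. The point is that a round trip through a binary double is lossy. The following hypothetical Python sketch covers that part of the per-type dispatch; `to_spanner_value` is not a real Beam or Spanner function.

```python
import decimal
from typing import Any


def to_spanner_value(beam_type: str, value: Any) -> Any:
    """Hypothetical per-type conversion of one Beam field value for Spanner."""
    if beam_type == "FLOAT":
        # As in translateRowToStruct, FLOAT is written as a double.
        return float(value)
    if beam_type == "DECIMAL":
        # Pass the exact Decimal through (NUMERIC) instead of calling float().
        if not isinstance(value, decimal.Decimal):
            raise TypeError("DECIMAL fields must hold decimal.Decimal values")
        return value
    return value


price = decimal.Decimal("0.1")
print(to_spanner_value("DECIMAL", price))  # 0.1
print(decimal.Decimal(float(price)))       # exact value of the nearest double, not 0.1
```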









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-08 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r501677086



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+final class StructUtils {
+  public static Row translateStructToRow(Struct struct, Schema schema) {
+    checkForSchemasEquality(schema.getFields(), struct.getType().getStructFields(), false);
+
+    List fields = schema.getFields();
+    Row.FieldValueBuilder valueBuilder = null;
+    // TODO: Remove this null-checking once nullable fields are supported in cross-language
+    int count = 0;
+    while (valueBuilder == null && count < fields.size()) {
+      valueBuilder = getFirstStructValue(struct, fields.get(count), schema);
+      ++count;
+    }
+    for (int i = count; i < fields.size(); ++i) {
+      valueBuilder = getStructValue(valueBuilder, struct, fields.get(i));
+    }
+    return valueBuilder != null ? valueBuilder.build() : Row.withSchema(schema).build();
+  }
+
+  public static Struct translateRowToStruct(Row row) {
+    Struct.Builder structBuilder = Struct.newBuilder();
+    List fields = row.getSchema().getFields();
+    fields.forEach(
+        field -> {
+          String column = field.getName();
+          switch (field.getType().getTypeName()) {
+            case ROW:
+              structBuilder
+                  .set(column)
+                  .to(
+                      beamTypeToSpannerType(field.getType()),
+                      translateRowToStruct(row.getRow(column)));
+              break;
+            case ARRAY:
+              addArrayToStruct(structBuilder, row, field);
+              break;
+            case ITERABLE:
+              addIterableToStruct(structBuilder, row, field);
+              break;
+            case FLOAT:
+              structBuilder.set(column).to(row.getFloat(column).doubleValue());
+              break;
+            case DOUBLE:
+              structBuilder.set(column).to(row.getDouble(column));
+              break;
+            case DECIMAL:
+              structBuilder.set(column).to(row.getDecimal(column).doubleValue());

Review comment:
   Great, quite a new thing in Spanner as I can see! Thanks!
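Both the FLOAT and DECIMAL branches quoted above call `doubleValue()`, but the two conversions are not equally safe. The stdlib-only sketch below checks both directions. The class and method names are hypothetical and are not part of StructUtils. Widening a `float` to `double` is exact. Narrowing a `BigDecimal` to `double` can drop significant digits, which is a reason to prefer a native decimal column type (Spanner's NUMERIC) over FLOAT64 for DECIMAL fields.

```java
import java.math.BigDecimal;

class DoubleConversionCheck {
  // Widening float -> double is exact, so casting back recovers the original float.
  // (NaN is the one exception, since NaN != NaN.)
  static boolean floatRoundTripsThroughDouble(float f) {
    double widened = f; // same value as Float.valueOf(f).doubleValue()
    return (float) widened == f;
  }

  // A double carries roughly 15 to 17 significant decimal digits, so longer
  // BigDecimal values cannot survive a round trip through doubleValue().
  static boolean decimalSurvivesDouble(BigDecimal value) {
    BigDecimal back = BigDecimal.valueOf(value.doubleValue());
    return back.compareTo(value) == 0;
  }

  public static void main(String[] args) {
    System.out.println(floatRoundTripsThroughDouble(0.1f));
    System.out.println(decimalSurvivesDouble(new BigDecimal("0.1")));
    System.out.println(decimalSurvivesDouble(new BigDecimal("12345678901234567890.123456789")));
  }
}
```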





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-08 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r501677086



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+final class StructUtils {
+  public static Row translateStructToRow(Struct struct, Schema schema) {
+    checkForSchemasEquality(schema.getFields(), struct.getType().getStructFields(), false);
+
+    List fields = schema.getFields();
+    Row.FieldValueBuilder valueBuilder = null;
+    // TODO: Remove this null-checking once nullable fields are supported in cross-language
+    int count = 0;
+    while (valueBuilder == null && count < fields.size()) {
+      valueBuilder = getFirstStructValue(struct, fields.get(count), schema);
+      ++count;
+    }
+    for (int i = count; i < fields.size(); ++i) {
+      valueBuilder = getStructValue(valueBuilder, struct, fields.get(i));
+    }
+    return valueBuilder != null ? valueBuilder.build() : Row.withSchema(schema).build();
+  }
+
+  public static Struct translateRowToStruct(Row row) {
+    Struct.Builder structBuilder = Struct.newBuilder();
+    List fields = row.getSchema().getFields();
+    fields.forEach(
+        field -> {
+          String column = field.getName();
+          switch (field.getType().getTypeName()) {
+            case ROW:
+              structBuilder
+                  .set(column)
+                  .to(
+                      beamTypeToSpannerType(field.getType()),
+                      translateRowToStruct(row.getRow(column)));
+              break;
+            case ARRAY:
+              addArrayToStruct(structBuilder, row, field);
+              break;
+            case ITERABLE:
+              addIterableToStruct(structBuilder, row, field);
+              break;
+            case FLOAT:
+              structBuilder.set(column).to(row.getFloat(column).doubleValue());
+              break;
+            case DOUBLE:
+              structBuilder.set(column).to(row.getDouble(column));
+              break;
+            case DECIMAL:
+              structBuilder.set(column).to(row.getDecimal(column).doubleValue());

Review comment:
   Great, I think it wasn't available when I wrote that code. Thanks!









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-08 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r501676146



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##
@@ -678,6 +703,42 @@ public Read withPartitionOptions(PartitionOptions partitionOptions) {
               .withTransaction(getTransaction());
       return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll);
     }
+
+    SerializableFunction getFormatFn() {
+      return (SerializableFunction)
+          input ->
+              Row.withSchema(Schema.builder().addInt64Field("Key").build())
+                  .withFieldValue("Key", 3L)
+                  .build();
+    }
+  }
+
+  public static class ReadRows extends PTransform> {
+    Read read;
+    Schema schema;
+
+    public ReadRows(Read read, Schema schema) {
+      super("Read rows");
+      this.read = read;
+      this.schema = schema;
Review comment:
   Thank you @nielm! I thought about the LIMIT approach as well, but then reached the same arguments against it.
   
   It appears there is a JDBC driver for Spanner (https://cloud.google.com/spanner/docs/jdbc-drivers). I'll try to figure out whether I can use it.
   
   There is also ResultSetMetadata in Spanner's REST API (https://cloud.google.com/spanner/docs/reference/rest/v1/ResultSetMetadata), returned as a JSON object, but at the end of the day it still requires fetching at least part of the data.
   
   But I would leave this for another PR, as it would probably require moving SchemaUtils from io/jdbc to a more general place (extensions/sql?). Also, as far as I can see, the STRUCT type is mapped to String/VARCHAR, as mentioned in the FAQ, so it may not be the best option:
   ```
   The Cloud Spanner STRUCT data type is mapped to a SQL VARCHAR data type, accessible through this driver as String types. All other types have appropriate mappings.
   ```
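   To illustrate the concern, below is a hypothetical, simplified mapper from JDBC type codes to Beam schema type names. These are the codes that a `ResultSetMetaData`-based schema inference would see. This is not Beam's JdbcIO code. It only shows what happens if the driver reports STRUCT columns as `Types.VARCHAR`, as the FAQ says: the inferred field can only be a flat STRING, and the nested field structure is lost.

```java
import java.sql.Types;

class JdbcTypeMappingSketch {
  // Hypothetical, simplified JDBC-to-Beam type-name mapping (not Beam's actual code).
  static String beamTypeName(int jdbcType) {
    switch (jdbcType) {
      case Types.BIGINT:
        return "INT64";
      case Types.DOUBLE:
        return "DOUBLE";
      case Types.BOOLEAN:
        return "BOOLEAN";
      case Types.NUMERIC:
        return "DECIMAL";
      case Types.VARCHAR: // a Spanner STRUCT column is reported here too, per the driver FAQ
        return "STRING";
      case Types.BINARY:
        return "BYTES";
      default:
        throw new IllegalArgumentException("Unsupported JDBC type code: " + jdbcType);
    }
  }

  public static void main(String[] args) {
    // A STRUCT column reported as VARCHAR collapses into a single STRING field.
    System.out.println(beamTypeName(Types.VARCHAR));
  }
}
```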









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-10-08 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r501676146



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##
@@ -678,6 +703,42 @@ public Read withPartitionOptions(PartitionOptions partitionOptions) {
               .withTransaction(getTransaction());
       return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll);
     }
+
+    SerializableFunction getFormatFn() {
+      return (SerializableFunction)
+          input ->
+              Row.withSchema(Schema.builder().addInt64Field("Key").build())
+                  .withFieldValue("Key", 3L)
+                  .build();
+    }
+  }
+
+  public static class ReadRows extends PTransform> {
+    Read read;
+    Schema schema;
+
+    public ReadRows(Read read, Schema schema) {
+      super("Read rows");
+      this.read = read;
+      this.schema = schema;
Review comment:
   Thank you @nielm! I thought about the LIMIT approach as well, but then reached the same arguments against it.
   
   It appears there is a JDBC driver for Spanner (https://cloud.google.com/spanner/docs/jdbc-drivers). I'll try to figure out whether I can use it.
   
   There is also ResultSetMetadata in Spanner's REST API (https://cloud.google.com/spanner/docs/reference/rest/v1/ResultSetMetadata), returned as a JSON object, but at the end of the day it still requires fetching at least part of the data.
   
   But I would leave this for another PR, as it would probably require moving SchemaUtils from io/jdbc to a more general place (extensions/sql?). Also, as far as I can see, the STRUCT type is represented as a String, as mentioned here:
   ```
   The Cloud Spanner STRUCT data type is mapped to a SQL VARCHAR data type, accessible through this driver as String types. All other types have appropriate mappings.
   ```
   So it may not be the best option.









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-14 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r488422134



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+final class StructUtils {
+  public static Row translateStructToRow(Struct struct, Schema schema) {
+    checkForSchemasEquality(schema.getFields(), struct.getType().getStructFields(), false);
+
+    List fields = schema.getFields();
+    Row.FieldValueBuilder valueBuilder = null;
+    // TODO: Remove this null-checking once nullable fields are supported in cross-language

Review comment:
   I'm not sure where my message went, but I wrote that nulls come through without problems; I had just used ImmutableMap, which does not allow null values.
Replacing it with java.util.HashMap solved the issue.
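   The behavior difference is easy to reproduce with the JDK alone. In the sketch below, `Map.of` stands in for Guava's `ImmutableMap.of`, since both reject null values with a `NullPointerException`. The field names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class NullableValuesDemo {
  // Map.of, like Guava's ImmutableMap.of, throws NullPointerException on a null value.
  static boolean immutableFactoryAcceptsNull() {
    try {
      Map<String, Object> values = Map.of("name", "x", "nickname", null);
      return values.containsKey("nickname");
    } catch (NullPointerException e) {
      return false;
    }
  }

  // java.util.HashMap stores null values, so a nullable field can carry null.
  static boolean hashMapAcceptsNull() {
    Map<String, Object> values = new HashMap<>();
    values.put("name", "x");
    values.put("nickname", null);
    return values.containsKey("nickname") && values.get("nickname") == null;
  }

  public static void main(String[] args) {
    System.out.println("Map.of accepts null: " + immutableFactoryAcceptsNull());
    System.out.println("HashMap accepts null: " + hashMapAcceptsNull());
  }
}
```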









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-04 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r483525931



##
File path: sdks/python/apache_beam/io/gcp/spanner.py
##
@@ -0,0 +1,503 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PTransforms for supporting Spanner in Python pipelines.
+
+  These transforms are currently supported by Beam portable
+  Flink and Spark runners.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During pipeline construction, the Python
+  SDK will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to set up cross-language Spanner transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Spanner
+  transforms. This option is only available for Beam 2.25.0 and later.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Install a Java runtime on the computer from where the pipeline is
+    constructed and make sure that the 'java' command is available.
+
+  In this option, the Python SDK will either download (for released Beam
+  versions) or build (when running from a Beam Git clone) an expansion service
+  jar and use that to expand transforms. Currently Spanner transforms use the
+  'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this
+  purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you start up your own expansion service and provide that as
+  a parameter when using the transforms provided in this module.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Start up your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+    initiating Spanner transforms provided in this module.
+
+  Flink users can use the built-in Expansion Service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+import uuid
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from past.builtins import unicode
+
+from apache_beam import coders
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = [
+    'WriteToSpanner',
+    'ReadFromSpanner',
+    'MutationCreator',
+    'TimestampBoundMode',
+    'TimeUnit',
+]
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')
+
+
+WriteToSpannerSchema = typing.NamedTuple(
+    'WriteToSpannerSchema',
+    [
+        ('instance_id', unicode),
+        ('database_id', unicode),
+        ('project_id', Optional[unicode]),
+        ('max_batch_size_bytes', Optional[int]),
+        ('max_number_mutations', Optional[int]),
+        ('max_number_rows', Optional[int]),
+        ('grouping_factor', Optional[int]),
+        ('host', Optional[unicode]),
+        ('emulator_host', Optional[unicode]),
+        ('commit_deadline', Optional[int]),
+        ('max_cumulative_backoff', Optional[int]),
+    ],
+)
+
+
+class WriteToSpanner(ExternalTransform):
+  """
+  A PTransform which writes mutations to the specified instance's da
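On the Java side, each field of `WriteToSpannerSchema` is expected to land in a setter of the transform's `Configuration` class. The `setInstanceId` and `setDatabaseId` setters in `SpannerTransformRegistrar`, quoted in the next message, are examples. The sketch below assumes the expansion service derives each setter name by converting the snake_case field name to UpperCamelCase and prefixing `set`. That is an assumption here; this thread does not state it. The helper name is hypothetical.

```java
class SetterNameSketch {
  // Hypothetical helper: "max_batch_size_bytes" -> "setMaxBatchSizeBytes".
  static String setterName(String snakeCaseField) {
    StringBuilder name = new StringBuilder("set");
    for (String part : snakeCaseField.split("_")) {
      if (part.isEmpty()) {
        continue; // tolerate leading, trailing, or doubled underscores
      }
      name.append(Character.toUpperCase(part.charAt(0))).append(part.substring(1));
    }
    return name.toString();
  }

  public static void main(String[] args) {
    for (String field : new String[] {"instance_id", "database_id", "emulator_host"}) {
      System.out.println(field + " -> " + setterName(field));
    }
  }
}
```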

[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-04 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r483574800



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.TimestampBound;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * Exposes {@link SpannerIO.WriteRows} and {@link SpannerIO.ReadRows} as an external transform for
+ * cross-language usage.
+ */
+@Experimental(Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
+  public static final String WRITE_URN = "beam:external:java:spanner:write:v1";
+  public static final String READ_URN = "beam:external:java:spanner:read:v1";
+
+  @Override
+  public Map> knownBuilderInstances() {
+    return ImmutableMap.of(WRITE_URN, new WriteBuilder(), READ_URN, new ReadBuilder());
+  }
+
+  public abstract static class CrossLanguageConfiguration {
+    String instanceId;
+    String databaseId;
+    String projectId;
+    @Nullable String host;
+    @Nullable String emulatorHost;
+
+    public void setInstanceId(String instanceId) {
+      this.instanceId = instanceId;
+    }
+
+    public void setDatabaseId(String databaseId) {
+      this.databaseId = databaseId;
+    }
+
+    public void setProjectId(String projectId) {
+      this.projectId = projectId;
+    }
+
+    public void setHost(@Nullable String host) {
+      this.host = host;
+    }
+
+    public void setEmulatorHost(@Nullable String emulatorHost) {
+      this.emulatorHost = emulatorHost;
+    }
+  }
+
+  @Experimental(Kind.PORTABILITY)
+  public static class ReadBuilder
+      implements ExternalTransformBuilder> {
+
+    public static class Configuration extends CrossLanguageConfiguration {
+      // TODO: BEAM-10851 Come up with something to determine schema without this explicit parameter
+      private Schema schema;
+      private @Nullable String sql;
+      private @Nullable String table;
+      private @Nullable Boolean batching;
+      private @Nullable String timestampBoundMode;
+      private @Nullable String readTimestamp;
+      private @Nullable String timeUnit;
+      private @Nullable Long exactStaleness;
+
+      public void setSql(@Nullable String sql) {
+        this.sql = sql;
+      }
+
+      public void setTable(@Nullable String table) {
+        this.table = table;
+      }
+
+      public void setBatching(@Nullable Boolean batching) {
+        this.batching = batching;
+      }
+
+      public void setTimestampBoundMode(@Nullable String timestampBoundMode) {
+        this.timestampBoundMode = timestampBoundMode;
+      }
+
+      public void setSchema(byte[] schema) throws InvalidProtocolBufferException {
+        this.schema = SchemaTranslation.schemaFromProto(SchemaApi.Schema.parseFrom(schema));
+      }
+
+  public void setReadTimestamp(@Nullable String readTime
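The read `Configuration` above carries `timeUnit` as a String and `exactStaleness` as a Long, and the registrar imports `java.util.concurrent.TimeUnit`. The quoted diff is cut off before the code that consumes these fields. The sketch below therefore assumes the string holds a `TimeUnit` constant name such as "SECONDS". The helper names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

class StalenessSketch {
  // Hypothetical: parse the cross-language string into a TimeUnit constant.
  // TimeUnit.valueOf throws IllegalArgumentException for unknown names.
  static TimeUnit parseTimeUnit(String timeUnit) {
    return TimeUnit.valueOf(timeUnit);
  }

  // Hypothetical: express the configured exact staleness in milliseconds.
  static long stalenessMillis(String timeUnit, long exactStaleness) {
    return parseTimeUnit(timeUnit).toMillis(exactStaleness);
  }

  public static void main(String[] args) {
    System.out.println(stalenessMillis("SECONDS", 10L));
  }
}
```

In the actual transform, the parsed unit would more likely be passed directly to the Spanner client's `TimestampBound.ofExactStaleness(long, TimeUnit)` than converted to milliseconds first.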

[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-04 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r483525931



##
File path: sdks/python/apache_beam/io/gcp/spanner.py
##
@@ -0,0 +1,503 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PTransforms for supporting Spanner in Python pipelines.
+
+  These transforms are currently supported by Beam portable
+  Flink and Spark runners.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During the pipeline construction, Python 
SDK
+  will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to setup cross-language Spanner transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Spanner
+  transforms. This option is only available for Beam 2.25.0 and later.
+
+  This option requires following pre-requisites before running the Beam
+  pipeline.
+
+  * Install Java runtime in the computer from where the pipeline is constructed
+and make sure that 'java' command is available.
+
+  In this option, Python SDK will either download (for released Beam version) 
or
+  build (when running from a Beam Git clone) a expansion service jar and use
+  that to expand transforms. Currently Spanner transforms use the
+  'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this
+  purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you startup your own expansion service and provide that as
+  a parameter when using the transforms provided in this module.
+
+  This option requires following pre-requisites before running the Beam
+  pipeline.
+
+  * Startup your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+initiating Spanner transforms provided in this module.
+
+  Flink Users can use the built-in Expansion Service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+import uuid
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from past.builtins import unicode
+
+from apache_beam import coders
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = [
+'WriteToSpanner',
+'ReadFromSpanner',
+'MutationCreator',
+'TimestampBoundMode',
+'TimeUnit',
+]
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService(
+  'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')
+
+
+WriteToSpannerSchema = typing.NamedTuple(
+'WriteToSpannerSchema',
+[
+('instance_id', unicode),
+('database_id', unicode),
+('project_id', Optional[unicode]),
+('max_batch_size_bytes', Optional[int]),
+('max_number_mutations', Optional[int]),
+('max_number_rows', Optional[int]),
+('grouping_factor', Optional[int]),
+('host', Optional[unicode]),
+('emulator_host', Optional[unicode]),
+('commit_deadline', Optional[int]),
+('max_cumulative_backoff', Optional[int]),
+],
+)
+
+
+class WriteToSpanner(ExternalTransform):
+  """
+  A PTransform which writes mutations to the specified instance's da

[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-04 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r483525931



##
File path: sdks/python/apache_beam/io/gcp/spanner.py
##
@@ -0,0 +1,503 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PTransforms for supporting Spanner in Python pipelines.
+
+  These transforms are currently supported by Beam portable
+  Flink and Spark runners.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During the pipeline construction, Python 
SDK
+  will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to setup cross-language Spanner transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Spanner
+  transforms. This option is only available for Beam 2.25.0 and later.
+
+  This option requires following pre-requisites before running the Beam
+  pipeline.
+
+  * Install Java runtime in the computer from where the pipeline is constructed
+and make sure that 'java' command is available.
+
+  In this option, Python SDK will either download (for released Beam version) 
or
+  build (when running from a Beam Git clone) a expansion service jar and use
+  that to expand transforms. Currently Spanner transforms use the
+  'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this
+  purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you start up your own expansion service and provide it as
+  a parameter when using the transforms provided in this module.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Start up your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+    instantiating Spanner transforms provided in this module.
+
+  Flink users can use the built-in Expansion Service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+import uuid
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from past.builtins import unicode
+
+from apache_beam import coders
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = [
+    'WriteToSpanner',
+    'ReadFromSpanner',
+    'MutationCreator',
+    'TimestampBoundMode',
+    'TimeUnit',
+]
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService(
+  'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')
+
+
+WriteToSpannerSchema = typing.NamedTuple(
+    'WriteToSpannerSchema',
+    [
+        ('instance_id', unicode),
+        ('database_id', unicode),
+        ('project_id', Optional[unicode]),
+        ('max_batch_size_bytes', Optional[int]),
+        ('max_number_mutations', Optional[int]),
+        ('max_number_rows', Optional[int]),
+        ('grouping_factor', Optional[int]),
+        ('host', Optional[unicode]),
+        ('emulator_host', Optional[unicode]),
+        ('commit_deadline', Optional[int]),
+        ('max_cumulative_backoff', Optional[int]),
+    ],
+)
+
+
+class WriteToSpanner(ExternalTransform):
+  """
+  A PTransform which writes mutations to the specified instance's da

[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-02 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r482081597



##
File path: sdks/python/apache_beam/io/gcp/spanner.py
##
@@ -0,0 +1,504 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PTransforms for supporting Spanner in Python pipelines.
+
+  These transforms are currently supported by Beam portable
+  Flink and Spark runners.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During pipeline construction, the Python SDK
+  will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to set up cross-language Spanner transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Spanner
+  transforms. This option is only available for Beam 2.25.0 and later.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Install a Java runtime on the computer from which the pipeline is
+    constructed and make sure that the 'java' command is available.
+
+  In this option, the Python SDK will either download (for a released Beam version) or
+  build (when running from a Beam Git clone) an expansion service jar and use
+  that to expand transforms. Currently Spanner transforms use the
+  'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this
+  purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you start up your own expansion service and provide it as
+  a parameter when using the transforms provided in this module.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Start up your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+    instantiating Spanner transforms provided in this module.
+
+  Flink users can use the built-in Expansion Service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+import uuid
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from past.builtins import unicode
+
+from apache_beam import coders
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = [
+    'WriteToSpanner',
+    'ReadFromSpanner',
+    'MutationCreator',
+    'TimestampBoundMode',
+    'TimeUnit',
+]
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService(
+  'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')
+
+
+WriteToSpannerSchema = typing.NamedTuple(
+    'WriteToSpannerSchema',
+    [
+        ('instance_id', unicode),
+        ('database_id', unicode),
+        ('project_id', Optional[unicode]),
+        ('batch_size_bytes', Optional[int]),
+        ('max_num_mutations', Optional[int]),
+        ('max_num_rows', Optional[int]),
+        ('grouping_factor', Optional[int]),
+        ('host', Optional[unicode]),
+        ('emulator_host', Optional[unicode]),
+        ('commit_deadline', Optional[int]),
+        ('max_cumulative_backoff', Optional[int]),
+    ],
+)
+
+
+class WriteToSpanner(ExternalTransform):

Review comment:
   I can try to make the API compliant with the native one. I thi

[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-02 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r482080324



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+final class StructUtils {
+  public static Row translateStructToRow(Struct struct, Schema schema) {
+    checkForSchemasEquality(schema.getFields(), struct.getType().getStructFields(), false);

Review comment:
   Maybe we could just skip this check and let it crash when the types 
don't match?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-02 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r482079860



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+final class StructUtils {

Review comment:
   There are several unrelated classes that need the same operations
performed on them: one pair is Key and Mutation, another is Row.Builder and
Row.FieldValueBuilder. In Python, duck typing makes this easy, but in Java I
don't know how to reduce the repeated code. Maybe I should rely more on
setValue(Object obj)-style methods and casts instead of returning the proper
type every time. I'll try it.
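One way to remove that duplication in Java is to write the per-type dispatch once, against a small adapter interface, and give each unrelated builder family a thin wrapper. The sketch below assumes that approach. `FieldSink`, `MapSink` and `ListSink` are hypothetical stand-ins, not Beam or Spanner APIs. `MapSink` mimics name-keyed builders such as `Row.FieldValueBuilder`, and `ListSink` mimics positional builders such as `Row.Builder`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: FieldSink, MapSink and ListSink are illustrative stand-ins,
// not Beam or Spanner APIs.
final class AdapterSketch {

  /** Adapter with one setter per supported value type. */
  interface FieldSink {
    void setLong(String name, long value);

    void setString(String name, String value);
  }

  /** The per-type dispatch (and its casts) written once, for every sink. */
  static void copyFields(Map<String, Object> fields, FieldSink sink) {
    for (Map.Entry<String, Object> e : fields.entrySet()) {
      Object v = e.getValue();
      if (v instanceof Long) {
        sink.setLong(e.getKey(), (Long) v);
      } else if (v instanceof String) {
        sink.setString(e.getKey(), (String) v);
      } else {
        throw new IllegalArgumentException("Unsupported type for field " + e.getKey());
      }
    }
  }

  /** Stand-in for a name-keyed builder (compare Row.FieldValueBuilder). */
  static final class MapSink implements FieldSink {
    final Map<String, Object> out = new LinkedHashMap<>();

    @Override
    public void setLong(String name, long value) {
      out.put(name, value);
    }

    @Override
    public void setString(String name, String value) {
      out.put(name, value);
    }
  }

  /** Stand-in for a positional builder (compare Row.Builder#addValue). */
  static final class ListSink implements FieldSink {
    final List<Object> out = new ArrayList<>();

    @Override
    public void setLong(String name, long value) {
      out.add(value);
    }

    @Override
    public void setString(String name, String value) {
      out.add(value);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> fields = new LinkedHashMap<>();
    fields.put("id", 42L);
    fields.put("name", "alice");

    MapSink byName = new MapSink();
    ListSink byPosition = new ListSink();
    copyFields(fields, byName);
    copyFields(fields, byPosition);

    System.out.println(byName.out); // {id=42, name=alice}
    System.out.println(byPosition.out); // [42, alice]
  }
}
```

Compared with a single `setValue(Object)` plus casts at every call site, this keeps the casts in one method. An unsupported type also fails there, with a message naming the field.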









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-02 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r482075489



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+final class StructUtils {
+  public static Row translateStructToRow(Struct struct, Schema schema) {
+    checkForSchemasEquality(schema.getFields(), struct.getType().getStructFields(), false);
+
+    List<Schema.Field> fields = schema.getFields();
+    Row.FieldValueBuilder valueBuilder = null;
+    // TODO: Remove this null-checking once nullable fields are supported in cross-language
+    int count = 0;
+    while (valueBuilder == null && count < fields.size()) {
+      valueBuilder = getFirstStructValue(struct, fields.get(count), schema);
+      ++count;
+    }
+    for (int i = count; i < fields.size(); ++i) {
+      valueBuilder = getStructValue(valueBuilder, struct, fields.get(i));
+    }
+    return valueBuilder != null ? valueBuilder.build() : Row.withSchema(schema).build();
+  }
+
+  public static Struct translateRowToStruct(Row row) {
+    Struct.Builder structBuilder = Struct.newBuilder();
+    List<Schema.Field> fields = row.getSchema().getFields();
+    fields.forEach(
+        field -> {
+          String column = field.getName();
+          switch (field.getType().getTypeName()) {
+            case ROW:
+              structBuilder
+                  .set(column)
+                  .to(
+                      beamTypeToSpannerType(field.getType()),
+                      translateRowToStruct(row.getRow(column)));
+              break;
+            case ARRAY:
+              addArrayToStruct(structBuilder, row, field);
+              break;
+            case ITERABLE:
+              addIterableToStruct(structBuilder, row, field);
+              break;
+            case FLOAT:
+              structBuilder.set(column).to(row.getFloat(column).doubleValue());
+              break;
+            case DOUBLE:
+              structBuilder.set(column).to(row.getDouble(column));
+              break;
+            case DECIMAL:
+              structBuilder.set(column).to(row.getDecimal(column).doubleValue());

Review comment:
   I agree. At first I didn't include decimals, but this conversion is definitely lossy.
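The sketch below shows why the `DECIMAL` to `doubleValue()` mapping is lossy. A `double` holds only about 15 to 17 significant decimal digits, so a wider `BigDecimal` does not survive the round trip, while small integers do. The sample values are arbitrary and chosen only for illustration.

```java
import java.math.BigDecimal;

// Illustration only: round-trips a BigDecimal through double, the way a
// DECIMAL -> doubleValue() mapping would, and checks what survives.
final class DecimalLossSketch {

  /** Converts to double and back; new BigDecimal(double) keeps the double's exact binary value. */
  static BigDecimal viaDouble(BigDecimal d) {
    return new BigDecimal(d.doubleValue());
  }

  public static void main(String[] args) {
    BigDecimal small = new BigDecimal("42");
    BigDecimal wide = new BigDecimal("12345678901234567890.123456789");

    // compareTo ignores scale, so this checks numeric equality only.
    System.out.println(small.compareTo(viaDouble(small)) == 0); // true
    System.out.println(wide.compareTo(viaDouble(wide)) == 0); // false
  }
}
```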









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-02 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r482074113



##
File path: sdks/java/io/google-cloud-platform/expansion-service/build.gradle
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'org.apache.beam.module'
+apply plugin: 'application'
+mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
+
+applyJavaNature(
+  enableChecker: true,
+  automaticModuleName: 'org.apache.beam.sdk.io.gcp.expansion.service',
+  exportJavadoc: false,
+  validateShadowJar: false,
+  shadowClosure: {},
+)
+
+task runService(type: Exec) {
+  dependsOn shadowJar
+  executable 'sh'
+  args '-c', 'java -jar /Users/piotr/beam/sdks/java/io/google-cloud-platform/expansion-service/build/libs/beam-sdks-java-io-google-cloud-platform-expansion-service-2.24.0-SNAPSHOT.jar 8097'
+}

Review comment:
   My fault, I'll remove this. Sorry for that.









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-02 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r482073899



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##
@@ -678,6 +703,42 @@ public Read withPartitionOptions(PartitionOptions partitionOptions) {
   .withTransaction(getTransaction());
   return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll);
 }
+
+    SerializableFunction getFormatFn() {
+      return (SerializableFunction)
+          input ->
+              Row.withSchema(Schema.builder().addInt64Field("Key").build())
+                  .withFieldValue("Key", 3L)
+                  .build();
+    }
+  }
+
+  public static class ReadRows extends PTransform<PBegin, PCollection<Row>> {
+    Read read;
+    Schema schema;
+
+    public ReadRows(Read read, Schema schema) {
+      super("Read rows");
+      this.read = read;
+      this.schema = schema;
Review comment:
   I'd really like to do this in this PR, but the only approach that comes to
mind is what you suggested: perform the read request with a client and then
read the schema from the result. The obvious disadvantage is that the Spanner
query would be executed twice. From what I researched, appending LIMIT 1 to the
end of the query would not improve performance, so this approach does not work
well for huge result sets.









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-02 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r482070325



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##
@@ -678,6 +703,42 @@ public Read withPartitionOptions(PartitionOptions partitionOptions) {
   .withTransaction(getTransaction());
   return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll);
 }
+
+    SerializableFunction getFormatFn() {
+      return (SerializableFunction)
+          input ->
+              Row.withSchema(Schema.builder().addInt64Field("Key").build())
+                  .withFieldValue("Key", 3L)
+                  .build();
+    }
+  }
+
+  public static class ReadRows extends PTransform<PBegin, PCollection<Row>> {
+    Read read;
+    Schema schema;
+
+    public ReadRows(Read read, Schema schema) {
+      super("Read rows");
+      this.read = read;
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollection<Row> expand(PBegin input) {
+      return input
+          .apply(read)
+          .apply(
+              MapElements.into(TypeDescriptor.of(Row.class))
+                  .via(
+                      new SerializableFunction<Struct, Row>() {
+                        @Override
+                        public Row apply(Struct struct) {
+                          return StructUtils.translateStructToRow(struct, schema);
+                        }
+                      }))
+          .setRowSchema(schema)
+          .setCoder(RowCoder.of(schema));

Review comment:
   For some reason it was not obvious to me. Done.









[GitHub] [beam] piotr-szuberski commented on a change in pull request #12611: [BEAM-10139][BEAM-10140] Add cross-language support for Java SpannerIO with python wrapper

2020-09-02 Thread GitBox


piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r482063484



##
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java
##
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.ByteArray;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+final class StructUtils {
+  public static Row translateStructToRow(Struct struct, Schema schema) {
+    checkForSchemasEquality(schema.getFields(), struct.getType().getStructFields(), false);
+
+    List<Schema.Field> fields = schema.getFields();
+    Row.FieldValueBuilder valueBuilder = null;
+    // TODO: Remove this null-checking once nullable fields are supported in cross-language

Review comment:
   NullableCoder is not a standard coder, as mentioned here:
https://issues.apache.org/jira/browse/BEAM-10529?jql=project%20%3D%20BEAM%20AND%20text%20~%20%22nullable%20python%22
   So I suppose the only way to support null values is not to set them at all.
   I noticed this when I tried to read a null field from a Spanner table, but I
may be wrong.
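The sketch below illustrates the "don't set nulls" idea. It assumes the target builder leaves any field it is never given as null. The non-null values are collected first, in field order, and then handed to a name-keyed builder in one pass. This avoids searching for the first non-null field just to start the `FieldValueBuilder` chain. The map-based struct and the `nonNullValues` helper are hypothetical stand-ins, not Spanner or Beam APIs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: the Map-based "struct" and nonNullValues() are stand-ins,
// not Spanner or Beam APIs.
final class SkipNullsSketch {

  /**
   * Returns only the non-null values, in field order. A name-keyed builder can then be
   * fed from this map in one pass, leaving null fields unset.
   */
  static Map<String, Object> nonNullValues(
      List<String> fieldNames, Function<String, Object> getter) {
    Map<String, Object> values = new LinkedHashMap<>();
    for (String name : fieldNames) {
      Object value = getter.apply(name);
      if (value != null) {
        values.put(name, value);
      }
    }
    return values;
  }

  public static void main(String[] args) {
    Map<String, Object> struct = new HashMap<>();
    struct.put("id", 1L);
    struct.put("nickname", null); // a NULL column value
    struct.put("name", "bob");

    Map<String, Object> toSet =
        nonNullValues(Arrays.asList("id", "nickname", "name"), struct::get);
    System.out.println(toSet); // {id=1, name=bob}
  }
}
```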




