[ 
https://issues.apache.org/jira/browse/BEAM-10139?focusedWorklogId=501118&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-501118
 ]

ASF GitHub Bot logged work on BEAM-10139:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Oct/20 14:56
            Start Date: 15/Oct/20 14:56
    Worklog Time Spent: 10m 
      Work Description: piotr-szuberski commented on a change in pull request #12611:
URL: https://github.com/apache/beam/pull/12611#discussion_r505611080



##########
File path: sdks/python/apache_beam/io/gcp/spanner.py
##########
@@ -0,0 +1,483 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""PTransforms for supporting Spanner in Python pipelines.
+
+  These transforms are currently supported by Beam portable
+  Flink and Spark runners.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During the pipeline construction, Python SDK
+  will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to set up cross-language Spanner transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Spanner
+  transforms. This option is only available for Beam 2.25.0 and later.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Install Java runtime in the computer from where the pipeline is constructed
+    and make sure that 'java' command is available.
+
+  In this option, Python SDK will either download (for released Beam version) or
+  build (when running from a Beam Git clone) an expansion service jar and use
+  that to expand transforms. Currently Spanner transforms use the
+  'beam-sdks-java-io-google-cloud-platform-expansion-service' jar for this
+  purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you start up your own expansion service and provide that as
+  a parameter when using the transforms provided in this module.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Start up your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+    initiating Spanner transforms provided in this module.
+
+  Flink Users can use the built-in Expansion Service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+import uuid
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from past.builtins import unicode
+
+from apache_beam import Map
+from apache_beam import PTransform
+from apache_beam import coders
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = [
+    'WriteToSpanner',
+    'ReadFromSpanner',
+    'TimestampBoundMode',
+    'TimeUnit',
+]
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')
+
+
+WriteToSpannerSchema = typing.NamedTuple(
+    'WriteToSpannerSchema',
+    [
+        ('instance_id', unicode),
+        ('database_id', unicode),
+        ('project_id', Optional[unicode]),
+        ('max_batch_size_bytes', Optional[int]),
+        ('max_number_mutations', Optional[int]),
+        ('max_number_rows', Optional[int]),
+        ('grouping_factor', Optional[int]),
+        ('host', Optional[unicode]),
+        ('emulator_host', Optional[unicode]),
+        ('commit_deadline', Optional[int]),
+        ('max_cumulative_backoff', Optional[int]),
+    ],
+)
+
+
+class WriteToSpanner:
+  """
+  A PTransform which writes mutations to the specified instance's database
+  via Spanner.
+
+  This transform receives rows defined as NamedTuple or as List[NamedTuple]
+  in case of delete operation. Example::
+
+    ExampleRow = typing.NamedTuple('ExampleRow',
+                                   [('id', int), ('name', unicode)])
+
+    with Pipeline() as p:
+      _ = (
+          p
+          | 'Impulse' >> beam.Impulse()
+          | 'Generate' >> beam.FlatMap(lambda x: range(num_rows))
+          | 'To row' >> beam.Map(lambda n: ExampleRow(n, str(n)))
+              .with_output_types(ExampleRow)
+          | 'Write to Spanner' >> WriteToSpanner(
+              instance_id='your_instance',
+              database_id='existing_database',
+              project_id='your_project_id').insert('your_table'))
+
+  In addition you can pass List[ExampleRow] to delete transform::
+
+    with Pipeline() as p:
+      _ = (
+          p
+          | 'Impulse' >> beam.Impulse()
+          | 'Generate' >> beam.FlatMap(lambda x: range(num_rows))
+          | 'To row' >> beam.Map(lambda n: [ExampleRow(n, str(n)),
+              ExampleRow(n * 2, str(n * 2))])
+              .with_output_types(List[ExampleRow])
+          | 'Write to Spanner' >> WriteToSpanner(
+              instance_id='your_instance',
+              database_id='existing_database',
+              project_id='your_project_id').delete('your_table'))
+
+  Experimental; no backwards compatibility guarantees.
+  """
+  def __init__(
+      self,
+      project_id,
+      instance_id,
+      database_id,
+      max_batch_size_bytes=None,
+      max_number_mutations=None,
+      max_number_rows=None,
+      grouping_factor=None,
+      host=None,
+      emulator_host=None,
+      commit_deadline=None,
+      max_cumulative_backoff=None,
+      expansion_service=None,
+  ):
+    """
+    Initializes a write operation to Spanner.
+
+    :param project_id: Specifies the Cloud Spanner project.
+    :param instance_id: Specifies the Cloud Spanner instance.
+    :param database_id: Specifies the Cloud Spanner database.
+    :param max_batch_size_bytes: Specifies the batch size limit (max number of
+        bytes mutated per batch). Default value is 1048576 bytes = 1MB.
+    :param max_number_mutations: Specifies the cell mutation limit (maximum
+        number of mutated cells per batch). Default value is 5000.
+    :param max_number_rows: Specifies the row mutation limit (maximum number of
+        mutated rows per batch). Default value is 500.
+    :param grouping_factor: Specifies the multiple of max mutation (in terms
+        of both bytes per batch and cells per batch) that is used to select a
+        set of mutations to sort by key for batching. This sort uses local
+        memory on the workers, so using large values can cause out of memory
+        errors. Default value is 1000.
+    :param host: Specifies the Cloud Spanner host.
+    :param emulator_host: Specifies Spanner emulator host.
+    :param commit_deadline: Specifies the deadline for the Commit API call.
+        Default is 15 secs. DEADLINE_EXCEEDED errors will prompt a backoff/retry
+        until the value of commit_deadline is reached. DEADLINE_EXCEEDED errors
+        are reported with logging and counters. Pass seconds as value.
+    :param max_cumulative_backoff: Specifies the maximum cumulative backoff
+        time when retrying after DEADLINE_EXCEEDED errors. Default is 900s
+        (15min). If the mutations still have not been written after this time,
+        they are treated as a failure, and handled according to the setting of
+        failure_mode. Pass seconds as value.
+    :param expansion_service: The address (host:port) of the ExpansionService.
+    """
+    max_cumulative_backoff = int(
+        max_cumulative_backoff) if max_cumulative_backoff else None
+    commit_deadline = int(commit_deadline) if commit_deadline else None
+    self.config = NamedTupleBasedPayloadBuilder(
+        WriteToSpannerSchema(
+            project_id=project_id,
+            instance_id=instance_id,
+            database_id=database_id,
+            max_batch_size_bytes=max_batch_size_bytes,
+            max_number_mutations=max_number_mutations,
+            max_number_rows=max_number_rows,
+            grouping_factor=grouping_factor,
+            host=host,
+            emulator_host=emulator_host,
+            commit_deadline=commit_deadline,
+            max_cumulative_backoff=max_cumulative_backoff,
+        ),
+    )
+    self.expansion_service = expansion_service or default_io_expansion_service()
+
+  def insert(self, table):
+    return WriteToSpannerTransform(
+        self.config, self.expansion_service, Operation.INSERT, table)
+
+  def delete(self, table):
+    return WriteToSpannerTransform(
+        self.config, self.expansion_service, Operation.DELETE, table)
+
+  def update(self, table):
+    return WriteToSpannerTransform(
+        self.config, self.expansion_service, Operation.UPDATE, table)
+
+  def replace(self, table):
+    return WriteToSpannerTransform(
+        self.config, self.expansion_service, Operation.REPLACE, table)
+
+  def insert_or_update(self, table):
+    return WriteToSpannerTransform(
+        self.config, self.expansion_service, Operation.INSERT_OR_UPDATE, table)
+
+
+ReadFromSpannerSchema = NamedTuple(
+    'ReadFromSpannerSchema',
+    [
+        ('instance_id', unicode),
+        ('database_id', unicode),
+        ('schema', bytes),
+        ('sql', Optional[unicode]),
+        ('table', Optional[unicode]),
+        ('project_id', Optional[unicode]),
+        ('host', Optional[unicode]),
+        ('emulator_host', Optional[unicode]),
+        ('batching', Optional[bool]),
+        ('timestamp_bound_mode', Optional[unicode]),
+        ('read_timestamp', Optional[unicode]),
+        ('exact_staleness', Optional[int]),
+        ('time_unit', Optional[unicode]),
+    ],
+)
+
+
+class ReadFromSpanner(ExternalTransform):
+  """
+  A PTransform which reads from the specified Spanner instance's database.
+
+  This transform requires the type of the row it has to return in order to
+  provide the schema. Example::
+
+    ExampleRow = typing.NamedTuple('ExampleRow',
+                                   [('id', int), ('name', unicode)])
+
+    with Pipeline() as p:
+      result = (
+          p
+          | ReadFromSpanner(
+              instance_id='your_instance_id',
+              database_id='your_database_id',
+              project_id='your_project_id',
+              row_type=ExampleRow,
+              sql='SELECT * FROM some_table',
+          ).with_output_types(ExampleRow))
+
+  Experimental; no backwards compatibility guarantees.
+  """
+  URN = 'beam:external:java:spanner:read:v1'
+
+  def __init__(
+      self,
+      project_id,
+      instance_id,
+      database_id,
+      row_type=None,
+      sql=None,
+      table=None,
+      host=None,
+      emulator_host=None,
+      batching=None,
+      timestamp_bound_mode=None,
+      read_timestamp=None,
+      exact_staleness=None,
+      time_unit=None,
+      expansion_service=None,
+  ):
+    """
+    Initializes a read operation from Spanner.
+
+    :param project_id: Specifies the Cloud Spanner project.
+    :param instance_id: Specifies the Cloud Spanner instance.
+    :param database_id: Specifies the Cloud Spanner database.
+    :param row_type: Row type that fits the given query or table. Passed as
+        NamedTuple, e.g. NamedTuple('name', [('row_name', unicode)])
+    :param sql: An sql query to execute. Its results must fit the
+        provided row_type. Don't use when table is set.
+    :param table: A spanner table. When provided all columns from row_type
+        will be selected to query. Don't use when sql is set.
+    :param batching: By default Batch API is used to read data from Cloud
+        Spanner. It is useful to disable batching when the underlying query
+        is not root-partitionable.
+    :param host: Specifies the Cloud Spanner host.
+    :param emulator_host: Specifies Spanner emulator host.
+    :param timestamp_bound_mode: Defines how Cloud Spanner will choose a
+        timestamp for a read-only transaction or a single read/query.
+        Possible values:
+        STRONG: A timestamp bound that will perform reads and queries at a
+        timestamp where all previously committed transactions are visible.
+        READ_TIMESTAMP: Returns a timestamp bound that will perform reads
+        and queries at the given timestamp.
+        MIN_READ_TIMESTAMP: Returns a timestamp bound that will perform reads
+        and queries at a timestamp chosen to be at least given timestamp value.
+        EXACT_STALENESS: Returns a timestamp bound that will perform reads and
+        queries at an exact staleness. The timestamp is chosen soon after the
+        read is started.
+        MAX_STALENESS: Returns a timestamp bound that will perform reads and
+        queries at a timestamp chosen to be at most time_unit stale.
+    :param read_timestamp: Timestamp in string. Use only when
+        timestamp_bound_mode is set to READ_TIMESTAMP or MIN_READ_TIMESTAMP.
+    :param exact_staleness: Staleness value as int. Use only when
+        timestamp_bound_mode is set to EXACT_STALENESS or MAX_STALENESS.
+        time_unit has to be set along with this param.
+    :param time_unit: Time unit for exact_staleness. Possible values:
+        NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, HOURS, DAYS.
+    :param expansion_service: The address (host:port) of the ExpansionService.
+    """
+    assert row_type
+    assert (sql or table) and not (sql and table)
+    TimeUnit.verify_param(time_unit)
+    TimestampBoundMode.verify_param(timestamp_bound_mode)
+    staleness_value = int(exact_staleness) if exact_staleness else None
+
+    if staleness_value or time_unit:
+      assert staleness_value and time_unit and (
+          timestamp_bound_mode == TimestampBoundMode.MAX_STALENESS or
+          timestamp_bound_mode == TimestampBoundMode.EXACT_STALENESS)
+
+    if read_timestamp:
+      assert timestamp_bound_mode == TimestampBoundMode.MIN_READ_TIMESTAMP\
+             or timestamp_bound_mode == TimestampBoundMode.READ_TIMESTAMP
+
+    coders.registry.register_coder(row_type, coders.RowCoder)
+
+    super(ReadFromSpanner, self).__init__(
+        self.URN,
+        NamedTupleBasedPayloadBuilder(
+            ReadFromSpannerSchema(
+                instance_id=instance_id,
+                database_id=database_id,
+                sql=sql,
+                table=table,
+                schema=named_tuple_to_schema(row_type).SerializeToString(),
+                project_id=project_id,
+                host=host,
+                emulator_host=emulator_host,
+                batching=batching,
+                timestamp_bound_mode=timestamp_bound_mode,
+                read_timestamp=read_timestamp,
+                exact_staleness=staleness_value,
+                time_unit=time_unit,
+            ),
+        ),
+        expansion_service or default_io_expansion_service(),
+    )
+
+
+class Operation:
+  INSERT = 'INSERT'
+  DELETE = 'DELETE'
+  UPDATE = 'UPDATE'
+  REPLACE = 'REPLACE'
+  INSERT_OR_UPDATE = 'INSERT_OR_UPDATE'
+
+
+class TimeUnit:
+  NANOSECONDS = 'NANOSECONDS'
+  MICROSECONDS = 'MICROSECONDS'
+  MILLISECONDS = 'MILLISECONDS'
+  SECONDS = 'SECONDS'
+  HOURS = 'HOURS'
+  DAYS = 'DAYS'
+
+  @staticmethod
+  def verify_param(param):
+    if param and not hasattr(TimeUnit, param):
+      raise RuntimeError(
+          'Invalid param for TimeUnit: {}'.format(param))
+
+
+class TimestampBoundMode:
+  MAX_STALENESS = 'MAX_STALENESS'
+  EXACT_STALENESS = 'EXACT_STALENESS'
+  READ_TIMESTAMP = 'READ_TIMESTAMP'
+  MIN_READ_TIMESTAMP = 'MIN_READ_TIMESTAMP'
+  STRONG = 'STRONG'
+
+  @staticmethod
+  def verify_param(param):
+    if param and not hasattr(TimestampBoundMode, param):
+      raise RuntimeError(
+          'Invalid param for TimestampBoundMode: {}'.format(param))
+
+
+class WriteToSpannerTransform(PTransform):
+  URN = 'beam:external:java:spanner:write:v1'
+
+  def __init__(self, config, expansion_service, operation, table):
+    super(WriteToSpannerTransform, self).__init__()
+    self.config = config
+    self.expansion_service = expansion_service
+    self.operation = operation
+    self.table = table
+
+  def expand(self, row_pcoll):
+    return (
+        row_pcoll
+        | RowToMutation(self.operation, self.table)
+        | ExternalTransform(self.URN, self.config, self.expansion_service))
+
+
+class RowToMutation(PTransform):
+  def __init__(self, operation, table):
+    super(RowToMutation, self).__init__()
+    self.operation = operation
+    self.table = table
+
+  def expand(self, pcoll):
+    is_delete = self.operation == Operation.DELETE
+    mutation_name = 'Mutation_%s_%s' % (
+        self.operation, str(uuid.uuid4()).replace('-', ''))
+
+    # There is an error when pcoll.element_type is List[row_type] so pass
+    # a list of inner element types to NamedTuple explicitly.
+    is_list = hasattr(pcoll.element_type, 'inner_type')
+    row_type = pcoll.element_type.inner_type if is_list else pcoll.element_type

Review comment:
       The issue was that the type returned from pcoll.element_type, List[SpannerTestKey], was not compatible with typing.List[SpannerTestKey] and caused a silly message:
   ```
   Exception: NamedTuple('Name', [(f0, t0), (f1, t1), ...]); each t must be a type Got List[SpannerTestKey].
   ```
   ```
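
A minimal sketch of the workaround described above, runnable without Beam. `ListHint` is a hypothetical stand-in for the Beam list type hint (the only thing assumed about it is that it exposes `inner_type`), `unwrap_row_type` is an illustrative helper name, and `ExampleRow` uses `str` in place of `unicode`. It is not the code from the PR, only an illustration of unwrapping the inner row type and rebuilding the field with `typing.List` so that `typing.NamedTuple` accepts it:

```python
import typing


class ExampleRow(typing.NamedTuple):
  id: int
  name: str


class ListHint(object):
  """Hypothetical stand-in for a list type hint that exposes inner_type."""
  def __init__(self, inner_type):
    self.inner_type = inner_type


def unwrap_row_type(element_type):
  # Same check as in RowToMutation.expand: a list hint carries inner_type.
  is_list = hasattr(element_type, 'inner_type')
  row_type = element_type.inner_type if is_list else element_type
  return row_type, is_list


row_type, is_list = unwrap_row_type(ListHint(ExampleRow))

# Rebuild the field type from typing.List instead of passing the hint object
# itself, which typing.NamedTuple may reject as "not a type".
field_type = typing.List[row_type] if is_list else row_type
Mutation = typing.NamedTuple('Mutation', [('rows', field_type)])

mutation = Mutation(rows=[ExampleRow(1, 'a'), ExampleRow(2, 'b')])
```

A plain row type passes through the helper unchanged, so the same code path can serve insert/update (single rows) and delete (lists of keys).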




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 501118)
    Time Spent: 20h  (was: 19h 50m)

> Add cross-language wrapper for Java's SpannerIO Write
> -----------------------------------------------------
>
>                 Key: BEAM-10139
>                 URL: https://issues.apache.org/jira/browse/BEAM-10139
>             Project: Beam
>          Issue Type: Sub-task
>          Components: cross-language, io-java-gcp, io-py-gcp
>    Affects Versions: Not applicable
>            Reporter: Piotr Szuberski
>            Assignee: Piotr Szuberski
>            Priority: P2
>              Labels: portability
>             Fix For: Not applicable
>
>          Time Spent: 20h
>  Remaining Estimate: 0h
>
> Add cross-language wrapper for Java's SpannerIO Write



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
