[ https://issues.apache.org/jira/browse/BEAM-7246?focusedWorklogId=382975&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-382975 ]
ASF GitHub Bot logged work on BEAM-7246:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Feb/20 16:35
            Start Date: 06/Feb/20 16:35
    Worklog Time Spent: 10m
      Work Description: mszb commented on pull request #10712: [BEAM-7246] Added Google Spanner Write Transform
URL: https://github.com/apache/beam/pull/10712#discussion_r375945920


##########
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##########

@@ -581,3 +644,369 @@ def display_data(self):
         label='transaction')
     return res
+
+
+@experimental(extra_message="No backwards-compatibility guarantees.")
+class WriteToSpanner(PTransform):
+
+  def __init__(self, project_id, instance_id, database_id, pool=None,
+               credentials=None, max_batch_size_bytes=1048576):
+    """
+    A PTransform to write to Google Cloud Spanner.
+
+    Args:
+      project_id: Cloud Spanner project ID. Be sure to use the Project ID,
+        not the Project Number.
+      instance_id: Cloud Spanner instance ID.
+      database_id: Cloud Spanner database ID.
+      pool: (optional) Session pool to be used by the database.
+      credentials: (optional) Credentials to use for the Cloud Spanner
+        client.
+      max_batch_size_bytes: (optional) Split the mutations into batches to
+        reduce the number of transactions sent to Spanner. By default it is
+        set to 1 MB (1048576 bytes).
+    """
+    self._configuration = _BeamSpannerConfiguration(
+        project=project_id, instance=instance_id, database=database_id,
+        credentials=credentials, pool=pool, snapshot_read_timestamp=None,
+        snapshot_exact_staleness=None)
+    self._max_batch_size_bytes = max_batch_size_bytes
+    self._database_id = database_id
+    self._project_id = project_id
+    self._instance_id = instance_id
+    self._pool = pool
+
+  def display_data(self):
+    res = {
+        'project_id': DisplayDataItem(self._project_id, label='Project Id'),
+        'instance_id': DisplayDataItem(self._instance_id, label='Instance Id'),
+        'pool': DisplayDataItem(str(self._pool), label='Pool'),
+        'database': DisplayDataItem(self._database_id, label='Database'),
+        'batch_size': DisplayDataItem(self._max_batch_size_bytes,
+                                      label="Batch Size"),
+    }
+    return res
+
+  def expand(self, pcoll):
+    return (pcoll
+            | "make batches" >>
+            _WriteGroup(max_batch_size_bytes=self._max_batch_size_bytes)
+            | 'Writing to spanner' >> ParDo(
+                _WriteToSpannerDoFn(self._configuration)))
+
+
+class _Mutator(namedtuple('_Mutator', ["mutation", "operation", "kwargs"])):
+  __slots__ = ()
+
+  @property
+  def byte_size(self):
+    return self.mutation.ByteSize()
+
+
+class MutationGroup(deque):
+  """
+  A bundle of Spanner mutations (_Mutator).
+  """
+
+  @property
+  def byte_size(self):
+    s = 0
+    for m in self.__iter__():
+      s += m.byte_size
+    return s
+
+  def primary(self):
+    return next(self.__iter__())
+
+
+class WriteMutation(object):
+
+  _OPERATION_DELETE = "delete"
+  _OPERATION_INSERT = "insert"
+  _OPERATION_INSERT_OR_UPDATE = "insert_or_update"
+  _OPERATION_REPLACE = "replace"
+  _OPERATION_UPDATE = "update"
+
+  def __init__(self,
+               insert=None,
+               update=None,
+               insert_or_update=None,
+               replace=None,
+               delete=None,
+               columns=None,
+               values=None,
+               keyset=None):
+    """
+    A convenience class to create Spanner mutations for write. The operation
+    can be provided via the constructor or via the static methods.
+
+    Note: The constructor accepts exactly one operation at a time. For
+    example, passing a table name in the `insert` parameter together with a
+    value for the `update` parameter raises an error.
+
+    Args:
+      insert: (Optional) Name of the table in which rows will be inserted.
+      update: (Optional) Name of the table in which existing rows will be
+        updated.
+      insert_or_update: (Optional) Table name in which rows will be written.
+        Like insert, except that if the row already exists, then its column
+        values are overwritten with the ones provided. Any column values not
+        explicitly written are preserved.
+      replace: (Optional) Table name in which rows will be replaced. Like
+        insert, except that if the row already exists, it is deleted, and the
+        column values provided are inserted instead. Unlike
+        `insert_or_update`, this means any values not explicitly written
+        become `NULL`.
+      delete: (Optional) Table name from which rows will be deleted. Succeeds
+        whether or not the named rows were present.
+      columns: The names of the columns in the table to be written. The list
+        of columns must contain enough columns to allow Cloud Spanner to
+        derive values for all primary key columns in the row(s) to be
+        modified.
+      values: The values to be written. `values` can contain more than one
+        list of values. If it does, then multiple rows are written, one for
+        each entry in `values`. Each list in `values` must have exactly as
+        many entries as there are entries in `columns` above. Sending
+        multiple lists is equivalent to sending multiple mutations, each
+        containing one `values` entry and repeating the table and columns.
+      keyset: (Optional) The primary keys of the rows within the table to
+        delete. Delete is idempotent: the transaction will succeed even if
+        some or all rows do not exist.
+    """
+    self._columns = columns
+    self._values = values
+    self._keyset = keyset
+
+    self._insert = insert
+    self._update = update
+    self._insert_or_update = insert_or_update
+    self._replace = replace
+    self._delete = delete
+
+    if sum([
+        1 for x in [self._insert, self._update, self._insert_or_update,
+                    self._replace, self._delete]
+        if x is not None
+    ]) != 1:
+      raise ValueError("Exactly one write mutation operation must be "
+                       "provided: <%s: %s>" % (self.__class__.__name__,
+                                               str(self.__dict__)))
+
+  def __call__(self, *args, **kwargs):
+    if self._insert is not None:
+      return WriteMutation.insert(
+          table=self._insert, columns=self._columns, values=self._values)
+    elif self._update is not None:
+      return WriteMutation.update(
+          table=self._update, columns=self._columns, values=self._values)
+    elif self._insert_or_update is not None:
+      return WriteMutation.insert_or_update(
+          table=self._insert_or_update,
+          columns=self._columns,
+          values=self._values)
+    elif self._replace is not None:
+      return WriteMutation.replace(
+          table=self._replace, columns=self._columns, values=self._values)
+    elif self._delete is not None:
+      return WriteMutation.delete(table=self._delete, keyset=self._keyset)
+
+  @staticmethod
+  def insert(table, columns, values):
+    """Insert one or more new table rows.
+
+    Args:
+      table: Name of the table to be modified.
+      columns: Names of the table columns to be modified.
+      values: The values to be written.
+    """
+    return _Mutator(
+        mutation=Mutation(insert=batch._make_write_pb(table, columns, values)),
+        operation=WriteMutation._OPERATION_INSERT, kwargs={
+            "table": table, "columns": columns, "values": values})
+
+  @staticmethod
+  def update(table, columns, values):
+    """Update one or more existing table rows.
+
+    Args:
+      table: Name of the table to be modified.
+      columns: Names of the table columns to be modified.
+      values: The values to be written.
+    """
+    return _Mutator(
+        mutation=Mutation(update=batch._make_write_pb(table, columns, values)),
+        operation=WriteMutation._OPERATION_UPDATE, kwargs={
+            "table": table, "columns": columns, "values": values})
+
+  @staticmethod
+  def insert_or_update(table, columns, values):
+    """Insert or update one or more table rows.
+
+    Args:
+      table: Name of the table to be modified.
+      columns: Names of the table columns to be modified.
+      values: The values to be written.
+    """
+    return _Mutator(
+        mutation=Mutation(
+            insert_or_update=batch._make_write_pb(table, columns, values)),
+        operation=WriteMutation._OPERATION_INSERT_OR_UPDATE, kwargs={
+            "table": table, "columns": columns, "values": values})
+
+  @staticmethod
+  def replace(table, columns, values):
+    """Replace one or more table rows.
+
+    Args:
+      table: Name of the table to be modified.
+      columns: Names of the table columns to be modified.
+      values: The values to be written.
+    """
+    return _Mutator(
+        mutation=Mutation(replace=batch._make_write_pb(table, columns, values)),
+        operation=WriteMutation._OPERATION_REPLACE, kwargs={
+            "table": table, "columns": columns, "values": values})
+
+  @staticmethod
+  def delete(table, keyset):
+    """Delete one or more table rows.
+
+    Args:
+      table: Name of the table to be modified.
+      keyset: Keys/ranges identifying the rows to delete.
+    """
+    delete = Mutation.Delete(table=table, key_set=keyset._to_pb())
+    return _Mutator(mutation=Mutation(delete=delete),
+                    operation=WriteMutation._OPERATION_DELETE,
+                    kwargs={"table": table, "keyset": keyset})
+
+
+@with_input_types(typing.Union[MutationGroup, TaggedOutput])
+@with_output_types(MutationGroup)
+class _BatchFn(DoFn):
+  """
+  Batches mutations together.
+  """
+
+  def __init__(self, max_batch_size_bytes):
+    self._max_batch_size_bytes = max_batch_size_bytes
+
+  def start_bundle(self):
+    self._batch = MutationGroup()
+    self._size_in_bytes = 0
+
+  def process(self, element):
+    _max_bytes = self._max_batch_size_bytes
+    mg_size = element.byte_size  # Total size of the mutation group.
+
+    if mg_size + self._size_in_bytes > _max_bytes:
+      # The batch is full: emit it and reset the count.
+      yield self._batch
+      self._size_in_bytes = 0
+      self._batch = MutationGroup()
+
+    self._batch.extend(element)
+    self._size_in_bytes += mg_size
+
+  def finish_bundle(self):
+    if self._batch is not None:
+      yield window.GlobalWindows.windowed_value(self._batch)
+      self._batch = None
+
+
+@with_input_types(MutationGroup)
+@with_output_types(MutationGroup)
+class _BatchableFilterFn(DoFn):
+  """
+  Filters MutationGroups larger than the batch size to the output tagged with
+  OUTPUT_TAG_UNBATCHABLE.
+  """
+  OUTPUT_TAG_UNBATCHABLE = 'unbatchable'
+
+  def __init__(self, max_batch_size_bytes):
+    self._max_batch_size_bytes = max_batch_size_bytes
+    self._batchable = None
+    self._unbatchable = None
+
+  def process(self, element):
+    if element.primary().operation == 'delete':
+      # Delete mutations are not batchable.
+      yield TaggedOutput(_BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE, element)
+    else:
+      _max_bytes = self._max_batch_size_bytes
+      mg = element
+      mg_size = mg.byte_size
+      if mg_size > _max_bytes:
+        yield TaggedOutput(_BatchableFilterFn.OUTPUT_TAG_UNBATCHABLE, element)
+      else:
+        yield element
+
+
+class _WriteToSpannerDoFn(DoFn):
+
+  def __init__(self, spanner_configuration):
+    self._spanner_configuration = spanner_configuration
+    self._db_instance = None
+    self.batches = Metrics.counter(self.__class__, 'SpannerBatches')
+
+  def setup(self):
+    spanner_client = Client(self._spanner_configuration.project)
+    instance = spanner_client.instance(self._spanner_configuration.instance)
+    self._db_instance = instance.database(
+        self._spanner_configuration.database,
+        pool=self._spanner_configuration.pool)
+
+  def process(self, element):
+    self.batches.inc()
+    with self._db_instance.batch() as b:
+      for m in element:
+        if m.operation == WriteMutation._OPERATION_DELETE:
+          batch_func = b.delete
+        elif m.operation == WriteMutation._OPERATION_REPLACE:
+          batch_func = b.replace
+        elif m.operation == WriteMutation._OPERATION_INSERT_OR_UPDATE:
+          batch_func = b.insert_or_update
+        elif m.operation == WriteMutation._OPERATION_INSERT:
+          batch_func = b.insert
+        elif m.operation == WriteMutation._OPERATION_UPDATE:
+          batch_func = b.update
+        else:
+          raise ValueError("Unknown operation action: %s" % m.operation)
+
+        batch_func(**m.kwargs)

Review comment:
   Sure, I'll update the docs!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 382975)
    Time Spent: 18h  (was: 17h 50m)

> Create a Spanner IO for Python
> ------------------------------
>
>                 Key: BEAM-7246
>                 URL: https://issues.apache.org/jira/browse/BEAM-7246
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>            Reporter: Reuven Lax
>            Assignee: Shehzaad Nakhoda
>            Priority: Major
>          Time Spent: 18h
>  Remaining Estimate: 0h
>
> Add I/O support for Google Cloud Spanner for the Python SDK (Batch Only).
> Testing in this work item will be in the form of DirectRunner tests and
> manual testing.
> Integration and performance tests are a separate work item (not included
> here).
> See https://beam.apache.org/documentation/io/built-in/. The goal is to add
> Google Cloud Spanner to the Database column for the Python/Batch row.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
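
For readers following this thread, a minimal usage sketch of the transform
added in this PR. It relies only on the names visible in the diff above; the
project, instance, database, table, and column identifiers are placeholders,
and it assumes (per the "make batches" step in expand) that single mutations
are grouped into batches automatically:

  import apache_beam as beam
  from apache_beam.io.gcp.experimental.spannerio import (
      WriteMutation, WriteToSpanner)

  # One insert mutation writing two rows. The 'users' table and its columns
  # are hypothetical; the columns must cover the table's primary key.
  mutations = [
      WriteMutation.insert(
          table='users',
          columns=('user_id', 'name'),
          values=[(1, 'Alice'), (2, 'Bob')]),
  ]

  with beam.Pipeline() as p:
    _ = (
        p
        | 'Create mutations' >> beam.Create(mutations)
        | 'Write to Spanner' >> WriteToSpanner(
            project_id='my-project',      # placeholder
            instance_id='my-instance',    # placeholder
            database_id='my-database'))   # placeholder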
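
Where related mutations should be written together, the MutationGroup class in
the diff (a deque of mutations) keeps them in one unit: _BatchFn extends a
batch with the whole group, so a group is never split across batches, and
_WriteToSpannerDoFn submits each batch through a single database.batch()
context. A sketch, again with hypothetical table and column names:

  from apache_beam.io.gcp.experimental.spannerio import (
      MutationGroup, WriteMutation)

  # Both mutations travel in one MutationGroup, so they end up in the same
  # Spanner batch write.
  group = MutationGroup([
      WriteMutation.insert(
          table='users',
          columns=('user_id', 'name'),
          values=[(3, 'Carol')]),
      WriteMutation.insert(
          table='emails',
          columns=('user_id', 'email'),
          values=[(3, 'carol@example.com')]),
  ])

A PCollection of such MutationGroups can be fed to WriteToSpanner in place of
individual mutations.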