[ https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=362824&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-362824 ]
ASF GitHub Bot logged work on BEAM-3342: ---------------------------------------- Author: ASF GitHub Bot Created on: 24/Dec/19 00:40 Start Date: 24/Dec/19 00:40 Worklog Time Spent: 10m Work Description: mf2199 commented on pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python URL: https://github.com/apache/beam/pull/8457#discussion_r361033718 ########## File path: sdks/python/apache_beam/io/gcp/bigtableio.py ########## @@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform): A PTransform that write a list of `DirectRow` into the Bigtable Table """ - def __init__(self, project_id=None, instance_id=None, - table_id=None): + def __init__(self, project_id=None, instance_id=None, table_id=None): """ The PTransform to access the Bigtable Write connector Args: project_id(str): GCP Project of to write the Rows instance_id(str): GCP Instance to write the Rows table_id(str): GCP Table to write the `DirectRows` """ super(WriteToBigTable, self).__init__() - self.beam_options = {'project_id': project_id, - 'instance_id': instance_id, - 'table_id': table_id} + self._beam_options = {'project_id': project_id, + 'instance_id': instance_id, + 'table_id': table_id} def expand(self, pvalue): - beam_options = self.beam_options + beam_options = self._beam_options return (pvalue | beam.ParDo(_BigTableWriteFn(beam_options['project_id'], beam_options['instance_id'], beam_options['table_id']))) + + +class _BigtableReadFn(beam.DoFn): + """ Creates the connector that can read rows for Beam pipeline + + Args: + project_id(str): GCP Project ID + instance_id(str): GCP Instance ID + table_id(str): GCP Table ID + + """ + + def __init__(self, project_id, instance_id, table_id, filter_=b''): + """ Constructor of the Read connector of Bigtable + + Args: + project_id: [str] GCP Project of to write the Rows + instance_id: [str] GCP Instance to write the Rows + table_id: [str] GCP Table to write the `DirectRows` + filter_: [RowFilter] Filter to apply to columns in a row. 
+ """ + super(self.__class__, self).__init__() + self._initialize({'project_id': project_id, + 'instance_id': instance_id, + 'table_id': table_id, + 'filter_': filter_}) + + def __getstate__(self): + return self._beam_options + + def __setstate__(self, options): + self._initialize(options) + + def _initialize(self, options): + self._beam_options = options + self.table = None + self.sample_row_keys = None + self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read') + + def start_bundle(self): + if self.table is None: + self.table = Client(project=self._beam_options['project_id'])\ + .instance(self._beam_options['instance_id'])\ + .table(self._beam_options['table_id']) + + def process(self, element, **kwargs): + for row in self.table.read_rows(start_key=element.start_position, + end_key=element.end_position, + filter_=self._beam_options['filter_']): + self.row_count.inc() + yield row + + def display_data(self): + return {'projectId': DisplayDataItem(self._beam_options['project_id'], + label='Bigtable Project Id'), + 'instanceId': DisplayDataItem(self._beam_options['instance_id'], + label='Bigtable Instance Id'), + 'tableId': DisplayDataItem(self._beam_options['table_id'], + label='Bigtable Table Id'), + 'filter_': DisplayDataItem(str(self._beam_options['filter_']), + label='Bigtable Filter') + } + + +class ReadFromBigTable(beam.PTransform): + def __init__(self, project_id, instance_id, table_id, filter_=b''): + """ The PTransform to access the Bigtable Read connector + + Args: + project_id: [str] GCP Project of to read the Rows + instance_id): [str] GCP Instance to read the Rows + table_id): [str] GCP Table to read the Rows + filter_: [RowFilter] Filter to apply to columns in a row. 
+ """ + super(self.__class__, self).__init__() + self._beam_options = {'project_id': project_id, + 'instance_id': instance_id, + 'table_id': table_id, + 'filter_': filter_} + + def __getstate__(self): + return self._beam_options + + def __setstate__(self, options): + self._beam_options = options + + def expand(self, pbegin): + from apache_beam.transforms import util + + beam_options = self._beam_options + table = Client(project=beam_options['project_id'])\ + .instance(beam_options['instance_id'])\ + .table(beam_options['table_id']) + sample_row_keys = list(table.sample_row_keys()) + + if len(sample_row_keys) > 1 and sample_row_keys[0].row_key != b'': + SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes") + first_key = SampleRowKey(b'', 0) + sample_row_keys.insert(0, first_key) + sample_row_keys = list(sample_row_keys) + + def split_source(unused_impulse): + bundles = [] + for i in range(1, len(sample_row_keys)): Review comment: "When packaged" means packed into a local tarball, specifically via `python setup.py sdist`. So far we've been using the `extra_package` option; it seems to work fine for the `write` part, and at one point it also worked for the `read` part. The main functional difference between the `write` and `read` tests is this: in the `write` part, the rows are generated locally [within the test script] and sent to the workers without intermediate steps. In the `read` sequence, by contrast, we first split the table into ranges based on its sample row keys within the Dataflow engine, and then read those chunks of rows, as you can see from the sample script above. That said, I agree we could try specifying the `--sdk_location` option and see what happens. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 362824) Time Spent: 43h 50m (was: 43h 40m) > Create a Cloud Bigtable IO connector for Python > ----------------------------------------------- > > Key: BEAM-3342 > URL: https://issues.apache.org/jira/browse/BEAM-3342 > Project: Beam > Issue Type: Bug > Components: sdk-py-core > Reporter: Solomon Duskis > Assignee: Solomon Duskis > Priority: Major > Time Spent: 43h 50m > Remaining Estimate: 0h > > I would like to create a Cloud Bigtable python connector. -- This message was sent by Atlassian Jira (v8.3.4#803005)