ahmedabu98 commented on code in PR #26593:
URL: https://github.com/apache/beam/pull/26593#discussion_r1235680762
##########
sdks/python/apache_beam/io/gcp/bigtableio.py:
##########
@@ -227,3 +231,75 @@ def expand(self, pvalue):
beam_options['project_id'],
beam_options['instance_id'],
beam_options['table_id'])))
+
+
+class ReadFromBigtable(PTransform):
+ """Reads rows from Bigtable.
+
+ Returns a PCollection of PartialRowData objects, each representing a
+ Bigtable row. For more information about this row object, visit
+ https://cloud.google.com/python/docs/reference/bigtable/latest/row#class-googlecloudbigtablerowpartialrowdatarowkey
+ """
+ URN = "beam:schematransform:org.apache.beam:bigtable_read:v1"
+
+ def __init__(self, table_id, instance_id, project_id, expansion_service=None):
+ """Initialize a ReadFromBigtable transform.
+
+ :param table_id:
+ The ID of the table to read from.
+ :param instance_id:
+ The ID of the instance where the table resides.
+ :param project_id:
+ The GCP project ID.
+ :param expansion_service:
+ The address of the expansion service. If no expansion service is
+ provided, will attempt to run the default GCP expansion service.
+ """
+ super().__init__()
+ self._table_id = table_id
+ self._instance_id = instance_id
+ self._project_id = project_id
+ self._expansion_service = (
+ expansion_service or BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build'))
+ self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+ self._expansion_service, self.URN)
+
+ def expand(self, input):
+ external_read = SchemaAwareExternalTransform(
+ identifier=self.schematransform_config.identifier,
+ expansion_service=self._expansion_service,
+ rearrange_based_on_discovery=True,
+ tableId=self._table_id,
+ instanceId=self._instance_id,
+ projectId=self._project_id)
+
+ return (
+ input.pipeline
+ | external_read
+ | beam.ParDo(self._BeamRowToPartialRowData()))
+
+ # PartialRowData has some useful methods for querying data within a row.
+ # To make use of those methods and to give Python users a more familiar
+ # object, we process each Beam Row and return a PartialRowData equivalent.
+ class _BeamRowToPartialRowData(beam.DoFn):
+ def process(self, row):
+ key = row.key
+ families = row.column_families
+
+ # initialize PartialRowData object
+ partial_row: PartialRowData = PartialRowData(key)
+ for fam_name, col_fam in families.items():
+ if fam_name not in partial_row.cells:
+ partial_row.cells[fam_name] = {}
+ for col_qualifier, cells in col_fam.items():
+ # store column qualifier as bytes to follow PartialRowData behavior
+ col_qualifier_bytes = col_qualifier.encode()
+ if col_qualifier not in partial_row.cells[fam_name]:
+ partial_row.cells[fam_name][col_qualifier_bytes] = []
Review Comment:
Added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]