ahmedabu98 commented on code in PR #26593:
URL: https://github.com/apache/beam/pull/26593#discussion_r1235618745


##########
sdks/python/apache_beam/io/gcp/bigtableio.py:
##########
@@ -227,3 +231,75 @@ def expand(self, pvalue):
                 beam_options['project_id'],
                 beam_options['instance_id'],
                 beam_options['table_id'])))
+
+
+class ReadFromBigtable(PTransform):
+  """Reads rows from Bigtable.
+
+  Returns a PCollection of PartialRowData objects, each representing a
+  Bigtable row. For more information about this row object, visit
+  https://cloud.google.com/python/docs/reference/bigtable/latest/row#class-googlecloudbigtablerowpartialrowdatarowkey
+  """
+  URN = "beam:schematransform:org.apache.beam:bigtable_read:v1"
+
+  def __init__(self, table_id, instance_id, project_id, expansion_service=None):
+    """Initialize a ReadFromBigtable transform.
+
+    :param table_id:
+      The ID of the table to read from.
+    :param instance_id:
+      The ID of the instance where the table resides.
+    :param project_id:
+      The GCP project ID.
+    :param expansion_service:
+      The address of the expansion service. If no expansion service is
+      provided, will attempt to run the default GCP expansion service.
+    """
+    super().__init__()
+    self._table_id = table_id
+    self._instance_id = instance_id
+    self._project_id = project_id
+    self._expansion_service = (
+        expansion_service or BeamJarExpansionService(
+            'sdks:java:io:google-cloud-platform:expansion-service:build'))
+    self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+        self._expansion_service, self.URN)
+
+  def expand(self, input):
+    external_read = SchemaAwareExternalTransform(
+        identifier=self.schematransform_config.identifier,
+        expansion_service=self._expansion_service,
+        rearrange_based_on_discovery=True,
+        tableId=self._table_id,
+        instanceId=self._instance_id,
+        projectId=self._project_id)
+
+    return (
+        input.pipeline
+        | external_read
+        | beam.ParDo(self._BeamRowToPartialRowData()))
+
+  # PartialRowData has some useful methods for querying data within a row.
+  # To make use of those methods and to give Python users a more familiar
+  # object, we process each Beam Row and return a PartialRowData equivalent.
+  class _BeamRowToPartialRowData(beam.DoFn):

Review Comment:
   The data type returned from reading Bigtable cells is always bytes (see the
   [code documentation](https://cloud.google.com/java/docs/reference/google-cloud-bigtable/latest/com.google.bigtable.v2.Cell#com_google_bigtable_v2_Cell_getValue__)).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to