chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396693125
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##########
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
             | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
                                           beam_options['instance_id'],
                                           beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+    """ A DoFn to parallelize reading from a Bigtable table
+
+    :type project_id: str
+    :param project_id: The ID of the project used for Bigtable access
+
+    :type instance_id: str
+    :param instance_id: The ID of the instance that owns the table.
+
+    :type table_id: str
+    :param table_id: The ID of the table.
+
+    :type filter_: :class:`.RowFilter`
+    :param filter_: (Optional) The filter to apply to the contents of the
+                    specified row(s). If unset, reads every column in
+                    each row.
+    """
+    super(self.__class__, self).__init__()
+    self._initialize({'project_id': project_id,
+                     'instance_id': instance_id,
+                     'table_id': table_id,
+                     'filter_': filter_})
+
+  def _initialize(self, options):
+    """ The defaults initializer, to assist with pickling
+
+    :return: None
+    """
+    self._options = options
+    self._table = None
+    self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+    return self._options
+
+  def __setstate__(self, options):
+    self._initialize(options)
+
+  def start_bundle(self):
+    # from google.cloud.bigtable import Client
+    if self._table is None:
+      # noinspection PyAttributeOutsideInit
+      self._table = Client(project=self._options['project_id'])\
+        .instance(self._options['instance_id'])\
+        .table(self._options['table_id'])
+
+  def process(self, source_bundle):
+    _start_key = source_bundle.start_position
+    _end_key = source_bundle.stop_position
+    for row in self._table.read_rows(_start_key, _end_key):
+      self._counter.inc()
+      yield row
+
+  def display_data(self):
+    return {'projectId': DisplayDataItem(self._options['project_id'],
+                                         label='Bigtable Project Id'),
+            'instanceId': DisplayDataItem(self._options['instance_id'],
+                                          label='Bigtable Instance Id'),
+            'tableId': DisplayDataItem(self._options['table_id'],
+                                       label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+    """ A PTransform wrapper for parallel reading rows from a Bigtable table.
+
+    :type project_id: str
+    :param project_id: The ID of the project used for Bigtable access
+
+    :type instance_id: str
+    :param instance_id: The ID of the instance that owns the table.
+
+    :type table_id: str
+    :param table_id: The ID of the table.
+
+    :type filter_: :class:`.RowFilter`
+    :param filter_: (Optional) The filter to apply to the contents of the
+                    specified row(s). If unset, reads every column in
+                    each row. If none is provided, all rows are read by default.
+    """
+    super(self.__class__, self).__init__()
+    self._options = {'project_id': project_id,
+                     'instance_id': instance_id,
+                     'table_id': table_id,
+                     'filter_': filter_}
+
+  def __getstate__(self):
+    return self._options
+
+  def __setstate__(self, options):
+    self._options = options
+
+  def expand(self, pbegin):
+    table = Client(project=self._options['project_id'], admin=True) \
+      .instance(instance_id=self._options['instance_id']) \
+      .table(table_id=self._options['table_id'])
+
+    keys = list(table.sample_row_keys())
+
+    SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+    keys.insert(0, SampleRowKey(b'', 0))
+
+    def chunks():
+      for i in range(1, len(keys)):
+        key_1 = keys[i - 1].row_key
+        key_2 = keys[i].row_key
+        size = keys[i].offset_bytes - keys[i - 1].offset_bytes
+        yield iobase.SourceBundle(size, None, key_1, key_2)
 
 Review comment:
   You don't need to use SourceBundle if you are not using the Read transform. 
Please use a separate data structure (or just a tuple) here to avoid confusion.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to