MaksymSkorupskyi commented on a change in pull request #14460:
URL: https://github.com/apache/beam/pull/14460#discussion_r649809290



##########
File path: sdks/python/apache_beam/io/mongodbio.py
##########
@@ -136,30 +149,195 @@ def __init__(
 
     Returns:
       :class:`~apache_beam.transforms.ptransform.PTransform`
-
     """
     if extra_client_params is None:
       extra_client_params = {}
     if not isinstance(db, str):
-      raise ValueError('ReadFromMongDB db param must be specified as a string')
+      raise ValueError("ReadFromMongDB db param must be specified as a string")
     if not isinstance(coll, str):
       raise ValueError(
-          'ReadFromMongDB coll param must be specified as a '
-          'string')
+          "ReadFromMongDB coll param must be specified as a string")
     self._mongo_source = _BoundedMongoSource(
         uri=uri,
         db=db,
         coll=coll,
         filter=filter,
         projection=projection,
         extra_client_params=extra_client_params,
-        bucket_auto=bucket_auto)
+        bucket_auto=bucket_auto,
+    )
 
   def expand(self, pcoll):
     return pcoll | iobase.Read(self._mongo_source)
 
 
+class _ObjectIdRangeTracker(OrderedPositionRangeTracker):
+  """RangeTracker for tracking mongodb _id of bson ObjectId type."""
+  def position_to_fraction(
+      self,
+      pos: ObjectId,
+      start: ObjectId,
+      end: ObjectId,
+  ):
+    """Returns the fraction of keys in the range [start, end) that
+    are less than the given key.
+    """
+    pos_number = _ObjectIdHelper.id_to_int(pos)
+    start_number = _ObjectIdHelper.id_to_int(start)
+    end_number = _ObjectIdHelper.id_to_int(end)
+    return (pos_number - start_number) / (end_number - start_number)
+
+  def fraction_to_position(
+      self,
+      fraction: float,
+      start: ObjectId,
+      end: ObjectId,
+  ):
+    """Converts a fraction between 0 and 1
+    to a position between start and end.
+    """
+    start_number = _ObjectIdHelper.id_to_int(start)
+    end_number = _ObjectIdHelper.id_to_int(end)
+    total = end_number - start_number
+    pos = int(total * fraction + start_number)
+    # make sure split position is larger than start position and smaller than
+    # end position.
+    if pos <= start_number:
+      return _ObjectIdHelper.increment_id(start, 1)
+
+    if pos >= end_number:
+      return _ObjectIdHelper.increment_id(end, -1)
+
+    return _ObjectIdHelper.int_to_id(pos)
+
+
+class _StrRangeTracker(LexicographicKeyRangeTracker):

Review comment:
       The only reason I've implemented `_StrRangeTracker` is not to change 
code outside of `sdks/python/apache_beam/io/mongodbio.py`.
   
   Please review my last commit with required changes - update 
LexicographicKeyRangeTracker to handle both `bytes` and `str` keys:
   e8d855a48fb8a72492ff09cb189683df69ef6bc2

##########
File path: sdks/python/apache_beam/io/mongodbio.py
##########
@@ -136,30 +149,195 @@ def __init__(
 
     Returns:
       :class:`~apache_beam.transforms.ptransform.PTransform`
-
     """
     if extra_client_params is None:
       extra_client_params = {}
     if not isinstance(db, str):
-      raise ValueError('ReadFromMongDB db param must be specified as a string')
+      raise ValueError("ReadFromMongDB db param must be specified as a string")
     if not isinstance(coll, str):
       raise ValueError(
-          'ReadFromMongDB coll param must be specified as a '
-          'string')
+          "ReadFromMongDB coll param must be specified as a string")
     self._mongo_source = _BoundedMongoSource(
         uri=uri,
         db=db,
         coll=coll,
         filter=filter,
         projection=projection,
         extra_client_params=extra_client_params,
-        bucket_auto=bucket_auto)
+        bucket_auto=bucket_auto,
+    )
 
   def expand(self, pcoll):
     return pcoll | iobase.Read(self._mongo_source)
 
 
+class _ObjectIdRangeTracker(OrderedPositionRangeTracker):
+  """RangeTracker for tracking mongodb _id of bson ObjectId type."""
+  def position_to_fraction(
+      self,
+      pos: ObjectId,
+      start: ObjectId,
+      end: ObjectId,
+  ):
+    """Returns the fraction of keys in the range [start, end) that
+    are less than the given key.
+    """
+    pos_number = _ObjectIdHelper.id_to_int(pos)
+    start_number = _ObjectIdHelper.id_to_int(start)
+    end_number = _ObjectIdHelper.id_to_int(end)
+    return (pos_number - start_number) / (end_number - start_number)
+
+  def fraction_to_position(
+      self,
+      fraction: float,
+      start: ObjectId,
+      end: ObjectId,
+  ):
+    """Converts a fraction between 0 and 1
+    to a position between start and end.
+    """
+    start_number = _ObjectIdHelper.id_to_int(start)
+    end_number = _ObjectIdHelper.id_to_int(end)
+    total = end_number - start_number
+    pos = int(total * fraction + start_number)
+    # make sure split position is larger than start position and smaller than
+    # end position.
+    if pos <= start_number:
+      return _ObjectIdHelper.increment_id(start, 1)
+
+    if pos >= end_number:
+      return _ObjectIdHelper.increment_id(end, -1)
+
+    return _ObjectIdHelper.int_to_id(pos)
+
+
+class _StrRangeTracker(LexicographicKeyRangeTracker):

Review comment:
       The only reason I've implemented `_StrRangeTracker` is not to change 
code outside of `sdks/python/apache_beam/io/mongodbio.py`.
   
   Please review my last commit with required changes - update 
LexicographicKeyRangeTracker to handle both `bytes` and `str` keys:
   e8d855a48fb8a72492ff09cb189683df69ef6bc2




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to