[ 
https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290109&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290109
 ]

ASF GitHub Bot logged work on BEAM-7866:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Aug/19 00:53
            Start Date: 07/Aug/19 00:53
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on pull request #9233:  
[BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311324493
 
 

 ##########
 File path: sdks/python/apache_beam/io/mongodbio.py
 ##########
 @@ -194,18 +212,110 @@ def display_data(self):
     res['mongo_client_spec'] = self.spec
     return res
 
-  def _get_avg_document_size(self):
+  def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
+    # if the desired chunk size is smaller than 1 MB, use the MongoDB default
+    # split size of 1 MB
+    if desired_chunk_size_in_mb < 1:
+      desired_chunk_size_in_mb = 1
+    if start_pos >= end_pos:
+      # single document not splittable
+      return []
     with MongoClient(self.uri, **self.spec) as client:
-      size = client[self.db].command('collstats', self.coll).get('avgObjSize')
-      if size is None or size <= 0:
-        raise ValueError(
-            'Collection %s not found or average doc size is '
-            'incorrect', self.coll)
-      return size
-
-  def _get_document_count(self):
+      name_space = '%s.%s' % (self.db, self.coll)
+      return (client[self.db].command(
+          'splitVector',
+          name_space,
+          keyPattern={'_id': 1},
+          min={'_id': start_pos},
+          max={'_id': end_pos},
+          maxChunkSize=desired_chunk_size_in_mb)['splitKeys'])
+
+  def _merge_id_filter(self, range_tracker):
+    all_filters = self.filter.copy()
+    if '_id' in all_filters:
+      id_filter = all_filters['_id']
+      id_filter['$gte'] = (
+          max(id_filter['$gte'], range_tracker.start_position())
+          if '$gte' in id_filter else range_tracker.start_position())
+
+      id_filter['$lt'] = (min(id_filter['$lt'], range_tracker.stop_position())
+                          if '$lt' in id_filter else
+                          range_tracker.stop_position())
+    else:
+      all_filters.update({
+          '_id': {
+              '$gte': range_tracker.start_position(),
+              '$lt': range_tracker.stop_position()
+          }
+      })
+    return all_filters
+
+  def _get_head_document_id(self, sort_order):
     with MongoClient(self.uri, **self.spec) as client:
-      return max(client[self.db][self.coll].count_documents(self.filter), 0)
+      cursor = client[self.db][self.coll].find(filter={}, projection=[]).sort([
+          ('_id', sort_order)
+      ]).limit(1)
+      try:
+        return cursor[0]['_id']
+      except IndexError:
+        raise ValueError('Empty Mongodb collection')
+
+
+class _ObjectIdHelper(object):
+  """A utility class for bson object ids."""
+
+  @classmethod
+  def id_to_int(cls, id):
+    # converts object id binary to integer
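
The `id_to_int` hunk is cut off in this excerpt, but the conversion it performs can be sketched standalone. This is an illustrative sketch, not the Beam implementation: it assumes the input is the raw 12-byte ObjectId binary, interpreted big-endian so that integer order matches ObjectId byte order:

```python
def id_to_int(id_bytes):
    """Convert the 12-byte binary form of a bson ObjectId to an integer.

    ObjectIds compare byte-lexicographically, so a big-endian unsigned
    interpretation preserves their sort order.
    """
    return int.from_bytes(id_bytes, byteorder='big')

def int_to_id(n):
    """Inverse: convert an integer back to the 12-byte ObjectId binary."""
    return n.to_bytes(12, byteorder='big')
```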
 
 Review comment:
  Please add a proper doc comment, including the variables if possible. (Even 
though the containing class is private, it is better to properly describe these 
functions, since they contain non-trivial byte manipulations.)
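
The `_merge_id_filter` hunk above intersects a user-supplied `_id` filter with the range tracker's bounds. A standalone sketch of that merging logic (the hypothetical `start`/`stop` parameters stand in for `range_tracker.start_position()`/`stop_position()`):

```python
def merge_id_filter(user_filter, start, stop):
    """Intersect an optional user _id filter with the range [start, stop)."""
    merged = dict(user_filter)
    id_filter = dict(merged.get('_id', {}))
    # Tighten each bound: keep the stricter of the user's bound and ours.
    id_filter['$gte'] = (max(id_filter['$gte'], start)
                         if '$gte' in id_filter else start)
    id_filter['$lt'] = (min(id_filter['$lt'], stop)
                        if '$lt' in id_filter else stop)
    merged['_id'] = id_filter
    return merged
```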
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 290109)
    Time Spent: 3h 40m  (was: 3.5h)

> Python MongoDB IO performance and correctness issues
> ----------------------------------------------------
>
>                 Key: BEAM-7866
>                 URL: https://issues.apache.org/jira/browse/BEAM-7866
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Eugene Kirpichov
>            Assignee: Yichi Zhang
>            Priority: Blocker
>             Fix For: 2.15.0
>
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
>  splits the query result by computing number of results in constructor, and 
> then in each reader re-executing the whole query and getting an index 
> sub-range of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily 
> deterministic, so the idea of index ranges over it is meaningless: each shard 
> may get a basically random, possibly overlapping subset of the total results.
> - Even if you add an order by `_id`, the database may be changing concurrently 
> with reading and splitting. E.g. if the database contained documents with ids 
> 10 20 30 40 50, and this was split into index ranges 0..2 and 3..5 (under the 
> assumption that these shards would contain respectively 10 20 30, and 40 50), 
> and then suppose the 0..2 shard is read and document 25 is inserted - 
> then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and 
> document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items, 
> which in total amounts to quadratic work.
> - The query is first executed in the constructor in order to count results, 
> which 1) means the constructor can be very slow and 2) won't work at all 
> if the database is unavailable at the time the pipeline is constructed (e.g. 
> if this is a template).
> Unfortunately, none of these issues are caught by SourceTestUtils: this IO 
> has extensive coverage with it, and the tests pass. This is because the tests 
> always return the same results in the same order. I don't know how to catch 
> the correctness issues automatically, nor the performance issue, but these 
> would all be important follow-up items after the actual fix.
> CC: [~chamikara] as reviewer.
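
The duplication/loss scenario described above is easy to reproduce as a plain-Python simulation (list indices stand in for cursor offsets; no MongoDB involved):

```python
docs = [10, 20, 30, 40, 50]   # _id values, in sort order

# Index-based split: shard A takes offsets [0, 3), shard B takes [3, ...).
shard_a = docs[0:3]           # reads 10, 20, 30

docs = sorted(docs + [25])    # document 25 is inserted before shard B runs

shard_b = docs[3:]            # re-runs the query, skips 3: reads 30, 40, 50

seen = shard_a + shard_b
assert seen.count(30) == 2    # document 30 is duplicated
assert 25 not in seen         # document 25 is lost

# _id-range split of the same workload: the ranges [10, 40) and [40, inf)
# are stable under the insertion, so no pre-existing document is
# duplicated or lost.
range_a = [d for d in [10, 20, 30, 40, 50] if 10 <= d < 40]
range_b = [d for d in docs if 40 <= d]
assert sorted(range_a + range_b) == [10, 20, 30, 40, 50]
```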



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
