[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=295037&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295037 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 14/Aug/19 21:33
Start Date: 14/Aug/19 21:33
Worklog Time Spent: 10m

Work Description: yifanzou commented on pull request #9342: [BEAM-7866][BEAM-5148] Cherry-picks mongodb fixes to 2.15.0 release branch
URL: https://github.com/apache/beam/pull/9342

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 295037)
Time Spent: 12h (was: 11h 50m)

> Python MongoDB IO performance and correctness issues
>
> Key: BEAM-7866
> URL: https://issues.apache.org/jira/browse/BEAM-7866
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Eugene Kirpichov
> Assignee: Yichi Zhang
> Priority: Major
> Fix For: 2.16.0
>
> Time Spent: 12h
> Remaining Estimate: 0h
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
> splits the query result by computing the number of results in the constructor;
> each reader then re-executes the whole query and takes an index sub-range of
> those results.
>
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily
> deterministic, so index ranges over it are meaningless: each shard may get a
> random, possibly overlapping subset of the total results.
> - Even if you order by `_id`, the database may change concurrently with
> reading and splitting. E.g. if the database contained documents with ids
> 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the
> assumption that these shards would contain respectively 10 20 30, and 40 50),
> and then suppose shard 10 20 30 is read and then document 25 is inserted -
> then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and
> document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items,
> which in total is quadratic complexity.
> - The query is first executed in the constructor in order to count results,
> which 1) means the constructor can be very slow and 2) means it won't work at
> all if the database is unavailable at the time the pipeline is constructed
> (e.g. if this is a template).
>
> Unfortunately, none of these issues are caught by SourceTestUtils: this class
> has extensive coverage with it, and the tests pass. This is because the tests
> return the same results in the same order. I don't know how to catch this
> automatically, and I don't know how to catch the performance issue
> automatically, but these would all be important follow-up items after the
> actual fix.
>
> CC: [~chamikara] as reviewer.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
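The 10/20/30/40/50 failure mode described in the issue can be simulated in a few lines of plain Python. This is an illustrative sketch, not the actual mongodbio.py code: `read_shard` stands in for a reader that re-executes the sorted query and slices it by the index range computed at split time.

```python
collection = [10, 20, 30, 40, 50]

def read_shard(start, end):
    # Each reader re-executes the whole (sorted) query against live data
    # and takes an index sub-range, as the broken source does.
    return sorted(collection)[start:end]

# The split is computed once, up front: indices 0..2 and 3..5.
shard_a = read_shard(0, 3)   # [10, 20, 30]

# A concurrent writer inserts document 25 between the two reads.
collection.append(25)

# The sorted view is now [10, 20, 25, 30, 40, 50], so the second
# shard's index range picks up [30, 40, 50].
shard_b = read_shard(3, 6)

union = shard_a + shard_b
assert union.count(30) == 2  # document 30 is read twice
assert 25 not in union       # document 25 is never read
```

Index offsets are only meaningful against a frozen snapshot; against live data they silently shift, which is why the fix keys splits on `_id` ranges instead.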
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=295036&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295036 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 14/Aug/19 21:32
Start Date: 14/Aug/19 21:32
Worklog Time Spent: 10m

Work Description: yifanzou commented on issue #9342: [BEAM-7866][BEAM-5148] Cherry-picks mongodb fixes to 2.15.0 release branch
URL: https://github.com/apache/beam/pull/9342#issuecomment-521427155

LGTM

Issue Time Tracking
---
Worklog Id: (was: 295036)
Time Spent: 11h 50m (was: 11h 40m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=295023&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295023 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 14/Aug/19 21:09
Start Date: 14/Aug/19 21:09
Worklog Time Spent: 10m

Work Description: chamikaramj commented on issue #9342: [BEAM-7866][BEAM-5148] Cherry-picks mongodb fixes to 2.15.0 release branch
URL: https://github.com/apache/beam/pull/9342#issuecomment-521419857

R: @yifanzou
CC: @y1chi @aaltay

Issue Time Tracking
---
Worklog Id: (was: 295023)
Time Spent: 11h 40m (was: 11.5h)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=295022&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-295022 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 14/Aug/19 21:09
Start Date: 14/Aug/19 21:09
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9342: [BEAM-7866][BEAM-5148] Cherry-picks mongodb fixes to 2.15.0 release branch
URL: https://github.com/apache/beam/pull/9342
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=294317&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294317 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 14/Aug/19 01:19
Start Date: 14/Aug/19 01:19
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233

Issue Time Tracking
---
Worklog Id: (was: 294317)
Time Spent: 11h 20m (was: 11h 10m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=294316&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-294316 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 14/Aug/19 01:18
Start Date: 14/Aug/19 01:18
Worklog Time Spent: 10m

Work Description: chamikaramj commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-521066083

Eugene seems to be OOO. So I'll go ahead and merge this so that this can be cherry-picked to the 2.15.0 release.

Issue Time Tracking
---
Worklog Id: (was: 294316)
Time Spent: 11h 10m (was: 11h)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=293518&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-293518 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 13/Aug/19 01:05
Start Date: 13/Aug/19 01:05
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r313187178

## File path: sdks/python/apache_beam/io/mongodbio_test.py

## @@ -210,6 +323,74 @@ def test_write_to_mongodb_with_generated_id(self, mock_client):
     return_value.bulk_write.assert_called_with(expected_update)
 
 
+class ObjectIdHelperTest(TestCase):
+  def test_conversion(self):
+    test_cases = [
+        (objectid.ObjectId(''), 0),
+        (objectid.ObjectId('0001'), 2**32),
+        (objectid.ObjectId(''), 2**32 - 1),
+        (objectid.ObjectId('0001'), 2**64),
+        (objectid.ObjectId(''), 2**64 - 1),
+        (objectid.ObjectId(''), 2**96 - 1),
+    ]
+    for (id, number) in test_cases:
+      self.assertEqual(id, _ObjectIdHelper.int_to_id(number))
+      self.assertEqual(number, _ObjectIdHelper.id_to_int(id))
+
+    # random tests
+    for _ in range(100):
+      id = objectid.ObjectId()
+      if sys.version_info[0] < 3:
+        number = int(id.binary.encode('hex'), 16)
+      else:  # PY3
+        number = int(id.binary.hex(), 16)
+      self.assertEqual(id, _ObjectIdHelper.int_to_id(number))
+      self.assertEqual(number, _ObjectIdHelper.id_to_int(id))
+
+  def test_increment_id(self):
+    test_cases = [
+        (objectid.ObjectId('0001'),
+         objectid.ObjectId('')),
+        (objectid.ObjectId('0001'),
+         objectid.ObjectId('')),
+    ]
+    for (first, second) in test_cases:
+      self.assertEqual(second, _ObjectIdHelper.increment_id(first, -1))
+      self.assertEqual(first, _ObjectIdHelper.increment_id(second, 1))
+
+    for _ in range(100):
+      id = objectid.ObjectId()
+      self.assertLess(id, _ObjectIdHelper.increment_id(id, 1))
+      self.assertGreater(id, _ObjectIdHelper.increment_id(id, -1))
+
+
+class ObjectRangeTrackerTest(TestCase):
+  def test_fraction_position_conversion(self):
+    start_int = 0
+    stop_int = 2**96 - 1
+    start = _ObjectIdHelper.int_to_id(start_int)
+    stop = _ObjectIdHelper.int_to_id(stop_int)
+    test_cases = ([start_int, stop_int, 2**32, 2**32 - 1, 2**64, 2**64 - 1] +
+                  [random.randint(start_int, stop_int) for _ in range(100)])
+    tracker = _ObjectIdRangeTracker()
+    for pos in test_cases:
+      id = _ObjectIdHelper.int_to_id(pos - start_int)
+      desired_fraction = (pos - start_int) / (stop_int - start_int)
+      self.assertAlmostEqual(tracker.position_to_fraction(id, start, stop),

Review comment: tested in 10k loop, all passed on my laptop.

Issue Time Tracking
---
Worklog Id: (was: 293518)
Time Spent: 11h (was: 10h 50m)
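The tests in the review diff above exercise `_ObjectIdHelper`, whose core idea can be sketched without pymongo/bson: a MongoDB ObjectId is 12 bytes, so it maps one-to-one onto a 96-bit unsigned integer, and once in integer space you can increment ids and do range arithmetic. This is an illustrative approximation working on raw 12-byte values, not the actual Beam implementation (which operates on `bson.objectid.ObjectId` instances).

```python
# Sketch of the ObjectId <-> int mapping that makes id-range splitting
# possible. Operates on raw 12-byte ids; function names mirror the
# _ObjectIdHelper methods seen in the tests, but this is not Beam code.

def id_to_int(oid_bytes: bytes) -> int:
    # Interpret the 12-byte id as a big-endian 96-bit unsigned integer.
    assert len(oid_bytes) == 12
    return int.from_bytes(oid_bytes, 'big')

def int_to_id(number: int) -> bytes:
    # Inverse mapping; valid only for 0 <= number < 2**96.
    assert 0 <= number < 2**96
    return number.to_bytes(12, 'big')

def increment_id(oid_bytes: bytes, inc: int) -> bytes:
    # Add `inc` (possibly negative) in integer space, e.g. to turn an
    # inclusive bound into an exclusive one.
    return int_to_id(id_to_int(oid_bytes) + inc)

smallest = int_to_id(0)         # b'\x00' * 12
largest = int_to_id(2**96 - 1)  # b'\xff' * 12
assert id_to_int(increment_id(smallest, 1)) == 1
assert increment_id(largest, -1) == int_to_id(2**96 - 2)
```

Because the mapping is order-preserving (big-endian bytes compare like the integers they encode), comparisons on ids and on their integer images agree, which is what the `assertLess`/`assertGreater` checks in `test_increment_id` rely on.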
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=293507&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-293507 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 13/Aug/19 00:35
Start Date: 13/Aug/19 00:35
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r313182132

## File path: sdks/python/apache_beam/io/mongodbio_test.py

Review comment (on ObjectRangeTrackerTest.test_fraction_position_conversion): Please run this test 10k times and make sure that this is not flaky.

Issue Time Tracking
---
Worklog Id: (was: 293507)
Time Spent: 10h 50m (was: 10h 40m)
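The `position_to_fraction` call under review maps an ObjectId position to a fraction of the tracked id range, which is what lets the range tracker report progress and compute dynamic split points. A minimal sketch of that math in integer space (assuming ids have already been converted to 96-bit ints; this mirrors the tested behavior, not the actual `_ObjectIdRangeTracker` code):

```python
# Progress/fraction math over an id range [start, stop), with positions
# expressed as 96-bit integers. Illustrative only; names follow the
# tracker methods exercised in the tests above.

def position_to_fraction(pos: int, start: int, stop: int) -> float:
    # Fraction of the range consumed when the reader is at `pos`.
    return (pos - start) / (stop - start)

def fraction_to_position(fraction: float, start: int, stop: int) -> int:
    # Inverse: the integer position at a given fraction of the range,
    # e.g. to answer a dynamic split request at fraction f.
    return start + int(fraction * (stop - start))

start, stop = 0, 2**96 - 1
mid = fraction_to_position(0.5, start, stop)
assert abs(position_to_fraction(mid, start, stop) - 0.5) < 1e-9
```

Note the round trip is only approximate: a 96-bit position cannot be represented exactly in a 53-bit float mantissa, which is presumably why the test uses `assertAlmostEqual` and why the reviewer asked for a 10k-run flakiness check.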
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=292180&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-292180 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 09/Aug/19 17:23
Start Date: 09/Aug/19 17:23
Worklog Time Spent: 10m

Work Description: y1chi commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-519998727

@jkff @chamikaramj updated the PR, would you mind giving it another look?

Issue Time Tracking
---
Worklog Id: (was: 292180)
Time Spent: 10h 40m (was: 10.5h)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291561=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291561 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 08/Aug/19 20:23
Worklog Time Spent: 10m

Work Description: y1chi commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-519673706
Run Python MongoDBIO_IT

Issue Time Tracking: Worklog Id: (was: 291561) Time Spent: 10.5h (was: 10h 20m)
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291560=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291560 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 08/Aug/19 20:18
Worklog Time Spent: 10m

Work Description: y1chi commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-519671952
> @y1chi I have patched my forked 2.14.0 with the fixes in this ticket. What would be a good performance test to verify this fix?

Probably can try with your own data set, similar to https://github.com/apache/beam/blob/75150df8bb02cdafc983305631506d70d6039f86/sdks/python/apache_beam/io/mongodbio_it_test.py ?

Issue Time Tracking: Worklog Id: (was: 291560) Time Spent: 10h 20m (was: 10h 10m)
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291555=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291555 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 08/Aug/19 20:05
Worklog Time Spent: 10m

Work Description: manuelaguilar commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-519667754
@y1chi I have patched my forked 2.14.0 with the fixes in this ticket. What would be a good performance test to verify this fix?

Issue Time Tracking: Worklog Id: (was: 291555) Time Spent: 10h 10m (was: 10h)
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291507=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291507 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 08/Aug/19 18:56
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r312194199

File path: sdks/python/apache_beam/io/mongodbio_test.py

@@ -210,6 +289,47 @@ def test_write_to_mongodb_with_generated_id(self, mock_client):
     return_value.bulk_write.assert_called_with(expected_update)


+class ObjectIdHelperTest(TestCase):
+  def test_conversion(self):
+    test_cases = [
+        (objectid.ObjectId('000000000000000000000000'), 0),
+        (objectid.ObjectId('000000000000000100000000'), 2**32),
+        (objectid.ObjectId('0000000000000000ffffffff'), 2**32 - 1),
+        (objectid.ObjectId('000000010000000000000000'), 2**64),
+        (objectid.ObjectId('00000000ffffffffffffffff'), 2**64 - 1),
+        (objectid.ObjectId('ffffffffffffffffffffffff'), 2**96 - 1),
+    ]
+    for (id, number) in test_cases:
+      self.assertEqual(id, _ObjectIdHelper.int_to_id(number))
+      self.assertEqual(number, _ObjectIdHelper.id_to_int(id))
+
+    # random tests
+    for _ in range(100):
+      id = objectid.ObjectId()
+      if sys.version_info[0] < 3:
+        number = int(id.binary.encode('hex'), 16)
+      else:  # PY3
+        number = int(id.binary.hex(), 16)
+      self.assertEqual(id, _ObjectIdHelper.int_to_id(number))
+      self.assertEqual(number, _ObjectIdHelper.id_to_int(id))
+
+  def test_increment_id(self):
+    test_cases = [
+        (objectid.ObjectId('000000000000000100000000'),
+         objectid.ObjectId('0000000000000000ffffffff')),
+        (objectid.ObjectId('000000010000000000000000'),
+         objectid.ObjectId('00000000ffffffffffffffff')),
+    ]
+    for (first, second) in test_cases:
+      self.assertEqual(second, _ObjectIdHelper.increment_id(first, -1))
+      self.assertEqual(first, _ObjectIdHelper.increment_id(second, 1))
+
+    for _ in range(100):
+      id = objectid.ObjectId()
+      self.assertLess(id, _ObjectIdHelper.increment_id(id, 1))
+      self.assertGreater(id, _ObjectIdHelper.increment_id(id, -1))

Review comment: done.

Issue Time Tracking: Worklog Id: (was: 291507) Time Spent: 10h (was: 9h 50m)
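The conversion exercised by the ObjectIdHelper tests above can be sketched without bson: an ObjectId is 12 bytes, which id_to_int/int_to_id treat as three big-endian uint32 words. The helpers below operate on raw bytes so the sketch has no pymongo dependency; the names mirror _ObjectIdHelper, but this is an illustration of the arithmetic, not Beam's implementation.

```python
import struct

def id_to_int(binary):
    # binary: the 12-byte ObjectId payload, unpacked as three
    # big-endian uint32 words and recombined into one 96-bit integer.
    ints = struct.unpack('>III', binary)
    return (ints[0] << 64) + (ints[1] << 32) + ints[2]

def int_to_id(number):
    # Inverse of id_to_int for 0 <= number < 2**96.
    assert 0 <= number < 2 ** 96
    words = (number >> 64, (number >> 32) & 0xFFFFFFFF, number & 0xFFFFFFFF)
    return struct.pack('>III', *words)

def increment_id(binary, inc):
    # Treat the 12 bytes as one big-endian integer, as the fix does when
    # nudging the exclusive stop position past the last document's id.
    return int_to_id(id_to_int(binary) + inc)

assert id_to_int(bytes.fromhex('000000000000000100000000')) == 2 ** 32
assert int_to_id(2 ** 96 - 1) == bytes.fromhex('ff' * 12)
assert increment_id(bytes.fromhex('00' * 12), 1) == bytes.fromhex('00' * 11 + '01')
```

Note how incrementing carries across the 32-bit word boundaries, which is exactly what test_increment_id checks.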
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291506=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291506 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 08/Aug/19 18:56
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r312194142

File path: sdks/python/apache_beam/io/mongodbio_test.py

@@ -72,72 +176,47 @@ def test_split(self, mock_client):
   @mock.patch('apache_beam.io.mongodbio.MongoClient')
   def test_dynamic_work_rebalancing(self, mock_client):
-    splits = list(self.mongo_source.split(desired_bundle_size=3000))
-    mock_client.return_value.__enter__.return_value.__getitem__.return_value \
-        .__getitem__.return_value.find.return_value = [{'x': 1}, {'x': 2},
-                                                       {'x': 3}, {'x': 4},
-                                                       {'x': 5}]
+    mock_client.return_value = _MockMongoClient(self._docs)
+    splits = list(
+        self.mongo_source.split(desired_bundle_size=3000 * 1024 * 1024))
     assert len(splits) == 1
     source_test_utils.assert_split_at_fraction_exhaustive(
         splits[0].source, splits[0].start_position, splits[0].stop_position)

-  @mock.patch('apache_beam.io.mongodbio.OffsetRangeTracker')
-  def test_get_range_tracker(self, mock_tracker):
-    self.mongo_source.get_range_tracker(None, None)
-    mock_tracker.assert_called_with(0, 5)
-    self.mongo_source.get_range_tracker(10, 20)
-    mock_tracker.assert_called_with(10, 20)
+  @mock.patch('apache_beam.io.mongodbio.MongoClient')
+  def test_get_range_tracker(self, mock_client):
+    mock_client.return_value = _MockMongoClient(self._docs)
+    self.assertIsInstance(self.mongo_source.get_range_tracker(None, None),
+                          _ObjectIdRangeTracker)

   @mock.patch('apache_beam.io.mongodbio.MongoClient')
   def test_read(self, mock_client):
     mock_tracker = mock.MagicMock()
-    mock_tracker.try_claim.return_value = True
-    mock_tracker.start_position.return_value = 0
-    mock_tracker.stop_position.return_value = 2
+    mock_tracker.start_position.return_value = self._ids[0]

Review comment: done.

Issue Time Tracking: Worklog Id: (was: 291506) Time Spent: 9h 50m (was: 9h 40m)
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291466=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291466 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 08/Aug/19 17:46
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r312163044

File path: sdks/python/apache_beam/io/mongodbio_test.py

@@ -30,38 +34,138 @@
 from apache_beam.io.mongodbio import _BoundedMongoSource
 from apache_beam.io.mongodbio import _GenerateObjectIdFn
 from apache_beam.io.mongodbio import _MongoSink
+from apache_beam.io.mongodbio import _ObjectIdHelper
+from apache_beam.io.mongodbio import _ObjectIdRangeTracker
 from apache_beam.io.mongodbio import _WriteMongoFn
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to


+class _MockMongoColl(object):
+  """Fake mongodb collection cursor."""
+
+  def __init__(self, docs):
+    self.docs = docs
+
+  def _filter(self, filter):
+    match = []
+    if not filter:
+      return self
+    if '$and' not in filter or not filter['$and']:
+      return self
+    start = filter['$and'][0]['_id'].get('$gte')
+    end = filter['$and'][0]['_id'].get('$lt')
+    assert start is not None
+    assert end is not None
+    for doc in self.docs:
+      if start and doc['_id'] < start:
+        continue
+      if end and doc['_id'] >= end:
+        continue
+      match.append(doc)
+    return match
+
+  def find(self, filter=None, **kwargs):
+    return _MockMongoColl(self._filter(filter))
+
+  def sort(self, sort_items):
+    key, order = sort_items[0]
+    self.docs = sorted(self.docs,
+                       key=lambda x: x[key],
+                       reverse=(order != ASCENDING))
+    return self
+
+  def limit(self, num):
+    return _MockMongoColl(self.docs[0:num])
+
+  def count_documents(self, filter):
+    return len(self._filter(filter))
+
+  def __getitem__(self, index):
+    return self.docs[index]
+
+
+class _MockMongoDb(object):
+  """Fake Mongo Db."""
+
+  def __init__(self, docs):
+    self.docs = docs
+
+  def __getitem__(self, coll_name):
+    return _MockMongoColl(self.docs)
+
+  def command(self, command, *args, **kwargs):
+    if command == 'collstats':
+      return {'size': 5, 'avgSize': 1}
+    elif command == 'splitVector':
+      return self.get_split_key(command, *args, **kwargs)
+
+  def get_split_key(self, command, ns, min, max, maxChunkSize, **kwargs):

Review comment: Done

Issue Time Tracking: Worklog Id: (was: 291466) Time Spent: 9h 40m (was: 9.5h)
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291439=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291439 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 08/Aug/19 17:30
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r312155465

File path: sdks/python/apache_beam/io/mongodbio_test.py (the get_split_key hunk of _MockMongoDb)

+  def get_split_key(self, command, ns, min, max, maxChunkSize, **kwargs):
+    # simulate mongo db splitVector command: return split keys based on chunk
+    # size, assuming every doc is of size 1mb
+    start_id = min['_id']
+    end_id = max['_id']
+    if start_id >= end_id:
+      return []
+    start_index = 0
+    end_index = 0
+    # get split range of [min, max]
+    for doc in self.docs:
+      if doc['_id'] < start_id:
+        start_index += 1
+      if doc['_id'] <= end_id:
+        end_index += 1
+      else:
+        break
+    # return ids of elements in the range with chunk size skip and exclude
+    # head element.
+    return {
+        'splitKeys':
+            [x['_id'] for x in self.docs[start_index:end_index:maxChunkSize]][1:]
+    }

Review comment: Added comments to explain the assumption that every document is 1 MB for simplicity of tests.

Issue Time Tracking: Worklog Id: (was: 291439) Time Spent: 9.5h (was: 9h 20m)
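The essence of the mocked splitVector command above, under the test's assumption that every document is 1 MB, is a strided slice over the sorted ids: keep every maxChunkSize-th id and drop the first, so only interior split points remain. A minimal standalone sketch (hypothetical split_keys helper, not the mock itself):

```python
# Sketch of splitVector-style chunking: given ids sorted ascending and an
# assumed size of 1 MB per document, return the ids that open each new chunk
# of at most max_chunk_size_mb documents. The very first id is excluded,
# mirroring MongoDB's splitVector, which returns interior split points only.

def split_keys(sorted_ids, max_chunk_size_mb):
    return sorted_ids[::max_chunk_size_mb][1:]

ids = list(range(10))
assert split_keys(ids, 3) == [3, 6, 9]
# Each bundle then reads ids in [previous_key, next_key) rather than by index.
```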
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291427=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291427 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 08/Aug/19 17:22
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r312152162

File path: sdks/python/apache_beam/io/mongodbio.py

@@ -194,18 +197,145 @@ def display_data(self):
     res['mongo_client_spec'] = self.spec
     return res

-  def _get_avg_document_size(self):
+  def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
+    # calls the mongodb splitVector command to get document ids at split
+    # positions for the desired bundle size; if the desired chunk size is
+    # smaller than 1mb, use mongodb's default split size of 1mb.
+    if desired_chunk_size_in_mb < 1:
+      desired_chunk_size_in_mb = 1
+    if start_pos >= end_pos:
+      # single document, not splittable
+      return []
     with MongoClient(self.uri, **self.spec) as client:
-      size = client[self.db].command('collstats', self.coll).get('avgObjSize')
-      if size is None or size <= 0:
-        raise ValueError(
-            'Collection %s not found or average doc size is '
-            'incorrect', self.coll)
-      return size
-
-  def _get_document_count(self):
+      name_space = '%s.%s' % (self.db, self.coll)
+      return (client[self.db].command(
+          'splitVector',
+          name_space,
+          keyPattern={'_id': 1},
+          min={'_id': start_pos},
+          max={'_id': end_pos},
+          maxChunkSize=desired_chunk_size_in_mb)['splitKeys'])
+
+  def _merge_id_filter(self, range_tracker):
+    # Merge the default filter with the refined _id field range of
+    # range_tracker.
+    all_filters = self.filter.copy()
+    # if there are no additional filters, initialize empty additional filters,
+    # see https://docs.mongodb.com/manual/reference/operator/query/and/
+    if '$and' not in all_filters:
+      all_filters['$and'] = []
+    # add the range filter to the query. $gte specifies the start position
+    # (inclusive) and $lt the end position (exclusive), see
+    # https://docs.mongodb.com/manual/reference/operator/query/gte/ and
+    # https://docs.mongodb.com/manual/reference/operator/query/lt/
+    all_filters['$and'] += [{
+        '_id': {
+            '$gte': range_tracker.start_position(),
+            '$lt': range_tracker.stop_position()
+        }
+    }]
+    return all_filters
+
+  def _get_head_document_id(self, sort_order):
     with MongoClient(self.uri, **self.spec) as client:
-      return max(client[self.db][self.coll].count_documents(self.filter), 0)
+      cursor = client[self.db][self.coll].find(filter={}, projection=[]).sort([
+          ('_id', sort_order)
+      ]).limit(1)
+      try:
+        return cursor[0]['_id']
+      except IndexError:
+        raise ValueError('Empty Mongodb collection')
+
+  def _replace_none_positions(self, start_position, stop_position):
+    if start_position is None:
+      start_position = self._get_head_document_id(ASCENDING)
+    if stop_position is None:
+      last_doc_id = self._get_head_document_id(DESCENDING)
+      # increment the last doc id's binary value by 1 to make sure the last
+      # document is not excluded
+      stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
+    return start_position, stop_position
+
+
+class _ObjectIdHelper(object):
+  """A utility class to manipulate bson object ids."""
+
+  @classmethod
+  def id_to_int(cls, id):
+    """
+    Args:
+      id: ObjectId required for each MongoDB document _id field.
+    Returns: Converted integer value of ObjectId's 12-byte binary value.
+    """
+    # converts the object id binary to an integer;
+    # the id object is bytes type with size of 12
+    ints = struct.unpack('>III', id.binary)
+    return (ints[0] << 64) + (ints[1] << 32) + ints[2]
+
+  @classmethod
+  def int_to_id(cls, number):
+    """
+    Args:
+      number(int): The integer value to be converted to ObjectId.
+    """

Review comment: precommit tests cover Python 2.7, 3.5, 3.6 and 3.7

Issue Time Tracking: Worklog Id: (was: 291427) Time Spent: 9h 20m (was: 9h 10m)
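The _replace_none_positions logic in the diff above boils down to: default the start to the smallest _id, and the stop to one past the largest, so the exclusive upper bound still covers the last document. A toy sketch with integer ids (the real code uses ObjectIds, head-document queries, and _ObjectIdHelper.increment_id; replace_none_positions here is a hypothetical stand-in):

```python
# Toy version of defaulting a [start, stop) read range over sorted ids.

def replace_none_positions(ids, start=None, stop=None):
    if start is None:
        start = min(ids)      # head document in ascending order
    if stop is None:
        stop = max(ids) + 1   # increment so the last doc is not excluded
    return start, stop

assert replace_none_positions([10, 20, 30]) == (10, 31)
assert replace_none_positions([10, 20, 30], start=15) == (15, 31)
```

The +1 increment matters because the range filter uses $lt (exclusive) for the stop position.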
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291421=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291421 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 08/Aug/19 17:16 Start Date: 08/Aug/19 17:16 Worklog Time Spent: 10m Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r312149592 ## File path: sdks/python/apache_beam/io/mongodbio.py ## @@ -194,18 +197,145 @@ def display_data(self): res['mongo_client_spec'] = self.spec return res - def _get_avg_document_size(self): + def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos): +# calls mongodb splitVector command to get document ids at split position +# for desired bundle size, if desired chunk size smaller than 1mb, use +# mongodb default split size of 1mb. +if desired_chunk_size_in_mb < 1: + desired_chunk_size_in_mb = 1 +if start_pos >= end_pos: + # single document not splittable + return [] with MongoClient(self.uri, **self.spec) as client: - size = client[self.db].command('collstats', self.coll).get('avgObjSize') - if size is None or size <= 0: -raise ValueError( -'Collection %s not found or average doc size is ' -'incorrect', self.coll) - return size - - def _get_document_count(self): + name_space = '%s.%s' % (self.db, self.coll) + return (client[self.db].command( + 'splitVector', + name_space, + keyPattern={'_id': 1}, + min={'_id': start_pos}, + max={'_id': end_pos}, + maxChunkSize=desired_chunk_size_in_mb)['splitKeys']) + + def _merge_id_filter(self, range_tracker): +# Merge the default filter with refined _id field range of range_tracker. 
+all_filters = self.filter.copy()
+
+# if there are no additional filters, initialize empty additional filters,
+# see more at https://docs.mongodb.com/manual/reference/operator/query/and/
+if '$and' not in all_filters:

Review comment: yeah, that sounds better.

Issue Time Tracking --- Worklog Id: (was: 291421) Time Spent: 9h 10m (was: 9h)

> Key: BEAM-7866
> URL: https://issues.apache.org/jira/browse/BEAM-7866
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Eugene Kirpichov
> Assignee: Yichi Zhang
> Priority: Blocker
> Fix For: 2.15.0
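The `_get_split_keys` change above asks the server for split points via the `splitVector` admin command instead of counting documents up front. The shape of that command can be sketched as pure data, with no server involved; the helper name and argument list below are illustrative, not Beam's API:

```python
def split_vector_command(db, coll, start_id, end_id, chunk_size_mb):
    """Build the argument dict behind client[db].command('splitVector', ...)."""
    return {
        'splitVector': '%s.%s' % (db, coll),  # full namespace "db.coll"
        'keyPattern': {'_id': 1},             # split along the _id index
        'min': {'_id': start_id},             # inclusive lower bound
        'max': {'_id': end_id},               # exclusive upper bound
        # splitVector's smallest allowed chunk is 1 MB, so clamp upward,
        # matching the patch's desired_chunk_size_in_mb < 1 check.
        'maxChunkSize': max(chunk_size_mb, 1),
    }
```

The server's reply carries a `splitKeys` list of `_id` values at the chunk boundaries, which is what the patch returns.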
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291323=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291323 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 08/Aug/19 15:13 Start Date: 08/Aug/19 15:13 Worklog Time Spent: 10m Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r312084800 ## File path: sdks/python/apache_beam/io/mongodbio.py ## @@ -194,18 +197,145 @@ def display_data(self): res['mongo_client_spec'] = self.spec return res - def _get_avg_document_size(self): + def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos): +# calls mongodb splitVector command to get document ids at split position +# for desired bundle size, if desired chunk size smaller than 1mb, use +# mongodb default split size of 1mb. +if desired_chunk_size_in_mb < 1: + desired_chunk_size_in_mb = 1 +if start_pos >= end_pos: + # single document not splittable + return [] with MongoClient(self.uri, **self.spec) as client: - size = client[self.db].command('collstats', self.coll).get('avgObjSize') - if size is None or size <= 0: -raise ValueError( -'Collection %s not found or average doc size is ' -'incorrect', self.coll) - return size - - def _get_document_count(self): + name_space = '%s.%s' % (self.db, self.coll) + return (client[self.db].command( + 'splitVector', + name_space, + keyPattern={'_id': 1}, + min={'_id': start_pos}, + max={'_id': end_pos}, + maxChunkSize=desired_chunk_size_in_mb)['splitKeys']) + + def _merge_id_filter(self, range_tracker): +# Merge the default filter with refined _id field range of range_tracker. 
+all_filters = self.filter.copy()
+
+# if there are no additional filters, initialize empty additional filters,
+# see more at https://docs.mongodb.com/manual/reference/operator/query/and/
+if '$and' not in all_filters:

Review comment: What if it has one of the other query operators, such as $or $not $nor? Seems much simpler to just construct an $and of self.filter and {id: gte, lt} - or will that error out if self.filter also involves id in any capacity?

Issue Time Tracking --- Worklog Id: (was: 291323) Time Spent: 8h 20m (was: 8h 10m)
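The simpler construction floated in the comment above can be sketched as a hypothetical helper (not the code under review): wrap the user's filter and the `_id` range in a single top-level `$and` instead of mutating a `$and` clause inside the user's filter.

```python
def merge_id_filter(user_filter, start_id, stop_id):
    # $gte is the range tracker's inclusive start, $lt its exclusive stop.
    id_range = {'_id': {'$gte': start_id, '$lt': stop_id}}
    if not user_filter:
        return id_range
    # $and tolerates the same field in several branches, so a user filter
    # that already constrains _id simply intersects with the range.
    return {'$and': [user_filter, id_range]}
```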
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291326=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291326 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 08/Aug/19 15:13 Start Date: 08/Aug/19 15:13 Worklog Time Spent: 10m Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r312090511 ## File path: sdks/python/apache_beam/io/mongodbio_test.py ## @@ -72,72 +176,47 @@ def test_split(self, mock_client): @mock.patch('apache_beam.io.mongodbio.MongoClient') def test_dynamic_work_rebalancing(self, mock_client): -splits = list(self.mongo_source.split(desired_bundle_size=3000)) -mock_client.return_value.__enter__.return_value.__getitem__.return_value \ - .__getitem__.return_value.find.return_value = [{'x': 1}, {'x': 2}, - {'x': 3}, {'x': 4}, - {'x': 5}] +mock_client.return_value = _MockMongoClient(self._docs) +splits = list( +self.mongo_source.split(desired_bundle_size=3000 * 1024 * 1024)) assert len(splits) == 1 source_test_utils.assert_split_at_fraction_exhaustive( splits[0].source, splits[0].start_position, splits[0].stop_position) - @mock.patch('apache_beam.io.mongodbio.OffsetRangeTracker') - def test_get_range_tracker(self, mock_tracker): -self.mongo_source.get_range_tracker(None, None) -mock_tracker.assert_called_with(0, 5) -self.mongo_source.get_range_tracker(10, 20) -mock_tracker.assert_called_with(10, 20) + @mock.patch('apache_beam.io.mongodbio.MongoClient') + def test_get_range_tracker(self, mock_client): +mock_client.return_value = _MockMongoClient(self._docs) +self.assertIsInstance(self.mongo_source.get_range_tracker(None, None), + _ObjectIdRangeTracker) @mock.patch('apache_beam.io.mongodbio.MongoClient') def test_read(self, mock_client): mock_tracker = mock.MagicMock() -mock_tracker.try_claim.return_value = True -mock_tracker.start_position.return_value = 0 
-mock_tracker.stop_position.return_value = 2
+mock_tracker.start_position.return_value = self._ids[0]

Review comment: Please also add a test that uses a tracker whose position boundaries are between object ids, and tests where 1) start_position is less than any existing id 2) end_position is larger than any existing id 3) there are no objects between start and end.

Issue Time Tracking --- Worklog Id: (was: 291326) Time Spent: 8h 50m (was: 8h 40m)
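The boundary cases requested above can be pictured with plain integers standing in for ObjectIds (a toy model of the read contract, not the actual tracker):

```python
def read_range(ids, start, stop):
    # A reader over a half-open key range [start, stop): this is the
    # behavior the requested boundary tests should pin down.
    return [i for i in ids if start <= i < stop]

ids = [10, 20, 30, 40, 50]
assert read_range(ids, 5, 55) == ids        # start below / stop above every id
assert read_range(ids, 21, 29) == []        # no documents between start and stop
assert read_range(ids, 25, 45) == [30, 40]  # boundaries fall between existing ids
```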
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291320=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291320 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 08/Aug/19 15:13 Start Date: 08/Aug/19 15:13 Worklog Time Spent: 10m Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r312087574 ## File path: sdks/python/apache_beam/io/mongodbio.py ## @@ -194,18 +197,145 @@ def display_data(self): res['mongo_client_spec'] = self.spec return res - def _get_avg_document_size(self): + def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos): +# calls mongodb splitVector command to get document ids at split position +# for desired bundle size, if desired chunk size smaller than 1mb, use +# mongodb default split size of 1mb. +if desired_chunk_size_in_mb < 1: + desired_chunk_size_in_mb = 1 +if start_pos >= end_pos: + # single document not splittable + return [] with MongoClient(self.uri, **self.spec) as client: - size = client[self.db].command('collstats', self.coll).get('avgObjSize') - if size is None or size <= 0: -raise ValueError( -'Collection %s not found or average doc size is ' -'incorrect', self.coll) - return size - - def _get_document_count(self): + name_space = '%s.%s' % (self.db, self.coll) + return (client[self.db].command( + 'splitVector', + name_space, + keyPattern={'_id': 1}, + min={'_id': start_pos}, + max={'_id': end_pos}, + maxChunkSize=desired_chunk_size_in_mb)['splitKeys']) + + def _merge_id_filter(self, range_tracker): +# Merge the default filter with refined _id field range of range_tracker. 
+all_filters = self.filter.copy() + +# if there are no additional filters, initialize empty additional filters, +# see more at https://docs.mongodb.com/manual/reference/operator/query/and/ +if '$and' not in all_filters: + all_filters['$and'] = [] + +# add additional range filter to query. $gte specifies start position ( +# inclusive) and $lt specifies the end position (exclusive), see more at +# https://docs.mongodb.com/manual/reference/operator/query/gte/ and +# https://docs.mongodb.com/manual/reference/operator/query/lt/ +all_filters['$and'] += [{ +'_id': { +'$gte': range_tracker.start_position(), +'$lt': range_tracker.stop_position() +} +}] +return all_filters + + def _get_head_document_id(self, sort_order): with MongoClient(self.uri, **self.spec) as client: - return max(client[self.db][self.coll].count_documents(self.filter), 0) + cursor = client[self.db][self.coll].find(filter={}, projection=[]).sort([ + ('_id', sort_order) + ]).limit(1) + try: +return cursor[0]['_id'] + except IndexError: +raise ValueError('Empty Mongodb collection') + + def _replace_none_positions(self, start_position, stop_position): +if start_position is None: + start_position = self._get_head_document_id(ASCENDING) +if stop_position is None: + last_doc_id = self._get_head_document_id(DESCENDING) + # increment last doc id binary value by 1 to make sure the last document + # is not excluded + stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1) +return start_position, stop_position + + +class _ObjectIdHelper(object): + """A Utility class to manipulate bson object ids.""" + + @classmethod + def id_to_int(cls, id): +""" +Args: + id: ObjectId required for each MongoDB document _id field. + +Returns: Converted integer value of ObjectId's 12 bytes binary value. 
+ +""" +# converts object id binary to integer +# id object is bytes type with size of 12 +ints = struct.unpack('>III', id.binary) +return (ints[0] << 64) + (ints[1] << 32) + ints[2] + + @classmethod + def int_to_id(cls, number): +""" +Args: + number(int): The integer value to be used to convert to ObjectId. Review comment: Do all versions of Python that this code can run on use arbitrary-precision integers by default? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 291320) Time Spent: 7h 50m (was: 7h 40m) > Python MongoDB IO performance and correctness issues >
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291325=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291325 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 08/Aug/19 15:13 Start Date: 08/Aug/19 15:13 Worklog Time Spent: 10m Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r312092244 ## File path: sdks/python/apache_beam/io/mongodbio_test.py ## @@ -210,6 +289,47 @@ def test_write_to_mongodb_with_generated_id(self, mock_client): return_value.bulk_write.assert_called_with(expected_update) +class ObjectIdHelperTest(TestCase): + def test_conversion(self): +test_cases = [ +(objectid.ObjectId(''), 0), +(objectid.ObjectId('0001'), 2**32), +(objectid.ObjectId(''), 2**32 - 1), +(objectid.ObjectId('0001'), 2**64), +(objectid.ObjectId(''), 2**64 - 1), +(objectid.ObjectId(''), 2**96 - 1), +] +for (id, number) in test_cases: + self.assertEqual(id, _ObjectIdHelper.int_to_id(number)) + self.assertEqual(number, _ObjectIdHelper.id_to_int(id)) + +# random tests +for _ in range(100): + id = objectid.ObjectId() + if sys.version_info[0] < 3: +number = int(id.binary.encode('hex'), 16) + else: # PY3 +number = int(id.binary.hex(), 16) + self.assertEqual(id, _ObjectIdHelper.int_to_id(number)) + self.assertEqual(number, _ObjectIdHelper.id_to_int(id)) + + def test_increment_id(self): +test_cases = [ +(objectid.ObjectId('0001'), + objectid.ObjectId('')), +(objectid.ObjectId('0001'), + objectid.ObjectId('')), +] +for (first, second) in test_cases: + self.assertEqual(second, _ObjectIdHelper.increment_id(first, -1)) + self.assertEqual(first, _ObjectIdHelper.increment_id(second, 1)) + +for _ in range(100): + id = objectid.ObjectId() + self.assertLess(id, _ObjectIdHelper.increment_id(id, 1)) + self.assertGreater(id, _ObjectIdHelper.increment_id(id, -1)) Review comment: Please add tests for 
position_to_fraction and vice versa. These are by far the easiest to get wrong.

Issue Time Tracking --- Worklog Id: (was: 291325) Time Spent: 8h 40m (was: 8.5h)
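A sketch of the position-to-fraction mapping such tests would exercise, treating keys as integers in a half-open range [start, stop). This is illustrative interpolation math under that assumption, not the tracker's exact code:

```python
def position_to_fraction(start, stop, pos):
    # Fraction of the key space [start, stop) consumed up to pos.
    return (pos - start) / float(stop - start)

def fraction_to_position(start, stop, fraction):
    # Inverse mapping; truncation means the round trip is exact only for
    # positions the integer key space can represent.
    return start + int(fraction * (stop - start))
```

The review flags these as the easiest to get wrong precisely because of the rounding at the edges, which is what the suggested tests should cover.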
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291322=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291322 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 08/Aug/19 15:13 Start Date: 08/Aug/19 15:13 Worklog Time Spent: 10m Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r312088455 ## File path: sdks/python/apache_beam/io/mongodbio_test.py ## @@ -30,38 +34,138 @@ from apache_beam.io.mongodbio import _BoundedMongoSource from apache_beam.io.mongodbio import _GenerateObjectIdFn from apache_beam.io.mongodbio import _MongoSink +from apache_beam.io.mongodbio import _ObjectIdHelper +from apache_beam.io.mongodbio import _ObjectIdRangeTracker from apache_beam.io.mongodbio import _WriteMongoFn from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +class _MockMongoColl(object): + """Fake mongodb collection cursor.""" + + def __init__(self, docs): +self.docs = docs + + def _filter(self, filter): +match = [] +if not filter: + return self +if '$and' not in filter or not filter['$and']: + return self +start = filter['$and'][0]['_id'].get('$gte') +end = filter['$and'][0]['_id'].get('$lt') +assert start is not None +assert end is not None +for doc in self.docs: + if start and doc['_id'] < start: +continue + if end and doc['_id'] >= end: +continue + match.append(doc) +return match + + def find(self, filter=None, **kwargs): +return _MockMongoColl(self._filter(filter)) + + def sort(self, sort_items): +key, order = sort_items[0] +self.docs = sorted(self.docs, + key=lambda x: x[key], + reverse=(order != ASCENDING)) +return self + + def limit(self, num): +return _MockMongoColl(self.docs[0:num]) + + def count_documents(self, filter): +return len(self._filter(filter)) + + def 
__getitem__(self, index):
+    return self.docs[index]
+
+
+class _MockMongoDb(object):
+  """Fake Mongo Db."""
+
+  def __init__(self, docs):
+    self.docs = docs
+
+  def __getitem__(self, coll_name):
+    return _MockMongoColl(self.docs)
+
+  def command(self, command, *args, **kwargs):
+    if command == 'collstats':
+      return {'size': 5, 'avgSize': 1}
+    elif command == 'splitVector':
+      return self.get_split_key(command, *args, **kwargs)
+
+  def get_split_key(self, command, ns, min, max, maxChunkSize, **kwargs):

Review comment: should be get_split_keys

Issue Time Tracking --- Worklog Id: (was: 291322) Time Spent: 8h 10m (was: 8h)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291324=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291324 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 08/Aug/19 15:13 Start Date: 08/Aug/19 15:13 Worklog Time Spent: 10m Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r312089236 ## File path: sdks/python/apache_beam/io/mongodbio_test.py ## @@ -30,38 +34,138 @@ from apache_beam.io.mongodbio import _BoundedMongoSource from apache_beam.io.mongodbio import _GenerateObjectIdFn from apache_beam.io.mongodbio import _MongoSink +from apache_beam.io.mongodbio import _ObjectIdHelper +from apache_beam.io.mongodbio import _ObjectIdRangeTracker from apache_beam.io.mongodbio import _WriteMongoFn from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +class _MockMongoColl(object): + """Fake mongodb collection cursor.""" + + def __init__(self, docs): +self.docs = docs + + def _filter(self, filter): +match = [] +if not filter: + return self +if '$and' not in filter or not filter['$and']: + return self +start = filter['$and'][0]['_id'].get('$gte') +end = filter['$and'][0]['_id'].get('$lt') +assert start is not None +assert end is not None +for doc in self.docs: + if start and doc['_id'] < start: +continue + if end and doc['_id'] >= end: +continue + match.append(doc) +return match + + def find(self, filter=None, **kwargs): +return _MockMongoColl(self._filter(filter)) + + def sort(self, sort_items): +key, order = sort_items[0] +self.docs = sorted(self.docs, + key=lambda x: x[key], + reverse=(order != ASCENDING)) +return self + + def limit(self, num): +return _MockMongoColl(self.docs[0:num]) + + def count_documents(self, filter): +return len(self._filter(filter)) + + def 
__getitem__(self, index): +return self.docs[index] + + +class _MockMongoDb(object): + """Fake Mongo Db.""" + + def __init__(self, docs): +self.docs = docs + + def __getitem__(self, coll_name): +return _MockMongoColl(self.docs) + + def command(self, command, *args, **kwargs): +if command == 'collstats': + return {'size': 5, 'avgSize': 1} +elif command == 'splitVector': + return self.get_split_key(command, *args, **kwargs) + + def get_split_key(self, command, ns, min, max, maxChunkSize, **kwargs): +# simulate mongo db splitVector command, return split keys base on chunk +# size, assuming every doc is of size 1mb +start_id = min['_id'] +end_id = max['_id'] +if start_id >= end_id: + return [] +start_index = 0 +end_index = 0 +# get split range of [min, max] +for doc in self.docs: + if doc['_id'] < start_id: +start_index += 1 + if doc['_id'] <= end_id: +end_index += 1 + else: +break +# return ids of elements in the range with chunk size skip and exclude +# head element. +return { +'splitKeys': +[x['_id'] for x in self.docs[start_index:end_index:maxChunkSize]][1:] Review comment: Looks like in this method we interpret maxChunkSize as size of the chunk in terms of number of documents - but the real Mongo uses number of megabytes. Is this a problem for this test? If no, it needs to be explained. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 291324) Time Spent: 8.5h (was: 8h 20m)
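The mock's interpretation questioned above reduces to a one-liner: under the test's assumption that every document weighs about 1 MB, a maxChunkSize of N megabytes becomes a stride of N documents. A sketch of the test double's logic (not real splitVector behavior):

```python
def mock_split_keys(sorted_ids, max_chunk_size_mb):
    # 1 doc ~ 1 MB, so an N MB chunk holds N docs: take every N-th id,
    # dropping the head element, as splitVector excludes the range start.
    return sorted_ids[::max_chunk_size_mb][1:]
```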
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=291321&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-291321 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 08/Aug/19 15:13
Start Date: 08/Aug/19 15:13
Worklog Time Spent: 10m

Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r312084018

File path: sdks/python/apache_beam/io/mongodbio.py

@@ -194,18 +197,145 @@ def display_data(self):
     res['mongo_client_spec'] = self.spec
     return res

-  def _get_avg_document_size(self):
+  def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
+    # calls mongodb splitVector command to get document ids at split position
+    # for desired bundle size, if desired chunk size smaller than 1mb, use
+    # mongodb default split size of 1mb.
+    if desired_chunk_size_in_mb < 1:
+      desired_chunk_size_in_mb = 1
+    if start_pos >= end_pos:
+      # single document not splittable
+      return []
     with MongoClient(self.uri, **self.spec) as client:
-      size = client[self.db].command('collstats', self.coll).get('avgObjSize')
-      if size is None or size <= 0:
-        raise ValueError(
-            'Collection %s not found or average doc size is '
-            'incorrect', self.coll)
-      return size
-
-  def _get_document_count(self):
+      name_space = '%s.%s' % (self.db, self.coll)
+      return (client[self.db].command(
+          'splitVector',
+          name_space,
+          keyPattern={'_id': 1},

Review comment: What does "1" mean here?

Issue Time Tracking
-------------------
Worklog Id: (was: 291321)
Time Spent: 8h (was: 7h 50m)

> Python MongoDB IO performance and correctness issues
> ----------------------------------------------------
>
>                 Key: BEAM-7866
>                 URL: https://issues.apache.org/jira/browse/BEAM-7866
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Eugene Kirpichov
>            Assignee: Yichi Zhang
>            Priority: Blocker
>             Fix For: 2.15.0
>
>          Time Spent: 8h
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
> splits the query result by computing number of results in constructor, and
> then in each reader re-executing the whole query and getting an index
> sub-range of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily
> deterministic, so the idea of index ranges on it is meaningless: each shard
> may basically get random, possibly overlapping subsets of the total results
> - Even if you add order by `_id`, the database may be changing concurrently
> to reading and splitting. E.g. if the database contained documents with ids
> 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the
> assumption that these shards would contain respectively 10 20 30, and 40 50),
> and then suppose shard 10 20 30 is read and then document 25 is inserted -
> then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and
> document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items,
> which in total is quadratic complexity
> - The query is first executed in the constructor in order to count results,
> which 1) means the constructor can be super slow and 2) it won't work at all
> if the database is unavailable at the time the pipeline is constructed (e.g.
> if this is a template).
> Unfortunately, none of these issues are caught by SourceTestUtils: this class
> has extensive coverage with it, and the tests pass. This is because the tests
> return the same results in the same order. I don't know how to catch this
> automatically, and I don't know how to catch the performance issue
> automatically, but these would all be important follow-up items after the
> actual fix.
> CC: [~chamikara] as reviewer.

--
This message was sent by Atlassian JIRA (v7.6.14#76016)
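To answer the review question above: in a MongoDB key pattern such as keyPattern={'_id': 1}, the value 1 selects ascending order on that field (-1 would be descending), so splitVector walks the ascending _id index and returns boundary ids roughly every maxChunkSize megabytes. The sketch below is a hypothetical pure-Python model of that behavior (function name and the fixed per-document size are my assumptions, not the server command):

```python
def simulated_split_vector(doc_ids, max_chunk_size_mb, avg_doc_size_mb=1):
    """Rough model of MongoDB's splitVector over an ascending _id index.

    Walks doc_ids in index order and emits a boundary id about every
    max_chunk_size_mb of data, assuming each document is avg_doc_size_mb.
    """
    docs_per_chunk = max(1, max_chunk_size_mb // avg_doc_size_mb)
    # Boundary ids fall at every docs_per_chunk-th document; the first and
    # last documents are never boundaries, matching splitVector's output.
    return [doc_ids[i]
            for i in range(docs_per_chunk, len(doc_ids), docs_per_chunk)]
```

With ten 1 MB documents and a 3 MB chunk target this yields three boundaries, so the source would produce four bundles.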
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290599&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290599 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 16:49
Start Date: 07/Aug/19 16:49
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311336933

File path: sdks/python/apache_beam/io/mongodbio_test.py

@@ -30,38 +34,136 @@
 from apache_beam.io.mongodbio import _BoundedMongoSource
 from apache_beam.io.mongodbio import _GenerateObjectIdFn
 from apache_beam.io.mongodbio import _MongoSink
+from apache_beam.io.mongodbio import _ObjectIdHelper
+from apache_beam.io.mongodbio import _ObjectIdRangeTracker
 from apache_beam.io.mongodbio import _WriteMongoFn
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to


+class _MockMongoColl(object):
+  """Fake mongodb collection cursor."""
+
+  def __init__(self, docs):
+    self.docs = docs
+
+  def _filter(self, filter):
+    match = []
+    if not filter:
+      return self
+    start = filter['_id'].get('$gte')
+    end = filter['_id'].get('$lt')
+    assert start is not None
+    assert end is not None
+    for doc in self.docs:
+      if start and doc['_id'] < start:
+        continue
+      if end and doc['_id'] >= end:
+        continue
+      match.append(doc)
+    return match
+
+  def find(self, filter=None, **kwargs):
+    return _MockMongoColl(self._filter(filter))
+
+  def sort(self, sort_items):
+    key, order = sort_items[0]
+    self.docs = sorted(self.docs,
+                       key=lambda x: x[key],
+                       reverse=(order != ASCENDING))
+    return self
+
+  def limit(self, num):
+    return _MockMongoColl(self.docs[0:num])
+
+  def count_documents(self, filter):
+    return len(self._filter(filter))
+
+  def __getitem__(self, index):
+    return self.docs[index]
+
+
+class _MockMongoDb(object):
+  """Fake Mongo Db."""
+
+  def __init__(self, docs):
+    self.docs = docs
+
+  def __getitem__(self, coll_name):
+    return _MockMongoColl(self.docs)
+
+  def command(self, command, *args, **kwargs):
+    if command == 'collstats':
+      return {'size': 5, 'avgSize': 1}
+    elif command == 'splitVector':
+      return self.get_split_key(command, *args, **kwargs)
+
+  def get_split_key(self, command, ns, min, max, maxChunkSize, **kwargs):
+    # simulate mongo db splitVector command, return split keys base on chunk
+    # size, assuming every doc is of size 1mb
+    start_id = min['_id']

Review comment: these are the argument key required by mongo client, I don't think it is easy to change.

Issue Time Tracking
-------------------
Worklog Id: (was: 290599)
Time Spent: 7h 40m (was: 7.5h)
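The mock cursor's _filter method above implements the half-open id-range semantics that the real reader relies on: a document matches when $gte <= _id < $lt. A minimal standalone sketch of that predicate (function name is my own, not from the patch):

```python
def filter_by_id_range(docs, query_filter):
    # Keep documents whose _id falls in the half-open range [$gte, $lt),
    # mirroring the {'_id': {'$gte': start, '$lt': stop}} query each
    # reader issues for its assigned split.
    start = query_filter['_id']['$gte']
    end = query_filter['_id']['$lt']
    return [doc for doc in docs if start <= doc['_id'] < end]
```

Because every range is half-open, adjacent splits share a boundary id without ever returning the same document twice, which is exactly the correctness property the original index-based splitting lacked.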
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290598&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290598 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 16:47
Start Date: 07/Aug/19 16:47
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311337745

File path: sdks/python/apache_beam/io/mongodbio.py

@@ -139,50 +143,64 @@ def __init__(self,
     self.filter = filter
     self.projection = projection
     self.spec = extra_client_params
-    self.doc_count = self._get_document_count()
-    self.avg_doc_size = self._get_avg_document_size()
-    self.client = None

   def estimate_size(self):
-    return self.avg_doc_size * self.doc_count
+    with MongoClient(self.uri, **self.spec) as client:
+      size = client[self.db].command('collstats', self.coll).get('size')
+      if size is None or size <= 0:
+        raise ValueError('Collection %s not found or total doc size is '

Review comment: normally means the collection is empty. guess we can just return the size. I've not seen value below zero.

Issue Time Tracking
-------------------
Worklog Id: (was: 290598)
Time Spent: 7.5h (was: 7h 20m)
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290595&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290595 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 16:44
Start Date: 07/Aug/19 16:44
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311338009

File path: sdks/python/apache_beam/io/mongodbio.py

@@ -139,50 +143,64 @@ def __init__(self,
     self.filter = filter
     self.projection = projection
     self.spec = extra_client_params
-    self.doc_count = self._get_document_count()
-    self.avg_doc_size = self._get_avg_document_size()
-    self.client = None

   def estimate_size(self):
-    return self.avg_doc_size * self.doc_count
+    with MongoClient(self.uri, **self.spec) as client:
+      size = client[self.db].command('collstats', self.coll).get('size')
+      if size is None or size <= 0:
+        raise ValueError('Collection %s not found or total doc size is '
+                         'incorrect' % self.coll)
+      return size

   def split(self, desired_bundle_size, start_position=None, stop_position=None):
     # use document cursor index as the start and stop positions
     if start_position is None:
-      start_position = 0
+      start_position = self._get_head_document_id(ASCENDING)
     if stop_position is None:
-      stop_position = self.doc_count
+      last_doc_id = self._get_head_document_id(DESCENDING)
+      # increment last doc id binary value by 1 to make sure the last document
+      # is not excluded
+      stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)

-    # get an estimate on how many documents should be included in a split batch
-    desired_bundle_count = desired_bundle_size // self.avg_doc_size
+    desired_bundle_size_in_mb = desired_bundle_size // 1024 // 1024
+    split_keys = self._get_split_keys(desired_bundle_size_in_mb, start_position,
+                                      stop_position)

     bundle_start = start_position
-    while bundle_start < stop_position:
-      bundle_end = min(stop_position, bundle_start + desired_bundle_count)
-      yield iobase.SourceBundle(weight=bundle_end - bundle_start,
+    for split_key_id in split_keys:
+      if bundle_start is not None or bundle_start >= stop_position:
+        break
+      bundle_end = min(stop_position, split_key_id)
+      yield iobase.SourceBundle(weight=desired_bundle_size_in_mb,
                                 source=self,
                                 start_position=bundle_start,
                                 stop_position=bundle_end)
       bundle_start = bundle_end
+    # add range of last split_key to stop_position
+    if bundle_start < stop_position:

Review comment: yes, also bundle_start will be set to start_position before and unlikely to be None.

Issue Time Tracking
-------------------
Worklog Id: (was: 290595)
Time Spent: 7h 20m (was: 7h 10m)
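The split() loop in the patch turns the splitVector boundaries into contiguous half-open bundles, with a trailing bundle from the last boundary to stop_position. A standalone sketch of that bundling logic follows, using plain integers in place of ObjectIds. Note one assumption: the patch's guard reads `if bundle_start is not None or ...`, which as written would break out on the first iteration; `is None` appears intended, and since bundle_start is always initialized to start_position (as the reviewer notes), the sketch drops the None check entirely:

```python
def make_bundles(start_position, stop_position, split_keys):
    # Convert splitVector boundary keys into contiguous half-open
    # [start, stop) ranges; the final range up to stop_position is
    # appended after the loop, as in the patch.
    bundles = []
    bundle_start = start_position
    for split_key in split_keys:
        if bundle_start >= stop_position:
            break
        bundle_end = min(stop_position, split_key)
        bundles.append((bundle_start, bundle_end))
        bundle_start = bundle_end
    if bundle_start < stop_position:
        bundles.append((bundle_start, stop_position))
    return bundles
```

For example, make_bundles(0, 10, [3, 6, 9]) produces four ranges covering [0, 10) with no gaps or overlaps; with no split keys at all the whole range becomes one bundle.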
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290593&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290593 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 16:44
Start Date: 07/Aug/19 16:44
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311654458

File path: sdks/python/apache_beam/io/mongodbio.py

@@ -139,50 +143,64 @@ def __init__(self,
     self.filter = filter
     self.projection = projection
     self.spec = extra_client_params
-    self.doc_count = self._get_document_count()
-    self.avg_doc_size = self._get_avg_document_size()
-    self.client = None

   def estimate_size(self):
-    return self.avg_doc_size * self.doc_count
+    with MongoClient(self.uri, **self.spec) as client:
+      size = client[self.db].command('collstats', self.coll).get('size')
+      if size is None or size <= 0:
+        raise ValueError('Collection %s not found or total doc size is '
+                         'incorrect' % self.coll)
+      return size

   def split(self, desired_bundle_size, start_position=None, stop_position=None):
     # use document cursor index as the start and stop positions
     if start_position is None:
-      start_position = 0
+      start_position = self._get_head_document_id(ASCENDING)
     if stop_position is None:
-      stop_position = self.doc_count
+      last_doc_id = self._get_head_document_id(DESCENDING)
+      # increment last doc id binary value by 1 to make sure the last document
+      # is not excluded
+      stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)

-    # get an estimate on how many documents should be included in a split batch
-    desired_bundle_count = desired_bundle_size // self.avg_doc_size
+    desired_bundle_size_in_mb = desired_bundle_size // 1024 // 1024
+    split_keys = self._get_split_keys(desired_bundle_size_in_mb, start_position,
+                                      stop_position)

     bundle_start = start_position
-    while bundle_start < stop_position:
-      bundle_end = min(stop_position, bundle_start + desired_bundle_count)
-      yield iobase.SourceBundle(weight=bundle_end - bundle_start,
+    for split_key_id in split_keys:
+      if bundle_start is not None or bundle_start >= stop_position:
+        break
+      bundle_end = min(stop_position, split_key_id)
+      yield iobase.SourceBundle(weight=desired_bundle_size_in_mb,
                                 source=self,
                                 start_position=bundle_start,
                                 stop_position=bundle_end)
       bundle_start = bundle_end
+    # add range of last split_key to stop_position
+    if bundle_start < stop_position:
+      yield iobase.SourceBundle(weight=desired_bundle_size_in_mb,
+                                source=self,
+                                start_position=bundle_start,
+                                stop_position=stop_position)

   def get_range_tracker(self, start_position, stop_position):
     if start_position is None:
-      start_position = 0
+      start_position = self._get_head_document_id(ASCENDING)

Review comment: moved to utility function

Issue Time Tracking
-------------------
Worklog Id: (was: 290593)
Time Spent: 7h 10m (was: 7h)
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290589&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290589 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 16:39
Start Date: 07/Aug/19 16:39
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311652584

File path: sdks/python/apache_beam/io/mongodbio.py

@@ -194,18 +212,110 @@ def display_data(self):
     res['mongo_client_spec'] = self.spec
     return res

-  def _get_avg_document_size(self):
+  def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
+    # if desired chunk size smaller than 1mb, use mongodb default split size of
+    # 1mb
+    if desired_chunk_size_in_mb < 1:
+      desired_chunk_size_in_mb = 1
+    if start_pos >= end_pos:
+      # single document not splittable
+      return []
     with MongoClient(self.uri, **self.spec) as client:
-      size = client[self.db].command('collstats', self.coll).get('avgObjSize')
-      if size is None or size <= 0:
-        raise ValueError(
-            'Collection %s not found or average doc size is '
-            'incorrect', self.coll)
-      return size
-
-  def _get_document_count(self):
+      name_space = '%s.%s' % (self.db, self.coll)
+      return (client[self.db].command(
+          'splitVector',
+          name_space,
+          keyPattern={'_id': 1},
+          min={'_id': start_pos},
+          max={'_id': end_pos},
+          maxChunkSize=desired_chunk_size_in_mb)['splitKeys'])
+
+  def _merge_id_filter(self, range_tracker):
+    all_filters = self.filter.copy()
+    if '_id' in all_filters:
+      id_filter = all_filters['_id']
+      id_filter['$gte'] = (
+          max(id_filter['$gte'], range_tracker.start_position())
+          if '$gte' in id_filter else range_tracker.start_position())
+      id_filter['$lt'] = (min(id_filter['$lt'], range_tracker.stop_position())
+                          if '$lt' in id_filter else
+                          range_tracker.stop_position())
+    else:
+      all_filters.update({
+          '_id': {
+              '$gte': range_tracker.start_position(),
+              '$lt': range_tracker.stop_position()
+          }
+      })
+    return all_filters
+
+  def _get_head_document_id(self, sort_order):
     with MongoClient(self.uri, **self.spec) as client:
-      return max(client[self.db][self.coll].count_documents(self.filter), 0)
+      cursor = client[self.db][self.coll].find(filter={}, projection=[]).sort([
+          ('_id', sort_order)
+      ]).limit(1)
+      try:
+        return cursor[0]['_id']
+      except IndexError:
+        raise ValueError('Empty Mongodb collection')
+
+
+class _ObjectIdHelper(object):
+  """A Utility class to bson object ids."""
+
+  @classmethod
+  def id_to_int(cls, id):
+    # converts object id binary to integer
+    # id object is bytes type with size of 12
+    ints = struct.unpack('>III', id.binary)
+    return (ints[0] << 64) + (ints[1] << 32) + ints[2]
+
+  @classmethod
+  def int_to_id(cls, number):
+    # converts integer value to object id. Int value should be less than
+    # (2 ^ 96) so it can be convert to 12 bytes required by object id.
+    if number < 0 or number >= (1 << 96):
+      raise ValueError('number value must be within [0, %s)' % (1 << 96))
+    ints = [(number & 0xffffffff0000000000000000) >> 64,
+            (number & 0xffffffff00000000) >> 32,
+            number & 0xffffffff]
+    bytes = struct.pack('>III', *ints)
+    return objectid.ObjectId(bytes)
+
+  @classmethod
+  def increment_id(cls, object_id, inc):
+    # increment object_id binary value by inc value and return new object id.
+    id_number = _ObjectIdHelper.id_to_int(object_id)
+    new_number = id_number + inc
+    if new_number < 0 or new_number >= (1 << 96):
+      raise ValueError('invalid incremental, inc value must be within ['

Review comment: This shows the acceptable inc argument range clearer but I can also remove this check.

Issue Time Tracking
-------------------
Worklog Id: (was: 290589)
Time Spent: 7h (was: 6h 50m)
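The _ObjectIdHelper discussed above treats a 12-byte ObjectId as a 96-bit big-endian integer so that ids can be compared, incremented, and interpolated for range tracking. A dependency-free sketch of the same arithmetic follows, operating on raw 12-byte values instead of bson.ObjectId (the patch wraps the packed bytes back into objectid.ObjectId; the standalone function names are mine):

```python
import struct

def id_to_int(id_bytes):
    # A 12-byte id unpacked as three big-endian uint32s -> one 96-bit int.
    hi, mid, lo = struct.unpack('>III', id_bytes)
    return (hi << 64) + (mid << 32) + lo

def int_to_id(number):
    # Inverse: pack a 96-bit int back into 12 bytes.
    if number < 0 or number >= (1 << 96):
        raise ValueError('number value must be within [0, 2**96)')
    return struct.pack('>III',
                       (number >> 64) & 0xffffffff,
                       (number >> 32) & 0xffffffff,
                       number & 0xffffffff)

def increment_id(id_bytes, inc):
    # Increment the id's binary value by inc; int_to_id range-checks
    # the result, which is the check the review comment debates keeping.
    return int_to_id(id_to_int(id_bytes) + inc)
```

This is how split() builds an exclusive stop_position: incrementing the largest _id by 1 yields a bound strictly greater than every existing document, so the half-open range still covers the last document.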
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290587&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290587 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 16:33
Start Date: 07/Aug/19 16:33
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311649962

File path: sdks/python/apache_beam/io/mongodbio.py

@@ -194,18 +212,110 @@ def display_data(self):
     res['mongo_client_spec'] = self.spec
     return res

-  def _get_avg_document_size(self):
+  def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
+    # if desired chunk size smaller than 1mb, use mongodb default split size of
+    # 1mb
+    if desired_chunk_size_in_mb < 1:
+      desired_chunk_size_in_mb = 1
+    if start_pos >= end_pos:
+      # single document not splittable
+      return []
     with MongoClient(self.uri, **self.spec) as client:
-      size = client[self.db].command('collstats', self.coll).get('avgObjSize')
-      if size is None or size <= 0:
-        raise ValueError(
-            'Collection %s not found or average doc size is '
-            'incorrect', self.coll)
-      return size
-
-  def _get_document_count(self):
+      name_space = '%s.%s' % (self.db, self.coll)
+      return (client[self.db].command(
+          'splitVector',
+          name_space,
+          keyPattern={'_id': 1},
+          min={'_id': start_pos},
+          max={'_id': end_pos},
+          maxChunkSize=desired_chunk_size_in_mb)['splitKeys'])
+
+  def _merge_id_filter(self, range_tracker):
+    all_filters = self.filter.copy()
+    if '_id' in all_filters:
+      id_filter = all_filters['_id']
+      id_filter['$gte'] = (
+          max(id_filter['$gte'], range_tracker.start_position())
+          if '$gte' in id_filter else range_tracker.start_position())
+      id_filter['$lt'] = (min(id_filter['$lt'], range_tracker.stop_position())
+                          if '$lt' in id_filter else
+                          range_tracker.stop_position())
+    else:
+      all_filters.update({
+          '_id': {
+              '$gte': range_tracker.start_position(),
+              '$lt': range_tracker.stop_position()
+          }
+      })
+    return all_filters
+
+  def _get_head_document_id(self, sort_order):
     with MongoClient(self.uri, **self.spec) as client:
-      return max(client[self.db][self.coll].count_documents(self.filter), 0)
+      cursor = client[self.db][self.coll].find(filter={}, projection=[]).sort([
+          ('_id', sort_order)
+      ]).limit(1)
+      try:
+        return cursor[0]['_id']
+      except IndexError:
+        raise ValueError('Empty Mongodb collection')

Review comment: it's actually required primary key for every document.

Issue Time Tracking
-------------------
Worklog Id: (was: 290587)
Time Spent: 6h 50m (was: 6h 40m)
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290143&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290143 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 01:37
Start Date: 07/Aug/19 01:37
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311338447

File path: sdks/python/apache_beam/io/mongodbio.py

@@ -194,18 +212,110 @@ def display_data(self):
     res['mongo_client_spec'] = self.spec
     return res

-  def _get_avg_document_size(self):
+  def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
+    # if desired chunk size smaller than 1mb, use mongodb default split size of
+    # 1mb
+    if desired_chunk_size_in_mb < 1:
+      desired_chunk_size_in_mb = 1
+    if start_pos >= end_pos:
+      # single document not splittable
+      return []
     with MongoClient(self.uri, **self.spec) as client:
-      size = client[self.db].command('collstats', self.coll).get('avgObjSize')
-      if size is None or size <= 0:
-        raise ValueError(
-            'Collection %s not found or average doc size is '
-            'incorrect', self.coll)
-      return size
-
-  def _get_document_count(self):
+      name_space = '%s.%s' % (self.db, self.coll)
+      return (client[self.db].command(
+          'splitVector',
+          name_space,
+          keyPattern={'_id': 1},
+          min={'_id': start_pos},
+          max={'_id': end_pos},
+          maxChunkSize=desired_chunk_size_in_mb)['splitKeys'])
+
+  def _merge_id_filter(self, range_tracker):
+    all_filters = self.filter.copy()

Review comment: it will be set to {} by constructor.

Issue Time Tracking
-------------------
Worklog Id: (was: 290143)
Time Spent: 6h 40m (was: 6.5h)
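The _merge_id_filter method under review intersects any user-supplied _id bounds with the range tracker's [start, stop) window, so a reader never fetches documents outside its split even when the user's filter already constrains _id. A standalone sketch of that merge, with plain start/stop values standing in for the range tracker (function name and signature are mine):

```python
def merge_id_filter(user_filter, start_position, stop_position):
    # Intersect user-supplied _id bounds with the tracker's half-open
    # [start_position, stop_position) range: take the tighter of each
    # bound, as _merge_id_filter does.
    merged = dict(user_filter)  # per the review, self.filter defaults to {}
    id_filter = dict(merged.get('_id', {}))
    id_filter['$gte'] = (max(id_filter['$gte'], start_position)
                         if '$gte' in id_filter else start_position)
    id_filter['$lt'] = (min(id_filter['$lt'], stop_position)
                        if '$lt' in id_filter else stop_position)
    merged['_id'] = id_filter
    return merged
```

Taking the max of the lower bounds and the min of the upper bounds yields the intersection of the two ranges, so the merged filter is always at least as restrictive as both inputs.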
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290141&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290141 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 01:35
Start Date: 07/Aug/19 01:35
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311338085

File path: sdks/python/apache_beam/io/mongodbio.py

@@ -139,50 +143,64 @@ def __init__(self,
     self.filter = filter
     self.projection = projection
     self.spec = extra_client_params
-    self.doc_count = self._get_document_count()
-    self.avg_doc_size = self._get_avg_document_size()
-    self.client = None

   def estimate_size(self):
-    return self.avg_doc_size * self.doc_count
+    with MongoClient(self.uri, **self.spec) as client:
+      size = client[self.db].command('collstats', self.coll).get('size')
+      if size is None or size <= 0:
+        raise ValueError('Collection %s not found or total doc size is '
+                         'incorrect' % self.coll)
+      return size

   def split(self, desired_bundle_size, start_position=None,
             stop_position=None):
     # use document cursor index as the start and stop positions
     if start_position is None:
-      start_position = 0
+      start_position = self._get_head_document_id(ASCENDING)
     if stop_position is None:
-      stop_position = self.doc_count
+      last_doc_id = self._get_head_document_id(DESCENDING)
+      # increment last doc id binary value by 1 to make sure the last document
+      # is not excluded
+      stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)

-    # get an estimate on how many documents should be included in a split batch
-    desired_bundle_count = desired_bundle_size // self.avg_doc_size
+    desired_bundle_size_in_mb = desired_bundle_size // 1024 // 1024
+    split_keys = self._get_split_keys(desired_bundle_size_in_mb, start_position,
+                                      stop_position)

     bundle_start = start_position
-    while bundle_start < stop_position:
-      bundle_end = min(stop_position, bundle_start + desired_bundle_count)
-      yield iobase.SourceBundle(weight=bundle_end - bundle_start,
+    for split_key_id in split_keys:
+      if bundle_start is not None or bundle_start >= stop_position:
+        break
+      bundle_end = min(stop_position, split_key_id)
+      yield iobase.SourceBundle(weight=desired_bundle_size_in_mb,
                                 source=self,
                                 start_position=bundle_start,
                                 stop_position=bundle_end)
       bundle_start = bundle_end
+    # add range of last split_key to stop_position
+    if bundle_start < stop_position:
+      yield iobase.SourceBundle(weight=desired_bundle_size_in_mb,
+                                source=self,
+                                start_position=bundle_start,
+                                stop_position=stop_position)

   def get_range_tracker(self, start_position, stop_position):
     if start_position is None:
-      start_position = 0
+      start_position = self._get_head_document_id(ASCENDING)

Review comment: is there a situation where one of the position is none and the other isn't?

Issue Time Tracking
---
Worklog Id: (was: 290141) Time Spent: 6.5h (was: 6h 20m)
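The split() loop in the diff above turns a sorted list of split keys into half-open bundle ranges, plus a tail bundle from the last split key up to the stop position. A minimal standalone sketch of that logic (hypothetical names, plain integers instead of ObjectIds, and using the presumably intended `is None` guard rather than the `is not None` flagged elsewhere in this review):

```python
def bundles(start, stop, split_keys):
    """Yield (start, end) ranges covering [start, stop), cut at each split key."""
    bundle_start = start
    for key in split_keys:
        # stop once the range is exhausted (or was never initialized)
        if bundle_start is None or bundle_start >= stop:
            break
        bundle_end = min(stop, key)
        yield (bundle_start, bundle_end)
        bundle_start = bundle_end
    # add range of last split_key to stop, so the tail is not dropped
    if bundle_start < stop:
        yield (bundle_start, stop)
```

Note the ranges are half-open on the right, which is why the real code increments the last document id by one before using it as the stop position.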
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290140&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290140 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 01:35
Start Date: 07/Aug/19 01:35
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311338009

File path: sdks/python/apache_beam/io/mongodbio.py (same split() diff as in worklog 290141; the comment anchors at the block that emits the final bundle)

+    # add range of last split_key to stop_position
+    if bundle_start < stop_position:

Review comment: yes

Issue Time Tracking
---
Worklog Id: (was: 290140) Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290138&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290138 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 01:33
Start Date: 07/Aug/19 01:33
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311337745

File path: sdks/python/apache_beam/io/mongodbio.py

   def estimate_size(self):
-    return self.avg_doc_size * self.doc_count
+    with MongoClient(self.uri, **self.spec) as client:
+      size = client[self.db].command('collstats', self.coll).get('size')
+      if size is None or size <= 0:
+        raise ValueError('Collection %s not found or total doc size is '

Review comment: normally means the collection is invalid or empty.

Issue Time Tracking
---
Worklog Id: (was: 290138) Time Spent: 6h 10m (was: 6h)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290136&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290136 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 01:28
Start Date: 07/Aug/19 01:28
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311336933

File path: sdks/python/apache_beam/io/mongodbio_test.py

@@ -30,38 +34,136 @@
 from apache_beam.io.mongodbio import _BoundedMongoSource
 from apache_beam.io.mongodbio import _GenerateObjectIdFn
 from apache_beam.io.mongodbio import _MongoSink
+from apache_beam.io.mongodbio import _ObjectIdHelper
+from apache_beam.io.mongodbio import _ObjectIdRangeTracker
 from apache_beam.io.mongodbio import _WriteMongoFn
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to

+class _MockMongoColl(object):
+  """Fake mongodb collection cursor."""
+
+  def __init__(self, docs):
+    self.docs = docs
+
+  def _filter(self, filter):
+    match = []
+    if not filter:
+      return self
+    start = filter['_id'].get('$gte')
+    end = filter['_id'].get('$lt')
+    assert start is not None
+    assert end is not None
+    for doc in self.docs:
+      if start and doc['_id'] < start:
+        continue
+      if end and doc['_id'] >= end:
+        continue
+      match.append(doc)
+    return match
+
+  def find(self, filter=None, **kwargs):
+    return _MockMongoColl(self._filter(filter))
+
+  def sort(self, sort_items):
+    key, order = sort_items[0]
+    self.docs = sorted(self.docs,
+                       key=lambda x: x[key],
+                       reverse=(order != ASCENDING))
+    return self
+
+  def limit(self, num):
+    return _MockMongoColl(self.docs[0:num])
+
+  def count_documents(self, filter):
+    return len(self._filter(filter))
+
+  def __getitem__(self, index):
+    return self.docs[index]
+
+
+class _MockMongoDb(object):
+  """Fake Mongo Db."""
+
+  def __init__(self, docs):
+    self.docs = docs
+
+  def __getitem__(self, coll_name):
+    return _MockMongoColl(self.docs)
+
+  def command(self, command, *args, **kwargs):
+    if command == 'collstats':
+      return {'size': 5, 'avgSize': 1}
+    elif command == 'splitVector':
+      return self.get_split_key(command, *args, **kwargs)
+
+  def get_split_key(self, command, ns, min, max, maxChunkSize, **kwargs):
+    # simulate mongo db splitVector command, return split keys base on chunk
+    # size, assuming every doc is of size 1mb
+    start_id = min['_id']

Review comment: these are the argument key required by mongo client

Issue Time Tracking
---
Worklog Id: (was: 290136) Time Spent: 6h (was: 5h 50m)
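The mock collection's `_filter` above reduces to a half-open `$gte`/`$lt` range match on `_id`, which is the query shape the fixed source issues per bundle. A tiny standalone sketch of that matching rule (hypothetical helper, not Beam or pymongo code, with plain integer ids):

```python
def match_id_range(docs, query):
    """Return docs whose _id is in [query's $gte, query's $lt) — half-open."""
    start = query['_id']['$gte']
    end = query['_id']['$lt']
    return [doc for doc in docs if start <= doc['_id'] < end]
```

Because the upper bound is exclusive, adjacent bundle queries never overlap and together cover every document exactly once.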
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290133&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290133 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 01:25
Start Date: 07/Aug/19 01:25
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311336422

File path: sdks/python/apache_beam/io/mongodbio.py (same split() diff as in worklog 290141; the comment anchors at the loop guard)

+    for split_key_id in split_keys:
+      if bundle_start is not None or bundle_start >= stop_position:

Review comment: thought I fixed this earlier, weird.

Issue Time Tracking
---
Worklog Id: (was: 290133) Time Spent: 5h 40m (was: 5.5h)
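The guard flagged in the comment above (`bundle_start is not None or ...`) is truthy on the very first iteration whenever a start position exists, so the loop would break before yielding any split-key bundle. A small sketch demonstrating the difference between the buggy guard and the presumably intended `is None` check (hypothetical names, plain integer positions):

```python
def split_ranges(bundle_start, stop, split_keys, buggy):
    """Run the bundle loop with either the buggy or the corrected guard."""
    out = []
    for key in split_keys:
        # buggy:   break as soon as bundle_start is set (i.e. always)
        # correct: break only when bundle_start is unset or past stop
        bad_state = (bundle_start is not None) if buggy else (bundle_start is None)
        if bad_state or bundle_start >= stop:
            break
        end = min(stop, key)
        out.append((bundle_start, end))
        bundle_start = end
    return out
```

With the buggy guard every document would fall into the single tail bundle emitted after the loop, defeating the split.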
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290134&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290134 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 01:25
Start Date: 07/Aug/19 01:25
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311336434

File path: sdks/python/apache_beam/io/mongodbio.py (same diff as in worklog 290141; the comment anchors at the new range tracker)

   def get_range_tracker(self, start_position, stop_position):
     if start_position is None:
-      start_position = 0
+      start_position = self._get_head_document_id(ASCENDING)
     if stop_position is None:
-      stop_position = self.doc_count
-    return OffsetRangeTracker(start_position, stop_position)
+      last_doc_id = self._get_head_document_id(DESCENDING)
+      # increment last doc id binary value by 1 to make sure the last document
+      # is not excluded
+      stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
+    return _ObjectIdRangeTracker(start_position, stop_position)

Review comment: yes

Issue Time Tracking
---
Worklog Id: (was: 290134) Time Spent: 5h 50m (was: 5h 40m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290110&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290110 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311324039

File path: sdks/python/apache_beam/io/mongodbio.py

@@ -194,18 +212,110 @@ def display_data(self):
     res['mongo_client_spec'] = self.spec
     return res

-  def _get_avg_document_size(self):
+  def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
+    # if desired chunk size smaller than 1mb, use mongodb default split size of
+    # 1mb
+    if desired_chunk_size_in_mb < 1:
+      desired_chunk_size_in_mb = 1
+    if start_pos >= end_pos:
+      # single document not splittable
+      return []
     with MongoClient(self.uri, **self.spec) as client:
-      size = client[self.db].command('collstats', self.coll).get('avgObjSize')
-      if size is None or size <= 0:
-        raise ValueError(
-            'Collection %s not found or average doc size is '
-            'incorrect', self.coll)
-      return size
-
-  def _get_document_count(self):
+      name_space = '%s.%s' % (self.db, self.coll)
+      return (client[self.db].command(
+          'splitVector',
+          name_space,
+          keyPattern={'_id': 1},
+          min={'_id': start_pos},
+          max={'_id': end_pos},
+          maxChunkSize=desired_chunk_size_in_mb)['splitKeys'])
+
+  def _merge_id_filter(self, range_tracker):
+    all_filters = self.filter.copy()
+    if '_id' in all_filters:
+      id_filter = all_filters['_id']
+      id_filter['$gte'] = (
+          max(id_filter['$gte'], range_tracker.start_position())
+          if '$gte' in id_filter else range_tracker.start_position())
+      id_filter['$lt'] = (min(id_filter['$lt'], range_tracker.stop_position())
+                          if '$lt' in id_filter else
+                          range_tracker.stop_position())
+    else:
+      all_filters.update({
+          '_id': {
+              '$gte': range_tracker.start_position(),
+              '$lt': range_tracker.stop_position()
+          }
+      })
+    return all_filters
+
+  def _get_head_document_id(self, sort_order):
     with MongoClient(self.uri, **self.spec) as client:
-      return max(client[self.db][self.coll].count_documents(self.filter), 0)
+      cursor = client[self.db][self.coll].find(filter={}, projection=[]).sort([
+          ('_id', sort_order)
+      ]).limit(1)
+      try:
+        return cursor[0]['_id']
+      except IndexError:
+        raise ValueError('Empty Mongodb collection')

Review comment: Or it just didn't contain key '_id' for some reason ?

Issue Time Tracking
---
Worklog Id: (was: 290110) Time Spent: 3h 50m (was: 3h 40m)
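_merge_id_filter in the diff above intersects the range tracker's [start, stop) window with any user-supplied `_id` bounds: the larger lower bound and the smaller upper bound win. A standalone sketch of the same intersection (hypothetical names, plain dicts and integers instead of a range tracker and ObjectIds):

```python
def merge_id_filter(user_filter, start, stop):
    """Intersect a user query filter with the half-open _id range [start, stop)."""
    merged = dict(user_filter)
    id_filter = dict(merged.get('_id', {}))
    # tighten each bound: a user bound only survives if it is stricter
    id_filter['$gte'] = max(id_filter['$gte'], start) if '$gte' in id_filter else start
    id_filter['$lt'] = min(id_filter['$lt'], stop) if '$lt' in id_filter else stop
    merged['_id'] = id_filter
    return merged
```

This keeps each bundle's query restricted to its own id range even when the user already filters on `_id`, so bundles never double-read documents.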
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290107&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290107 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311311504

File path: sdks/python/apache_beam/io/mongodbio.py (same estimate_size() diff as in worklog 290141; the comment anchors at the raise)

+      if size is None or size <= 0:
+        raise ValueError('Collection %s not found or total doc size is '

Review comment: Is this an invalid state or it is just that the size cannot be determined ? If latter, you can just return None: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L134

Issue Time Tracking
---
Worklog Id: (was: 290107) Time Spent: 3h 20m (was: 3h 10m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290121&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290121 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311325179

## File path: sdks/python/apache_beam/io/mongodbio.py

## @@ -194,18 +212,110 @@ def display_data(self):
     res['mongo_client_spec'] = self.spec
     return res

-  def _get_avg_document_size(self):
+  def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
+    # if desired chunk size smaller than 1mb, use mongodb default split size
+    # of 1mb
+    if desired_chunk_size_in_mb < 1:
+      desired_chunk_size_in_mb = 1
+    if start_pos >= end_pos:
+      # single document not splittable
+      return []
     with MongoClient(self.uri, **self.spec) as client:
-      size = client[self.db].command('collstats', self.coll).get('avgObjSize')
-      if size is None or size <= 0:
-        raise ValueError(
-            'Collection %s not found or average doc size is '
-            'incorrect', self.coll)
-      return size
-
-  def _get_document_count(self):
+      name_space = '%s.%s' % (self.db, self.coll)
+      return (client[self.db].command(
+          'splitVector',
+          name_space,
+          keyPattern={'_id': 1},
+          min={'_id': start_pos},
+          max={'_id': end_pos},
+          maxChunkSize=desired_chunk_size_in_mb)['splitKeys'])
+
+  def _merge_id_filter(self, range_tracker):
+    all_filters = self.filter.copy()
+    if '_id' in all_filters:
+      id_filter = all_filters['_id']
+      id_filter['$gte'] = (
+          max(id_filter['$gte'], range_tracker.start_position())
+          if '$gte' in id_filter else range_tracker.start_position())
+      id_filter['$lt'] = (min(id_filter['$lt'], range_tracker.stop_position())
+                          if '$lt' in id_filter else
+                          range_tracker.stop_position())
+    else:
+      all_filters.update({
+          '_id': {
+              '$gte': range_tracker.start_position(),
+              '$lt': range_tracker.stop_position()
+          }
+      })
+    return all_filters
+
+  def _get_head_document_id(self, sort_order):
     with MongoClient(self.uri, **self.spec) as client:
-      return max(client[self.db][self.coll].count_documents(self.filter), 0)
+      cursor = client[self.db][self.coll].find(filter={}, projection=[]).sort([
+          ('_id', sort_order)
+      ]).limit(1)
+      try:
+        return cursor[0]['_id']
+      except IndexError:
+        raise ValueError('Empty Mongodb collection')
+
+
+class _ObjectIdHelper(object):
+  """A Utility class to bson object ids."""
+
+  @classmethod
+  def id_to_int(cls, id):
+    # converts object id binary to integer
+    # id object is bytes type with size of 12
+    ints = struct.unpack('>III', id.binary)
+    return (ints[0] << 64) + (ints[1] << 32) + ints[2]
+
+  @classmethod
+  def int_to_id(cls, number):
+    # converts integer value to object id. Int value should be less than
+    # (2 ^ 96) so it can be convert to 12 bytes required by object id.
+    if number < 0 or number >= (1 << 96):
+      raise ValueError('number value must be within [0, %s)' % (1 << 96))
+    ints = [(number & 0xffffffff0000000000000000) >> 64,
+            (number & 0x00000000ffffffff00000000) >> 32,
+            number & 0x0000000000000000ffffffff]
+    bytes = struct.pack('>III', *ints)
+    return objectid.ObjectId(bytes)
+
+  @classmethod
+  def increment_id(cls, object_id, inc):
+    # increment object_id binary value by inc value and return new object id.
+    id_number = _ObjectIdHelper.id_to_int(object_id)
+    new_number = id_number + inc
+    if new_number < 0 or new_number >= (1 << 96):
+      raise ValueError('invalid incremental, inc value must be within ['

Review comment: Seems like we already do this validation inside 'int_to_id' function invoked below ?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------
Worklog Id: (was: 290121)
Time Spent: 5.5h (was: 5h 20m)
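For reference, the byte manipulation under discussion can be sketched with the standard library alone. This stand-in uses hypothetical names and operates on the raw 12-byte binary rather than a `bson.objectid.ObjectId`; it shows the unpack/pack round trip and illustrates the reviewer's point that `increment_id` can rely on `int_to_id` for range validation:

```python
import struct

def id_to_int(id_bytes):
    # A bson object id is 12 bytes; three big-endian uint32s recombine
    # into the single 96-bit integer used for range arithmetic.
    a, b, c = struct.unpack('>III', id_bytes)
    return (a << 64) + (b << 32) + c

def int_to_id(number):
    # The value must fit in 12 bytes, i.e. lie within [0, 2**96).
    if number < 0 or number >= (1 << 96):
        raise ValueError('number value must be within [0, %s)' % (1 << 96))
    return struct.pack('>III',
                       (number >> 64) & 0xFFFFFFFF,
                       (number >> 32) & 0xFFFFFFFF,
                       number & 0xFFFFFFFF)

def increment_id(id_bytes, inc):
    # Round-trips through the integer form; int_to_id already validates
    # the range, so no separate check is needed here.
    return int_to_id(id_to_int(id_bytes) + inc)
```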
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290111&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290111 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311323214

## File path: sdks/python/apache_beam/io/mongodbio.py

## @@ -194,18 +212,110 @@
[...]
+  def _merge_id_filter(self, range_tracker):
+    all_filters = self.filter.copy()
+    if '_id' in all_filters:

Review comment: What does key '_id' contain ? Please add a comment.

Issue Time Tracking
-------------------
Worklog Id: (was: 290111)
Time Spent: 4h (was: 3h 50m)

> Python MongoDB IO performance and correctness issues
> ----------------------------------------------------
>                 Key: BEAM-7866
>                 URL: https://issues.apache.org/jira/browse/BEAM-7866
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Eugene Kirpichov
>            Assignee: Yichi Zhang
>            Priority: Blocker
>             Fix For: 2.15.0
>
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
> splits the query result by computing the number of results in the constructor,
> and then each reader re-executes the whole query and takes an index sub-range
> of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily
>   deterministic, so the idea of index ranges on it is meaningless: each shard
>   may get random, possibly overlapping subsets of the total results.
> - Even with an order by `_id`, the database may change concurrently with
>   reading and splitting. E.g. if the database contained documents with ids
>   10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the
>   assumption that these shards would contain respectively 10 20 30, and
>   40 50), and then suppose shard 10 20 30 is read and then document 25 is
>   inserted - then the 3..5 shard will read 30 40 50, i.e. document 30 is
>   duplicated and document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items,
>   which in total is quadratic complexity.
> - The query is first executed in the constructor in order to count results,
>   which 1) means the constructor can be super slow and 2) it won't work at
>   all if the database is unavailable at the time the pipeline is constructed
>   (e.g. if this is a template).
> Unfortunately, none of these issues are caught by SourceTestUtils: this class
> has extensive coverage with it, and the tests pass. This is because the tests
> return the same results in the same order. I don't know how to catch this
> automatically, and I don't know how to catch the performance issue
> automatically, but these would all be important follow-up items after the
> actual fix.
> CC: [~chamikara] as reviewer.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
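The failure scenario in the issue description can be reproduced in a few lines of plain Python, with list indices standing in for cursor offsets (a simulation, not Beam code):

```python
# The ids 10 20 30 40 50 are split into index ranges [0, 3) and [3, 6);
# a concurrent insert between the two shard reads shifts the indices.
docs = [10, 20, 30, 40, 50]

def read_shard(collection, start, stop):
    # Mimics each reader re-running the query and slicing by result index,
    # i.e. the skip/limit pattern the old source used.
    return sorted(collection)[start:stop]

shard_a = read_shard(docs, 0, 3)   # reads 10, 20, 30
docs.append(25)                    # document 25 inserted between shard reads
shard_b = read_shard(docs, 3, 6)   # reads 30, 40, 50

seen = shard_a + shard_b           # 30 appears twice; 25 is never read
```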
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290115&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290115 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311320582

## File path: sdks/python/apache_beam/io/mongodbio.py

## @@ -139,50 +143,64 @@ def __init__(self,
     self.filter = filter
     self.projection = projection
     self.spec = extra_client_params
-    self.doc_count = self._get_document_count()
-    self.avg_doc_size = self._get_avg_document_size()
-    self.client = None

   def estimate_size(self):
-    return self.avg_doc_size * self.doc_count
+    with MongoClient(self.uri, **self.spec) as client:
+      size = client[self.db].command('collstats', self.coll).get('size')
+      if size is None or size <= 0:
+        raise ValueError('Collection %s not found or total doc size is '
+                         'incorrect' % self.coll)
+      return size

   def split(self, desired_bundle_size, start_position=None, stop_position=None):
     # use document cursor index as the start and stop positions
     if start_position is None:
-      start_position = 0
+      start_position = self._get_head_document_id(ASCENDING)
     if stop_position is None:
-      stop_position = self.doc_count
+      last_doc_id = self._get_head_document_id(DESCENDING)
+      # increment last doc id binary value by 1 to make sure the last document
+      # is not excluded
+      stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)

-    # get an estimate on how many documents should be included in a split batch
-    desired_bundle_count = desired_bundle_size // self.avg_doc_size
+    desired_bundle_size_in_mb = desired_bundle_size // 1024 // 1024
+    split_keys = self._get_split_keys(desired_bundle_size_in_mb, start_position,
+                                      stop_position)

     bundle_start = start_position
-    while bundle_start < stop_position:
-      bundle_end = min(stop_position, bundle_start + desired_bundle_count)
-      yield iobase.SourceBundle(weight=bundle_end - bundle_start,
+    for split_key_id in split_keys:
+      if bundle_start is not None or bundle_start >= stop_position:
+        break
+      bundle_end = min(stop_position, split_key_id)
+      yield iobase.SourceBundle(weight=desired_bundle_size_in_mb,
                                 source=self,
                                 start_position=bundle_start,
                                 stop_position=bundle_end)
       bundle_start = bundle_end
+    # add range of last split_key to stop_position
+    if bundle_start < stop_position:
+      yield iobase.SourceBundle(weight=desired_bundle_size_in_mb,
+                                source=self,
+                                start_position=bundle_start,
+                                stop_position=stop_position)

   def get_range_tracker(self, start_position, stop_position):
     if start_position is None:
-      start_position = 0
+      start_position = self._get_head_document_id(ASCENDING)

Review comment: We had the same logic above. Can we move start and end position computation logic to a util function ?

Issue Time Tracking
-------------------
Worklog Id: (was: 290115)
Time Spent: 4.5h (was: 4h 20m)
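One possible shape of the shared helper the reviewer asks for, sketched as a pure function with the two collection accessors passed in as callables so the sketch stays self-contained; the function and parameter names here are hypothetical:

```python
# pymongo's sort-order constants.
ASCENDING, DESCENDING = 1, -1

def replace_none_positions(start_position, stop_position,
                           get_head_document_id, increment_id):
    # get_head_document_id(order) returns the smallest (ASCENDING) or
    # largest (DESCENDING) _id in the collection; incrementing the largest
    # by 1 keeps the last document inside the half-open [start, stop) range.
    if start_position is None:
        start_position = get_head_document_id(ASCENDING)
    if stop_position is None:
        stop_position = increment_id(get_head_document_id(DESCENDING), 1)
    return start_position, stop_position
```

Both `split()` and `get_range_tracker()` could then call this once instead of duplicating the None handling.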
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290114&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290114 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311321842

## File path: sdks/python/apache_beam/io/mongodbio.py

## @@ -194,18 +212,110 @@
[...]
+  def _merge_id_filter(self, range_tracker):

Review comment: Can you add a comment on what this function does (prob. we need more comments in general clarifying non-obvious parts of the code).

Issue Time Tracking
-------------------
Worklog Id: (was: 290114)
Time Spent: 4h 20m (was: 4h 10m)
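What `_merge_id_filter` computes can be sketched as a pure function (the name and signature below are hypothetical): the user filter's `_id` bounds are intersected with the tracker's half-open range, the larger lower bound and the smaller upper bound winning.

```python
def merge_id_filter(user_filter, start, stop):
    # Intersect any user-supplied _id bounds with the tracker's range:
    # $gte takes the larger of the two lower bounds, $lt the smaller of
    # the two upper bounds, so the result never reads outside the split.
    merged = dict(user_filter)
    id_filter = dict(merged.get('_id', {}))
    id_filter['$gte'] = max(id_filter.get('$gte', start), start)
    id_filter['$lt'] = min(id_filter.get('$lt', stop), stop)
    merged['_id'] = id_filter
    return merged
```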
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290109&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290109 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311324493

## File path: sdks/python/apache_beam/io/mongodbio.py

## @@ -194,18 +212,110 @@
[...]
+class _ObjectIdHelper(object):
+  """A Utility class to bson object ids."""
+
+  @classmethod
+  def id_to_int(cls, id):
+    # converts object id binary to integer

Review comment: Please add a proper doc comment including variables if possible. (even though containing class is private, better to properly describe these functions that contain non-trivial byte manipulations).

Issue Time Tracking
-------------------
Worklog Id: (was: 290109)
Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290117&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290117 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311329296

## File path: sdks/python/apache_beam/io/mongodbio.py

## @@ -139,50 +143,64 @@
[...]
   def get_range_tracker(self, start_position, stop_position):
     if start_position is None:
-      start_position = 0
+      start_position = self._get_head_document_id(ASCENDING)
     if stop_position is None:
-      stop_position = self.doc_count
-    return OffsetRangeTracker(start_position, stop_position)
+      last_doc_id = self._get_head_document_id(DESCENDING)
+      # increment last doc id binary value by 1 to make sure the last document
+      # is not excluded
+      stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
+    return _ObjectIdRangeTracker(start_position, stop_position)

Review comment: Are object IDs directly comparable (as required by OrderedPositionRangeTracker) ?

Issue Time Tracking
-------------------
Worklog Id: (was: 290117)
Time Spent: 4h 50m (was: 4h 40m)
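On the comparability question: `bson.objectid.ObjectId` defines its rich comparisons over the raw 12-byte big-endian binary, so two ids order the same way as their 96-bit integer values, which is the property OrderedPositionRangeTracker needs. A stdlib-only check of that property on raw bytes (a stand-in, since it avoids importing bson):

```python
def ordered_like_ints(bin_a, bin_b):
    # For equal-length byte strings, Python's bytes comparison is
    # lexicographic big-endian, so it must agree with integer order.
    return (bin_a < bin_b) == (int.from_bytes(bin_a, 'big') <
                               int.from_bytes(bin_b, 'big'))
```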
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290108&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290108 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311323133

## File path: sdks/python/apache_beam/io/mongodbio.py

## @@ -194,18 +212,110 @@
[...]
+  def _merge_id_filter(self, range_tracker):
+    all_filters = self.filter.copy()

Review comment: Looks like self.filter can be None (default) ?

Issue Time Tracking
-------------------
Worklog Id: (was: 290108)
Time Spent: 3.5h (was: 3h 20m)
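A minimal guard for the None default, assuming the fix is simply to fall back to an empty filter document (which matches everything); the helper name is hypothetical:

```python
def copy_filter(user_filter):
    # ReadFromMongoDB's filter argument defaults to None; calling .copy()
    # on None would raise AttributeError. An empty dict is the MongoDB
    # query that matches every document, so it is a safe fallback.
    return dict(user_filter or {})
```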
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290118&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290118 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311324139

## File path: sdks/python/apache_beam/io/mongodbio.py

## @@ -194,18 +212,110 @@
[...]
+class _ObjectIdHelper(object):
+  """A Utility class to bson object ids."""

Review comment: "...to manipulate bson object ids."

Issue Time Tracking
-------------------
Worklog Id: (was: 290118)
Time Spent: 5h (was: 4h 50m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290116&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290116 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311323636

## File path: sdks/python/apache_beam/io/mongodbio.py

@@ -194,18 +212,110 @@ def display_data(self):
     res['mongo_client_spec'] = self.spec
     return res

-  def _get_avg_document_size(self):
+  def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
+    # if desired chunk size smaller than 1mb, use mongodb default split size of
+    # 1mb
+    if desired_chunk_size_in_mb < 1:
+      desired_chunk_size_in_mb = 1
+    if start_pos >= end_pos:
+      # single document not splittable
+      return []
     with MongoClient(self.uri, **self.spec) as client:
-      size = client[self.db].command('collstats', self.coll).get('avgObjSize')
-      if size is None or size <= 0:
-        raise ValueError(
-            'Collection %s not found or average doc size is '
-            'incorrect', self.coll)
-      return size
-
-  def _get_document_count(self):
+      name_space = '%s.%s' % (self.db, self.coll)
+      return (client[self.db].command(
+          'splitVector',
+          name_space,
+          keyPattern={'_id': 1},
+          min={'_id': start_pos},
+          max={'_id': end_pos},
+          maxChunkSize=desired_chunk_size_in_mb)['splitKeys'])
+
+  def _merge_id_filter(self, range_tracker):
+    all_filters = self.filter.copy()
+    if '_id' in all_filters:
+      id_filter = all_filters['_id']
+      id_filter['$gte'] = (

Review comment: What does key '$gte' contain? Please add a comment (same for the other special keys used here).

Issue Time Tracking --- Worklog Id: (was: 290116) Time Spent: 4h 40m (was: 4.5h)
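The reviewer's question about `'$gte'` is answerable from MongoDB's standard query operators: `$gte` is "greater than or equal" and `$lt` is "strictly less than", so the patch intersects the user-supplied filter with the tracker's half-open `[start, stop)` id range. A minimal standalone sketch of that merge logic, using plain dicts (the `merge_id_filter` name and free-function signature are illustrative, not the patch's exact method):

```python
def merge_id_filter(user_filter, start_id, stop_id):
    """Intersect a user-supplied MongoDB filter with the half-open
    _id range [start_id, stop_id).

    '$gte' / '$lt' are MongoDB's range operators, so intersecting two
    ranges means taking the larger lower bound and the smaller upper
    bound. ObjectIds compare the same way integers do here.
    """
    merged = dict(user_filter)
    if '_id' in merged:
        id_filter = dict(merged['_id'])
        # Tighten any existing bounds rather than overwrite them.
        id_filter['$gte'] = max(id_filter.get('$gte', start_id), start_id)
        id_filter['$lt'] = min(id_filter.get('$lt', stop_id), stop_id)
        merged['_id'] = id_filter
    else:
        merged['_id'] = {'$gte': start_id, '$lt': stop_id}
    return merged
```

Passing the merged filter to `find()` lets each bundle read only its own id range while still honoring the user's query.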
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290112&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290112 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311320245

## File path: sdks/python/apache_beam/io/mongodbio.py

@@ -139,50 +143,64 @@ def __init__(self,
     self.filter = filter
     self.projection = projection
     self.spec = extra_client_params
-    self.doc_count = self._get_document_count()
-    self.avg_doc_size = self._get_avg_document_size()
-    self.client = None

   def estimate_size(self):
-    return self.avg_doc_size * self.doc_count
+    with MongoClient(self.uri, **self.spec) as client:
+      size = client[self.db].command('collstats', self.coll).get('size')
+      if size is None or size <= 0:
+        raise ValueError('Collection %s not found or total doc size is '
+                         'incorrect' % self.coll)
+      return size

   def split(self, desired_bundle_size, start_position=None, stop_position=None):
     # use document cursor index as the start and stop positions
     if start_position is None:
-      start_position = 0
+      start_position = self._get_head_document_id(ASCENDING)
     if stop_position is None:
-      stop_position = self.doc_count
+      last_doc_id = self._get_head_document_id(DESCENDING)
+      # increment last doc id binary value by 1 to make sure the last document
+      # is not excluded
+      stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)

-    # get an estimate on how many documents should be included in a split batch
-    desired_bundle_count = desired_bundle_size // self.avg_doc_size
+    desired_bundle_size_in_mb = desired_bundle_size // 1024 // 1024
+    split_keys = self._get_split_keys(desired_bundle_size_in_mb, start_position,
+                                      stop_position)

     bundle_start = start_position
-    while bundle_start < stop_position:
-      bundle_end = min(stop_position, bundle_start + desired_bundle_count)
-      yield iobase.SourceBundle(weight=bundle_end - bundle_start,
+    for split_key_id in split_keys:
+      if bundle_start is not None or bundle_start >= stop_position:
+        break
+      bundle_end = min(stop_position, split_key_id)
+      yield iobase.SourceBundle(weight=desired_bundle_size_in_mb,
                                 source=self,
                                 start_position=bundle_start,
                                 stop_position=bundle_end)
       bundle_start = bundle_end
+    # add range of last split_key to stop_position
+    if bundle_start < stop_position:

Review comment: Does this work if bundle_start == None?

Issue Time Tracking --- Worklog Id: (was: 290112) Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290113&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290113 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311319659

## File path: sdks/python/apache_beam/io/mongodbio.py

(The quoted hunk is the same `@@ -139,50 +143,64 @@` diff shown in the previous entry; this comment anchors on the loop guard.)

     bundle_start = start_position
-    while bundle_start < stop_position:
-      bundle_end = min(stop_position, bundle_start + desired_bundle_count)
-      yield iobase.SourceBundle(weight=bundle_end - bundle_start,
+    for split_key_id in split_keys:
+      if bundle_start is not None or bundle_start >= stop_position:

Review comment: Did you mean "if bundle_start is None or"? (Seems like this loop will just end after the first iteration.)

Issue Time Tracking --- Worklog Id: (was: 290113) Time Spent: 4h 10m (was: 4h)
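The reviewer's catch is real: since `bundle_start` is initialized to `start_position` (never `None`), the guard `if bundle_start is not None or ...` is always true, so the loop breaks before yielding any bundle and only the tail branch emits one bundle covering the whole range. A minimal sketch of the intended bundle computation, with the guard reduced to the overflow check (integer ids stand in for ObjectIds; `compute_bundles` is an illustrative name, not the patch's API):

```python
def compute_bundles(start_position, stop_position, split_keys):
    """Turn splitVector output into half-open [start, stop) bundle ranges."""
    bundles = []
    bundle_start = start_position
    for split_key_id in split_keys:
        # A split key at or past stop_position contributes nothing more.
        if bundle_start >= stop_position:
            break
        bundle_end = min(stop_position, split_key_id)
        bundles.append((bundle_start, bundle_end))
        bundle_start = bundle_end
    # The range after the last split key, up to stop_position.
    if bundle_start < stop_position:
        bundles.append((bundle_start, stop_position))
    return bundles
```

With this shape the ranges tile `[start_position, stop_position)` exactly, with no gaps or overlaps, regardless of how many split keys `splitVector` returns.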
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290120&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290120 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311330436

## File path: sdks/python/apache_beam/io/mongodbio_test.py

@@ -30,38 +34,136 @@
 from apache_beam.io.mongodbio import _BoundedMongoSource
 from apache_beam.io.mongodbio import _GenerateObjectIdFn
 from apache_beam.io.mongodbio import _MongoSink
+from apache_beam.io.mongodbio import _ObjectIdHelper
+from apache_beam.io.mongodbio import _ObjectIdRangeTracker
 from apache_beam.io.mongodbio import _WriteMongoFn
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to


+class _MockMongoColl(object):
+  """Fake mongodb collection cursor."""
+
+  def __init__(self, docs):
+    self.docs = docs
+
+  def _filter(self, filter):
+    match = []
+    if not filter:
+      return self
+    start = filter['_id'].get('$gte')
+    end = filter['_id'].get('$lt')
+    assert start is not None
+    assert end is not None
+    for doc in self.docs:
+      if start and doc['_id'] < start:
+        continue
+      if end and doc['_id'] >= end:
+        continue
+      match.append(doc)
+    return match
+
+  def find(self, filter=None, **kwargs):
+    return _MockMongoColl(self._filter(filter))
+
+  def sort(self, sort_items):
+    key, order = sort_items[0]
+    self.docs = sorted(self.docs,
+                       key=lambda x: x[key],
+                       reverse=(order != ASCENDING))
+    return self
+
+  def limit(self, num):
+    return _MockMongoColl(self.docs[0:num])
+
+  def count_documents(self, filter):
+    return len(self._filter(filter))
+
+  def __getitem__(self, index):
+    return self.docs[index]
+
+
+class _MockMongoDb(object):
+  """Fake Mongo Db."""
+
+  def __init__(self, docs):
+    self.docs = docs
+
+  def __getitem__(self, coll_name):
+    return _MockMongoColl(self.docs)
+
+  def command(self, command, *args, **kwargs):
+    if command == 'collstats':
+      return {'size': 5, 'avgSize': 1}
+    elif command == 'splitVector':
+      return self.get_split_key(command, *args, **kwargs)
+
+  def get_split_key(self, command, ns, min, max, maxChunkSize, **kwargs):
+    # simulate mongo db splitVector command, return split keys base on chunk
+    # size, assuming every doc is of size 1mb
+    start_id = min['_id']

Review comment: Probably use different variable names, since these override built-in functions.

Issue Time Tracking --- Worklog Id: (was: 290120) Time Spent: 5h 20m (was: 5h 10m)
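Following the reviewer's suggestion, the mock's `min`/`max` parameters can be renamed so they no longer shadow Python's built-ins. A sketch of the simulated `splitVector` logic under the same one-document-per-MB assumption the mock states (the names `min_doc`/`max_doc` and the free-function form are illustrative, not the patch's exact code):

```python
def simulate_split_vector(min_doc, max_doc, max_chunk_size_in_mb):
    """Simulate MongoDB's splitVector command for integer ids.

    Assumes every document is 1 MB, so a chunk of max_chunk_size_in_mb
    holds that many documents. Parameter names deliberately avoid
    shadowing the built-ins min() and max().
    """
    split_keys = []
    doc_id = min_doc['_id'] + max_chunk_size_in_mb
    while doc_id < max_doc['_id']:
        # Each split key marks the start of the next chunk.
        split_keys.append({'_id': doc_id})
        doc_id += max_chunk_size_in_mb
    return {'splitKeys': split_keys}
```

Shadowing `min`/`max` is more than a style nit: any later call to the built-ins inside the function body would silently resolve to the dict arguments instead.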
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=290119&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290119 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 07/Aug/19 00:53
Start Date: 07/Aug/19 00:53
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r311324914

## File path: sdks/python/apache_beam/io/mongodbio.py

(The quoted hunk repeats the `@@ -194,18 +212,110 @@` diff shown in an earlier entry; this comment anchors further down, in _ObjectIdHelper.)

+  def _merge_id_filter(self, range_tracker):
+    all_filters = self.filter.copy()
+    if '_id' in all_filters:
+      id_filter = all_filters['_id']
+      id_filter['$gte'] = (
+          max(id_filter['$gte'], range_tracker.start_position())
+          if '$gte' in id_filter else range_tracker.start_position())
+
+      id_filter['$lt'] = (min(id_filter['$lt'], range_tracker.stop_position())
+                          if '$lt' in id_filter else
+                          range_tracker.stop_position())
+    else:
+      all_filters.update({
+          '_id': {
+              '$gte': range_tracker.start_position(),
+              '$lt': range_tracker.stop_position()
+          }
+      })
+    return all_filters
+
+  def _get_head_document_id(self, sort_order):
     with MongoClient(self.uri, **self.spec) as client:
-      return max(client[self.db][self.coll].count_documents(self.filter), 0)
+      cursor = client[self.db][self.coll].find(filter={}, projection=[]).sort([
+          ('_id', sort_order)
+      ]).limit(1)
+      try:
+        return cursor[0]['_id']
+      except IndexError:
+        raise ValueError('Empty Mongodb collection')
+
+
+class _ObjectIdHelper(object):
+  """A Utility class to bson object ids."""
+
+  @classmethod
+  def id_to_int(cls, id):
+    # converts object id binary to integer
+    # id object is bytes type with size of 12
+    ints = struct.unpack('>III', id.binary)
+    return (ints[0] << 64) + (ints[1] << 32) + ints[2]
+
+  @classmethod
+  def int_to_id(cls, number):
+    # converts integer value to object id. Int value should be less than
+    # (2 ^ 96) so it can be convert to 12 bytes required by object id.
+    if number < 0 or number >= (1 << 96):
+      raise ValueError('number value must be within [0, %s)' % (1 << 96))
+    ints = [(number & 0xffffffff0000000000000000) >> 64,

Review comment: Please make sure that these functions are extensively tested using unit tests.

Issue Time Tracking --- Worklog Id: (was: 290119) Time Spent: 5h 10m (was: 5h)
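The 96-bit conversion the reviewer wants tested can be exercised without a live `bson.objectid.ObjectId` by operating on the raw 12-byte value: `struct.unpack('>III', ...)` splits it into three big-endian unsigned 32-bit words. A self-contained sketch of the round trip (the free-function names are illustrative; the patch's `_ObjectIdHelper` methods take and return ObjectId instances rather than bytes):

```python
import struct


def id_to_int(id_bytes):
    # A 12-byte ObjectId value is three big-endian unsigned 32-bit words;
    # combine them into a single 96-bit integer.
    high, mid, low = struct.unpack('>III', id_bytes)
    return (high << 64) + (mid << 32) + low


def int_to_id(number):
    # Inverse of id_to_int: the value must fit in 96 bits so it can be
    # packed back into 12 bytes.
    if number < 0 or number >= (1 << 96):
        raise ValueError('number value must be within [0, %s)' % (1 << 96))
    return struct.pack('>III',
                       (number >> 64) & 0xffffffff,
                       (number >> 32) & 0xffffffff,
                       number & 0xffffffff)
```

Because the mapping is order-preserving, incrementing the integer by one yields the next possible ObjectId, which is exactly what an exclusive-to-inclusive stop-position adjustment needs.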
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288800&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288800 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 05/Aug/19 09:02
Start Date: 05/Aug/19 09:02
Worklog Time Spent: 10m

Work Description: jkff commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-518149966

Thank you for the updates! I'd like to follow my usual review strategy with IO changes and hand off the next review round to @chamikaramj; I'll do the round after that.

Issue Time Tracking --- Worklog Id: (was: 288800) Time Spent: 3h 10m (was: 3h)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288683&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288683 ]

ASF GitHub Bot logged work on BEAM-7866:
Author: ASF GitHub Bot
Created on: 05/Aug/19 05:37
Start Date: 05/Aug/19 05:37
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r310443556

## File path: sdks/python/apache_beam/io/mongodbio_test.py

@@ -30,38 +32,102 @@
 from apache_beam.io.mongodbio import _BoundedMongoSource
 from apache_beam.io.mongodbio import _GenerateObjectIdFn
 from apache_beam.io.mongodbio import _MongoSink
+from apache_beam.io.mongodbio import _ObjectIdRangeTracker
 from apache_beam.io.mongodbio import _WriteMongoFn
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to


+class _MockMongoColl(object):
+  def __init__(self, docs):
+    self.docs = docs
+
+  def _filter(self, filter):
+    match = []
+    if not filter:
+      return self
+    start = filter['_id'].get('$gte')
+    end = filter['_id'].get('$lt')
+    for doc in self.docs:
+      if start and doc['_id'] < start:
+        continue
+      if end and doc['_id'] >= end:
+        continue
+      match.append(doc)
+    return match
+
+  def find(self, filter=None, **kwargs):
+    return _MockMongoColl(self._filter(filter))
+
+  def sort(self, sort_items):
+    key, order = sort_items[0]
+    self.docs = sorted(self.docs,
+                       key=lambda x: x[key],
+                       reverse=(order != ASCENDING))
+    return self
+
+  def limit(self, num):
+    return _MockMongoColl(self.docs[0:num])
+
+  def count_documents(self, filter):
+    return len(self._filter(filter))
+
+  def __getitem__(self, item):
+    return self.docs[item]
+
+
 class MongoSourceTest(unittest.TestCase):
-  @mock.patch('apache_beam.io.mongodbio._BoundedMongoSource'
-              '._get_document_count')
-  @mock.patch('apache_beam.io.mongodbio._BoundedMongoSource'
-              '._get_avg_document_size')
-  def setUp(self, mock_size, mock_count):
-    mock_size.return_value = 10
-    mock_count.return_value = 5
+  @mock.patch('apache_beam.io.mongodbio.MongoClient')
+  def setUp(self, mock_client):
+    mock_client.return_value.__enter__.return_value.__getitem__ \
+      .return_value.command.return_value = {'size': 5, 'avgSize': 1}
+    self._ids = [
+        objectid.ObjectId.from_datetime(
+            datetime.datetime(year=2020, month=i + 1, day=i + 1))
+        for i in range(5)
+    ]
+    self._docs = [{'_id': self._ids[i], 'x': i} for i in range(len(self._ids))]
+
     self.mongo_source = _BoundedMongoSource('mongodb://test', 'testdb',
                                             'testcoll')

-  def test_estimate_size(self):
-    self.assertEqual(self.mongo_source.estimate_size(), 50)
+  def get_split(self, command, ns, min, max, maxChunkSize, **kwargs):

Review comment: maxChunkSize is the actual argument name the mongo client needs (maybe by mistake), so I'm not able to change it to snake case.

Issue Time Tracking --- Worklog Id: (was: 288683) Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288684=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288684 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 05/Aug/19 05:37 Start Date: 05/Aug/19 05:37 Worklog Time Spent: 10m Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r310443577 ## File path: sdks/python/apache_beam/io/mongodbio_test.py ## @@ -30,38 +32,102 @@ from apache_beam.io.mongodbio import _BoundedMongoSource from apache_beam.io.mongodbio import _GenerateObjectIdFn from apache_beam.io.mongodbio import _MongoSink +from apache_beam.io.mongodbio import _ObjectIdRangeTracker from apache_beam.io.mongodbio import _WriteMongoFn from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +class _MockMongoColl(object): + def __init__(self, docs): +self.docs = docs + + def _filter(self, filter): +match = [] +if not filter: + return self +start = filter['_id'].get('$gte') +end = filter['_id'].get('$lt') +for doc in self.docs: + if start and doc['_id'] < start: +continue + if end and doc['_id'] >= end: +continue + match.append(doc) +return match + + def find(self, filter=None, **kwargs): +return _MockMongoColl(self._filter(filter)) + + def sort(self, sort_items): +key, order = sort_items[0] +self.docs = sorted(self.docs, + key=lambda x: x[key], + reverse=(order != ASCENDING)) +return self + + def limit(self, num): +return _MockMongoColl(self.docs[0:num]) + + def count_documents(self, filter): +return len(self._filter(filter)) + + def __getitem__(self, item): +return self.docs[item] + + class MongoSourceTest(unittest.TestCase): - @mock.patch('apache_beam.io.mongodbio._BoundedMongoSource' - '._get_document_count') - @mock.patch('apache_beam.io.mongodbio._BoundedMongoSource' 
- '._get_avg_document_size') - def setUp(self, mock_size, mock_count): -mock_size.return_value = 10 -mock_count.return_value = 5 + @mock.patch('apache_beam.io.mongodbio.MongoClient') + def setUp(self, mock_client): +mock_client.return_value.__enter__.return_value.__getitem__ \ Review comment: make sense. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288684) Time Spent: 3h (was: 2h 50m) > Python MongoDB IO performance and correctness issues > > > Key: BEAM-7866 > URL: https://issues.apache.org/jira/browse/BEAM-7866 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Eugene Kirpichov >Assignee: Yichi Zhang >Priority: Blocker > Fix For: 2.15.0 > > Time Spent: 3h > Remaining Estimate: 0h > > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py > splits the query result by computing number of results in constructor, and > then in each reader re-executing the whole query and getting an index > sub-range of those results. > This is broken in several critical ways: > - The order of query results returned by find() is not necessarily > deterministic, so the idea of index ranges on it is meaningless: each shard > may basically get random, possibly overlapping subsets of the total results > - Even if you add order by `_id`, the database may be changing concurrently > to reading and splitting. E.g. if the database contained documents with ids > 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the > assumption that these shards would contain respectively 10 20 30, and 40 50), > and then suppose shard 10 20 30 is read and then document 25 is inserted - > then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and > document 25 is lost. 
> - Every shard re-executes the query and skips the first start_offset items, > which in total is quadratic complexity > - The query is first executed in the constructor in order to count results, > which 1) means the constructor can be super slow and 2) it won't work at all > if the database is unavailable at the time the pipeline is constructed (e.g. > if this is a template). >
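The duplication/loss scenario in the report can be reproduced with a toy simulation (plain Python lists stand in for a live collection; all names here are illustrative, not Beam APIs):

```python
def read_index_range(collection, start, stop):
    # Mirrors the old mongodbio strategy: every shard re-executes the
    # sorted query and takes an index sub-range of the results.
    return sorted(collection)[start:stop]

collection = [10, 20, 30, 40, 50]
# Split computed up front: shard A gets indices 0..2, shard B gets 3..5.
shard_a = read_index_range(collection, 0, 3)   # reads 10, 20, 30
collection.append(25)                          # concurrent insert of doc 25
shard_b = read_index_range(collection, 3, 6)   # reads 30, 40, 50

# Document 30 is read twice and document 25 is never read, exactly as
# described in the issue.
assert shard_a + shard_b == [10, 20, 30, 30, 40, 50]
```

The same simulation also shows the quadratic cost: each shard rescans the whole collection before discarding everything below its start offset.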
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288682=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288682 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 05/Aug/19 05:36 Start Date: 05/Aug/19 05:36 Worklog Time Spent: 10m Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r310443397 ## File path: sdks/python/apache_beam/io/mongodbio.py ## @@ -194,18 +225,72 @@ def display_data(self): res['mongo_client_spec'] = self.spec return res - def _get_avg_document_size(self): + def _get_split_keys(self, desired_chunk_size, start_pos, end_pos): +# if desired chunk size smaller than 1mb, use mongodb default split size of +# 1mb +if desired_chunk_size < 1: Review comment: yes in Mb. renamed the argument for clarity. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288682) Time Spent: 2h 40m (was: 2.5h) > Python MongoDB IO performance and correctness issues > > > Key: BEAM-7866 > URL: https://issues.apache.org/jira/browse/BEAM-7866 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Eugene Kirpichov >Assignee: Yichi Zhang >Priority: Blocker > Fix For: 2.15.0 > > Time Spent: 2h 40m > Remaining Estimate: 0h > > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py > splits the query result by computing number of results in constructor, and > then in each reader re-executing the whole query and getting an index > sub-range of those results. 
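The unit question settled in this comment (the argument is in MB, with MongoDB treating anything below 1 MB as the 1 MB default split size) comes down to a small conversion; a sketch with an illustrative helper name:

```python
def desired_chunk_size_mb(desired_bundle_size_bytes):
    # splitVector's maxChunkSize parameter is expressed in MB; anything
    # below MongoDB's minimum split size is clamped up to 1 MB.
    mb = desired_bundle_size_bytes // 1024 // 1024
    return max(mb, 1)
```

For example, a desired bundle size of 512 KiB clamps to 1 MB, while 64 MiB passes through as 64.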
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288681=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288681 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 05/Aug/19 05:35 Start Date: 05/Aug/19 05:35 Worklog Time Spent: 10m Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r310443330 ## File path: sdks/python/apache_beam/io/mongodbio.py ## @@ -139,50 +143,77 @@ def __init__(self, self.filter = filter self.projection = projection self.spec = extra_client_params -self.doc_count = self._get_document_count() -self.avg_doc_size = self._get_avg_document_size() -self.client = None def estimate_size(self): -return self.avg_doc_size * self.doc_count +with MongoClient(self.uri, **self.spec) as client: + size = client[self.db].command('collstats', self.coll).get('size') + if size is None or size <= 0: +raise ValueError('Collection %s not found or total doc size is ' + 'incorrect' % self.coll) + return size def split(self, desired_bundle_size, start_position=None, stop_position=None): # use document cursor index as the start and stop positions if start_position is None: - start_position = 0 + epoch = datetime.datetime(1970, 1, 1) + start_position = objectid.ObjectId.from_datetime(epoch) if stop_position is None: - stop_position = self.doc_count + last_doc_id = self._get_last_document_id() + # add one sec to make sure the last document is not excluded + last_timestamp_plus_one_sec = (last_doc_id.generation_time + + datetime.timedelta(seconds=1)) + stop_position = objectid.ObjectId.from_datetime( + last_timestamp_plus_one_sec) -# get an estimate on how many documents should be included in a split batch -desired_bundle_count = desired_bundle_size // self.avg_doc_size +desired_bundle_size_in_mb = desired_bundle_size // 1024 // 1024 +split_keys = 
self._get_split_keys(desired_bundle_size_in_mb, start_position, + stop_position) bundle_start = start_position -while bundle_start < stop_position: - bundle_end = min(stop_position, bundle_start + desired_bundle_count) - yield iobase.SourceBundle(weight=bundle_end - bundle_start, +for split_key_id in split_keys: + bundle_end = min(stop_position, split_key_id) + if bundle_start is None and bundle_start < stop_position: +return + yield iobase.SourceBundle(weight=desired_bundle_size_in_mb, source=self, start_position=bundle_start, stop_position=bundle_end) bundle_start = bundle_end +# add range of last split_key to stop_position +if bundle_start < stop_position: + yield iobase.SourceBundle(weight=desired_bundle_size_in_mb, +source=self, +start_position=bundle_start, +stop_position=stop_position) def get_range_tracker(self, start_position, stop_position): if start_position is None: - start_position = 0 + epoch = datetime.datetime(1970, 1, 1) + start_position = objectid.ObjectId.from_datetime(epoch) if stop_position is None: - stop_position = self.doc_count -return OffsetRangeTracker(start_position, stop_position) + last_doc_id = self._get_last_document_id() + # add one sec to make sure the last document is not excluded + last_timestamp_plus_one_sec = (last_doc_id.generation_time + + datetime.timedelta(seconds=1)) + stop_position = objectid.ObjectId.from_datetime( + last_timestamp_plus_one_sec) +return _ObjectIdRangeTracker(start_position, stop_position) def read(self, range_tracker): with MongoClient(self.uri, **self.spec) as client: - # docs is a MongoDB Cursor - docs = client[self.db][self.coll].find( - filter=self.filter, projection=self.projection - )[range_tracker.start_position():range_tracker.stop_position()] - for index in range(range_tracker.start_position(), - range_tracker.stop_position()): -if not range_tracker.try_claim(index): + all_filters = self.filter + all_filters.update({ + '_id': { + '$gte': range_tracker.start_position(), Review comment: Added the 
filter merge logic.
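The "filter merge logic" referred to here combines the user-supplied filter with the range tracker's `_id` bounds. A hedged sketch (the actual PR code may differ; `$and` is used below because a plain `filter.update(...)` would clobber any `_id` predicate already present in the user filter):

```python
def merge_id_range_filter(user_filter, start_id, stop_id):
    # Restrict the query to the tracker's half-open [start_id, stop_id) range.
    id_range = {'_id': {'$gte': start_id, '$lt': stop_id}}
    if not user_filter:
        return id_range
    # $and keeps both predicates even if user_filter also constrains _id.
    return {'$and': [user_filter, id_range]}
```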
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288246=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288246 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 02/Aug/19 20:56 Start Date: 02/Aug/19 20:56 Worklog Time Spent: 10m Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r310295208 ## File path: sdks/python/apache_beam/io/mongodbio.py ## @@ -139,50 +143,77 @@ def __init__(self, self.filter = filter self.projection = projection self.spec = extra_client_params -self.doc_count = self._get_document_count() -self.avg_doc_size = self._get_avg_document_size() -self.client = None def estimate_size(self): -return self.avg_doc_size * self.doc_count +with MongoClient(self.uri, **self.spec) as client: + size = client[self.db].command('collstats', self.coll).get('size') + if size is None or size <= 0: +raise ValueError('Collection %s not found or total doc size is ' + 'incorrect' % self.coll) + return size def split(self, desired_bundle_size, start_position=None, stop_position=None): # use document cursor index as the start and stop positions if start_position is None: - start_position = 0 + epoch = datetime.datetime(1970, 1, 1) + start_position = objectid.ObjectId.from_datetime(epoch) if stop_position is None: - stop_position = self.doc_count + last_doc_id = self._get_last_document_id() + # add one sec to make sure the last document is not excluded + last_timestamp_plus_one_sec = (last_doc_id.generation_time + + datetime.timedelta(seconds=1)) + stop_position = objectid.ObjectId.from_datetime( + last_timestamp_plus_one_sec) -# get an estimate on how many documents should be included in a split batch -desired_bundle_count = desired_bundle_size // self.avg_doc_size +desired_bundle_size_in_mb = desired_bundle_size // 1024 // 1024 +split_keys = 
self._get_split_keys(desired_bundle_size_in_mb, start_position, + stop_position) bundle_start = start_position -while bundle_start < stop_position: - bundle_end = min(stop_position, bundle_start + desired_bundle_count) - yield iobase.SourceBundle(weight=bundle_end - bundle_start, +for split_key_id in split_keys: + bundle_end = min(stop_position, split_key_id) + if bundle_start is None and bundle_start < stop_position: +return + yield iobase.SourceBundle(weight=desired_bundle_size_in_mb, source=self, start_position=bundle_start, stop_position=bundle_end) bundle_start = bundle_end +# add range of last split_key to stop_position +if bundle_start < stop_position: + yield iobase.SourceBundle(weight=desired_bundle_size_in_mb, +source=self, +start_position=bundle_start, +stop_position=stop_position) def get_range_tracker(self, start_position, stop_position): if start_position is None: - start_position = 0 + epoch = datetime.datetime(1970, 1, 1) Review comment: My understanding is when the first split happens. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288246) Time Spent: 2h 20m (was: 2h 10m) > Python MongoDB IO performance and correctness issues > > > Key: BEAM-7866 > URL: https://issues.apache.org/jira/browse/BEAM-7866 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Eugene Kirpichov >Assignee: Yichi Zhang >Priority: Blocker > Fix For: 2.15.0 > > Time Spent: 2h 20m > Remaining Estimate: 0h > > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py > splits the query result by computing number of results in constructor, and > then in each reader re-executing the whole query and getting an index > sub-range of those results. 
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288245=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288245 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 02/Aug/19 20:54 Start Date: 02/Aug/19 20:54 Worklog Time Spent: 10m Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r310294780 ## File path: sdks/python/apache_beam/io/mongodbio.py ## @@ -139,50 +143,77 @@ def __init__(self, self.filter = filter self.projection = projection self.spec = extra_client_params -self.doc_count = self._get_document_count() -self.avg_doc_size = self._get_avg_document_size() -self.client = None def estimate_size(self): -return self.avg_doc_size * self.doc_count +with MongoClient(self.uri, **self.spec) as client: + size = client[self.db].command('collstats', self.coll).get('size') + if size is None or size <= 0: +raise ValueError('Collection %s not found or total doc size is ' + 'incorrect' % self.coll) + return size def split(self, desired_bundle_size, start_position=None, stop_position=None): # use document cursor index as the start and stop positions if start_position is None: - start_position = 0 + epoch = datetime.datetime(1970, 1, 1) Review comment: makes sense, thanks for the suggest. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288245) Time Spent: 2h 10m (was: 2h) > Python MongoDB IO performance and correctness issues > > > Key: BEAM-7866 > URL: https://issues.apache.org/jira/browse/BEAM-7866 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Eugene Kirpichov >Assignee: Yichi Zhang >Priority: Blocker > Fix For: 2.15.0 > > Time Spent: 2h 10m > Remaining Estimate: 0h -- This message was sent by Atlassian JIRA (v7.6.14#76016)
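The epoch-based default bounds discussed in this thread work because an ObjectId embeds a 4-byte big-endian Unix timestamp in its first four bytes, so a timestamp-only id (trailing bytes zeroed, as `bson.objectid.ObjectId.from_datetime` produces) sorts at the start of that second. A pure-Python sketch of the byte layout, with no pymongo dependency and an illustrative helper name:

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1)

def timestamp_objectid_hex(dt):
    # An ObjectId is 12 bytes: a 4-byte big-endian timestamp followed by
    # 8 machine/counter bytes. Zeroing the trailing 8 bytes yields the
    # smallest possible id for that second.
    seconds = int((dt - EPOCH).total_seconds())
    return '%08x' % seconds + '00' * 8

start_hex = timestamp_objectid_hex(EPOCH)  # '000000000000000000000000'
# Stop bound: the last document's generation_time plus one second, so the
# exclusive upper bound does not drop the last document itself.
```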
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288222=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288222 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 02/Aug/19 20:12 Start Date: 02/Aug/19 20:12 Worklog Time Spent: 10m Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r310281527 ## File path: sdks/python/apache_beam/io/mongodbio_test.py ## @@ -30,38 +32,102 @@ from apache_beam.io.mongodbio import _BoundedMongoSource from apache_beam.io.mongodbio import _GenerateObjectIdFn from apache_beam.io.mongodbio import _MongoSink +from apache_beam.io.mongodbio import _ObjectIdRangeTracker from apache_beam.io.mongodbio import _WriteMongoFn from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +class _MockMongoColl(object): + def __init__(self, docs): +self.docs = docs + + def _filter(self, filter): +match = [] +if not filter: + return self +start = filter['_id'].get('$gte') +end = filter['_id'].get('$lt') +for doc in self.docs: + if start and doc['_id'] < start: +continue + if end and doc['_id'] >= end: +continue + match.append(doc) +return match + + def find(self, filter=None, **kwargs): +return _MockMongoColl(self._filter(filter)) + + def sort(self, sort_items): +key, order = sort_items[0] +self.docs = sorted(self.docs, + key=lambda x: x[key], + reverse=(order != ASCENDING)) +return self + + def limit(self, num): +return _MockMongoColl(self.docs[0:num]) + + def count_documents(self, filter): +return len(self._filter(filter)) + + def __getitem__(self, item): +return self.docs[item] + + class MongoSourceTest(unittest.TestCase): - @mock.patch('apache_beam.io.mongodbio._BoundedMongoSource' - '._get_document_count') - @mock.patch('apache_beam.io.mongodbio._BoundedMongoSource' 
- '._get_avg_document_size') - def setUp(self, mock_size, mock_count): -mock_size.return_value = 10 -mock_count.return_value = 5 + @mock.patch('apache_beam.io.mongodbio.MongoClient') + def setUp(self, mock_client): +mock_client.return_value.__enter__.return_value.__getitem__ \ + .return_value.command.return_value = {'size': 5, 'avgSize': 1} +self._ids = [ +objectid.ObjectId.from_datetime( +datetime.datetime(year=2020, month=i + 1, day=i + 1)) +for i in range(5) +] +self._docs = [{'_id': self._ids[i], 'x': i} for i in range(len(self._ids))] + self.mongo_source = _BoundedMongoSource('mongodb://test', 'testdb', 'testcoll') - def test_estimate_size(self): -self.assertEqual(self.mongo_source.estimate_size(), 50) + def get_split(self, command, ns, min, max, maxChunkSize, **kwargs): Review comment: * What does this do? It's not quite clear from the method body * maxChunkSize should probably be called max_chunk_size This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288222) Time Spent: 1h 50m (was: 1h 40m) > Python MongoDB IO performance and correctness issues > > > Key: BEAM-7866 > URL: https://issues.apache.org/jira/browse/BEAM-7866 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Eugene Kirpichov >Assignee: Yichi Zhang >Priority: Blocker > Fix For: 2.15.0 > > Time Spent: 1h 50m > Remaining Estimate: 0h > > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py > splits the query result by computing number of results in constructor, and > then in each reader re-executing the whole query and getting an index > sub-range of those results. 
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288221=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288221 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 02/Aug/19 20:12 Start Date: 02/Aug/19 20:12 Worklog Time Spent: 10m Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r310280167 ## File path: sdks/python/apache_beam/io/mongodbio.py ## @@ -194,18 +225,72 @@ def display_data(self): res['mongo_client_spec'] = self.spec return res - def _get_avg_document_size(self): + def _get_split_keys(self, desired_chunk_size, start_pos, end_pos): +# if desired chunk size smaller than 1mb, use mongodb default split size of +# 1mb +if desired_chunk_size < 1: + desired_chunk_size = 1 +if start_pos >= end_pos: + # single document not splittable + return [] with MongoClient(self.uri, **self.spec) as client: - size = client[self.db].command('collstats', self.coll).get('avgObjSize') - if size is None or size <= 0: -raise ValueError( -'Collection %s not found or average doc size is ' -'incorrect', self.coll) - return size - - def _get_document_count(self): + name_space = '%s.%s' % (self.db, self.coll) + return (client[self.db].command( + 'splitVector', + name_space, + keyPattern={'_id': 1}, + min={'_id': start_pos}, + max={'_id': end_pos}, + maxChunkSize=desired_chunk_size)['splitKeys']) + + def _get_last_document_id(self): with MongoClient(self.uri, **self.spec) as client: - return max(client[self.db][self.coll].count_documents(self.filter), 0) + cursor = client[self.db][self.coll].find(filter={}, projection=[]).sort([ + ('_id', DESCENDING) + ]).limit(1) + try: +return cursor[0]['_id'] + except IndexError: +raise ValueError('Empty Mongodb collection') + + +class _ObjectIdRangeTracker(OrderedPositionRangeTracker): + """RangeTracker for tracking mongodb _id of bson ObjectId type.""" + 
+ def _id_to_int(self, id): +# id object is bytes type with size of 12 +ints = struct.unpack('>III', id.binary) +return 2**64 * ints[0] + 2**32 * ints[1] + ints[2] + + def _int_to_id(self, numbers): Review comment: This kind of bignum arithmetic is extremely hard to get right, please make sure the unit tests cover enough corner cases. I would recommend to add some randomized property testing. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288221) Time Spent: 1h 40m (was: 1.5h) > Python MongoDB IO performance and correctness issues > > > Key: BEAM-7866 > URL: https://issues.apache.org/jira/browse/BEAM-7866 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Eugene Kirpichov >Assignee: Yichi Zhang >Priority: Blocker > Fix For: 2.15.0 > > Time Spent: 1h 40m > Remaining Estimate: 0h > > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py > splits the query result by computing number of results in constructor, and > then in each reader re-executing the whole query and getting an index > sub-range of those results. > This is broken in several critical ways: > - The order of query results returned by find() is not necessarily > deterministic, so the idea of index ranges on it is meaningless: each shard > may basically get random, possibly overlapping subsets of the total results > - Even if you add order by `_id`, the database may be changing concurrently > to reading and splitting. E.g. 
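The reviewer's concern about the bignum arithmetic, and the suggestion of randomized property testing, can be sketched as follows (this mirrors the `struct.unpack('>III', ...)` encoding shown in the diff; the round-trip loop is the kind of property test being recommended):

```python
import random
import struct

def id_to_int(id_bytes):
    # A 12-byte ObjectId body unpacks as three big-endian uint32 words.
    hi, mid, lo = struct.unpack('>III', id_bytes)
    return (hi << 64) | (mid << 32) | lo

def int_to_id(n):
    assert 0 <= n < 2 ** 96, 'ObjectIds occupy a 96-bit key space'
    return struct.pack('>III',
                       (n >> 64) & 0xFFFFFFFF,
                       (n >> 32) & 0xFFFFFFFF,
                       n & 0xFFFFFFFF)

# Randomized round-trip property test, per the review suggestion: any
# 96-bit integer must survive int -> id -> int unchanged.
for _ in range(10000):
    n = random.getrandbits(96)
    assert id_to_int(int_to_id(n)) == n
```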
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288218=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288218 ] ASF GitHub Bot logged work on BEAM-7866: Author: ASF GitHub Bot Created on: 02/Aug/19 20:12 Start Date: 02/Aug/19 20:12 Worklog Time Spent: 10m Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue URL: https://github.com/apache/beam/pull/9233#discussion_r310278529 ## File path: sdks/python/apache_beam/io/mongodbio.py ## @@ -139,50 +143,77 @@ def __init__(self, self.filter = filter self.projection = projection self.spec = extra_client_params -self.doc_count = self._get_document_count() -self.avg_doc_size = self._get_avg_document_size() -self.client = None def estimate_size(self): -return self.avg_doc_size * self.doc_count +with MongoClient(self.uri, **self.spec) as client: + size = client[self.db].command('collstats', self.coll).get('size') + if size is None or size <= 0: +raise ValueError('Collection %s not found or total doc size is ' + 'incorrect' % self.coll) + return size def split(self, desired_bundle_size, start_position=None, stop_position=None): # use document cursor index as the start and stop positions if start_position is None: - start_position = 0 + epoch = datetime.datetime(1970, 1, 1) + start_position = objectid.ObjectId.from_datetime(epoch) if stop_position is None: - stop_position = self.doc_count + last_doc_id = self._get_last_document_id() + # add one sec to make sure the last document is not excluded + last_timestamp_plus_one_sec = (last_doc_id.generation_time + + datetime.timedelta(seconds=1)) + stop_position = objectid.ObjectId.from_datetime( + last_timestamp_plus_one_sec) -# get an estimate on how many documents should be included in a split batch -desired_bundle_count = desired_bundle_size // self.avg_doc_size +desired_bundle_size_in_mb = desired_bundle_size // 1024 // 1024 +split_keys = 
self._get_split_keys(desired_bundle_size_in_mb, start_position, + stop_position) bundle_start = start_position -while bundle_start < stop_position: - bundle_end = min(stop_position, bundle_start + desired_bundle_count) - yield iobase.SourceBundle(weight=bundle_end - bundle_start, +for split_key_id in split_keys: + bundle_end = min(stop_position, split_key_id) + if bundle_start is None and bundle_start < stop_position: +return + yield iobase.SourceBundle(weight=desired_bundle_size_in_mb, source=self, start_position=bundle_start, stop_position=bundle_end) bundle_start = bundle_end +# add range of last split_key to stop_position +if bundle_start < stop_position: + yield iobase.SourceBundle(weight=desired_bundle_size_in_mb, +source=self, +start_position=bundle_start, +stop_position=stop_position) def get_range_tracker(self, start_position, stop_position): if start_position is None: - start_position = 0 + epoch = datetime.datetime(1970, 1, 1) Review comment: When can start_position and stop_position be None in get_range_tracker? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 288218) Time Spent: 1.5h (was: 1h 20m) > Python MongoDB IO performance and correctness issues > > > Key: BEAM-7866 > URL: https://issues.apache.org/jira/browse/BEAM-7866 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Eugene Kirpichov >Assignee: Yichi Zhang >Priority: Blocker > Fix For: 2.15.0 > > Time Spent: 1.5h > Remaining Estimate: 0h > > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py > splits the query result by computing number of results in constructor, and > then in each reader re-executing the whole query and getting an index > sub-range of those results. 
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288223&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288223 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 20:12
Start Date: 02/Aug/19 20:12
Worklog Time Spent: 10m

Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r310281025

## File path: sdks/python/apache_beam/io/mongodbio_test.py

@@ -30,38 +32,102 @@
 from apache_beam.io.mongodbio import _BoundedMongoSource
 from apache_beam.io.mongodbio import _GenerateObjectIdFn
 from apache_beam.io.mongodbio import _MongoSink
+from apache_beam.io.mongodbio import _ObjectIdRangeTracker
 from apache_beam.io.mongodbio import _WriteMongoFn
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 
 
+class _MockMongoColl(object):
+  def __init__(self, docs):
+    self.docs = docs
+
+  def _filter(self, filter):
+    match = []
+    if not filter:
+      return self
+    start = filter['_id'].get('$gte')
+    end = filter['_id'].get('$lt')
+    for doc in self.docs:
+      if start and doc['_id'] < start:
+        continue
+      if end and doc['_id'] >= end:
+        continue
+      match.append(doc)
+    return match
+
+  def find(self, filter=None, **kwargs):
+    return _MockMongoColl(self._filter(filter))
+
+  def sort(self, sort_items):
+    key, order = sort_items[0]
+    self.docs = sorted(self.docs,
+                       key=lambda x: x[key],
+                       reverse=(order != ASCENDING))
+    return self
+
+  def limit(self, num):
+    return _MockMongoColl(self.docs[0:num])
+
+  def count_documents(self, filter):
+    return len(self._filter(filter))
+
+  def __getitem__(self, item):
+    return self.docs[item]
+
+
 class MongoSourceTest(unittest.TestCase):
-  @mock.patch('apache_beam.io.mongodbio._BoundedMongoSource'
-              '._get_document_count')
-  @mock.patch('apache_beam.io.mongodbio._BoundedMongoSource'
-              '._get_avg_document_size')
-  def setUp(self, mock_size, mock_count):
-    mock_size.return_value = 10
-    mock_count.return_value = 5
+  @mock.patch('apache_beam.io.mongodbio.MongoClient')
+  def setUp(self, mock_client):
+    mock_client.return_value.__enter__.return_value.__getitem__ \

Review comment: It's hard to follow what this line is doing; could you add a comment? Ditto for other similar statements. I'm wondering if mocking is even the right approach here. Maybe create a fake instead? That could be much more readable and less fragile w.r.t. the precise order of method calls.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 288223)
Time Spent: 2h (was: 1h 50m)

> Python MongoDB IO performance and correctness issues
> ----------------------------------------------------
>
>                 Key: BEAM-7866
>                 URL: https://issues.apache.org/jira/browse/BEAM-7866
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Eugene Kirpichov
>            Assignee: Yichi Zhang
>            Priority: Blocker
>             Fix For: 2.15.0
>
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py
> splits the query result by computing the number of results in the constructor, and
> then in each reader re-executing the whole query and reading an index
> sub-range of those results.
> This is broken in several critical ways:
> - The order of query results returned by find() is not necessarily
> deterministic, so the idea of index ranges on it is meaningless: each shard
> may get essentially random, possibly overlapping subsets of the total results.
> - Even with an order by `_id`, the database may be changing concurrently
> with reading and splitting. E.g. if the database contained documents with ids
> 10 20 30 40 50, and this was split into shards 0..2 and 3..5 (under the
> assumption that these shards would contain respectively 10 20 30, and 40 50),
> and then suppose shard 10 20 30 is read and then document 25 is inserted -
> then the 3..5 shard will read 30 40 50, i.e. document 30 is duplicated and
> document 25 is lost.
> - Every shard re-executes the query and skips the first start_offset items,
> which in total is quadratic complexity.
> - The query is first executed in the constructor in order to count results,
> which 1) means the constructor can be super slow, and 2) won't work at all
> if the database is unavailable at the time the pipeline is constructed (e.g.
> if this is a template).
> Unfortunately, none of these issues are caught by SourceTestUtils: this class
> has extensive coverage with it, and the tests pass. This is because the tests
> return the same results in the same order. I don't know how to catch this
> automatically, and I don't know how to catch the performance issue
> automatically, but these would all be important follow-up items after the
> actual fix.
> CC: [~chamikara] as reviewer.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
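The duplication/loss scenario from the issue description can be reproduced in a few lines of plain Python (a sketch; an in-memory list of ids stands in for the collection, and slicing stands in for re-executing the query with an index range):

```python
# Simulate the failure mode: shards are fixed index ranges computed up front,
# but each shard re-executes the query against a collection that may have
# changed in the meantime.
docs = [10, 20, 30, 40, 50]   # document ids, in _id order

shard_a = docs[0:3]           # first shard reads 10, 20, 30
docs.insert(2, 25)            # document 25 is inserted before shard B runs
shard_b = docs[3:6]           # second shard re-queries: now reads 30, 40, 50

seen = shard_a + shard_b      # 30 appears twice; 25 is never read
```

This is exactly why the fix switches to filtering on immutable `_id` ranges instead of index offsets.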
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288217&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288217 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 20:12
Start Date: 02/Aug/19 20:12
Worklog Time Spent: 10m

Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r310278966

## File path: sdks/python/apache_beam/io/mongodbio.py

@@ -139,50 +143,77 @@ def __init__(self,
     self.filter = filter
     self.projection = projection
     self.spec = extra_client_params
-    self.doc_count = self._get_document_count()
-    self.avg_doc_size = self._get_avg_document_size()
-    self.client = None
 
   def estimate_size(self):
-    return self.avg_doc_size * self.doc_count
+    with MongoClient(self.uri, **self.spec) as client:
+      size = client[self.db].command('collstats', self.coll).get('size')
+      if size is None or size <= 0:
+        raise ValueError('Collection %s not found or total doc size is '
+                         'incorrect' % self.coll)
+      return size
 
   def split(self, desired_bundle_size, start_position=None, stop_position=None):
     # use document cursor index as the start and stop positions
     if start_position is None:
-      start_position = 0
+      epoch = datetime.datetime(1970, 1, 1)
+      start_position = objectid.ObjectId.from_datetime(epoch)
     if stop_position is None:
-      stop_position = self.doc_count
+      last_doc_id = self._get_last_document_id()
+      # add one sec to make sure the last document is not excluded
+      last_timestamp_plus_one_sec = (last_doc_id.generation_time +
+                                     datetime.timedelta(seconds=1))
+      stop_position = objectid.ObjectId.from_datetime(
+          last_timestamp_plus_one_sec)
 
-    # get an estimate on how many documents should be included in a split batch
-    desired_bundle_count = desired_bundle_size // self.avg_doc_size
+    desired_bundle_size_in_mb = desired_bundle_size // 1024 // 1024
+    split_keys = self._get_split_keys(desired_bundle_size_in_mb, start_position,
+                                      stop_position)
 
     bundle_start = start_position
-    while bundle_start < stop_position:
-      bundle_end = min(stop_position, bundle_start + desired_bundle_count)
-      yield iobase.SourceBundle(weight=bundle_end - bundle_start,
+    for split_key_id in split_keys:
+      bundle_end = min(stop_position, split_key_id)
+      if bundle_start is None and bundle_start < stop_position:
+        return
+      yield iobase.SourceBundle(weight=desired_bundle_size_in_mb,
                                 source=self,
                                 start_position=bundle_start,
                                 stop_position=bundle_end)
       bundle_start = bundle_end
+    # add range of last split_key to stop_position
+    if bundle_start < stop_position:
+      yield iobase.SourceBundle(weight=desired_bundle_size_in_mb,
+                                source=self,
+                                start_position=bundle_start,
+                                stop_position=stop_position)
 
   def get_range_tracker(self, start_position, stop_position):
     if start_position is None:
-      start_position = 0
+      epoch = datetime.datetime(1970, 1, 1)
+      start_position = objectid.ObjectId.from_datetime(epoch)
     if stop_position is None:
-      stop_position = self.doc_count
-    return OffsetRangeTracker(start_position, stop_position)
+      last_doc_id = self._get_last_document_id()
+      # add one sec to make sure the last document is not excluded
+      last_timestamp_plus_one_sec = (last_doc_id.generation_time +
+                                     datetime.timedelta(seconds=1))
+      stop_position = objectid.ObjectId.from_datetime(
+          last_timestamp_plus_one_sec)
+    return _ObjectIdRangeTracker(start_position, stop_position)
 
   def read(self, range_tracker):
     with MongoClient(self.uri, **self.spec) as client:
-      # docs is a MongoDB Cursor
-      docs = client[self.db][self.coll].find(
-          filter=self.filter, projection=self.projection
-      )[range_tracker.start_position():range_tracker.stop_position()]
-      for index in range(range_tracker.start_position(),
-                         range_tracker.stop_position()):
-        if not range_tracker.try_claim(index):
+      all_filters = self.filter
+      all_filters.update({
+          '_id': {
+              '$gte': range_tracker.start_position(),

Review comment: What if the original filter already has gte/lt conditions on _id? I think it's fine to prohibit it (at construction time); but you can also take it into account and infer the
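The question above can be made concrete: `update()` on the filter dict would clobber a user-supplied `_id` condition, whereas combining the two under `$and` preserves both. A hedged sketch (`merge_range_filter` is a hypothetical helper for illustration, not part of the Beam or pymongo API):

```python
def merge_range_filter(user_filter, range_start, range_end):
    # AND the user's filter with the split's _id range instead of
    # filter.update(), which would overwrite an existing '_id' clause.
    range_filter = {'_id': {'$gte': range_start, '$lt': range_end}}
    if not user_filter:
        return range_filter
    return {'$and': [user_filter, range_filter]}
```

MongoDB evaluates `$and` clauses jointly, so a pre-existing `_id: {'$gte': ...}` condition and the split range both apply.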
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288224&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288224 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 20:12
Start Date: 02/Aug/19 20:12
Worklog Time Spent: 10m

Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r310279232

## File path: sdks/python/apache_beam/io/mongodbio.py

@@ -194,18 +225,72 @@ def display_data(self):
     res['mongo_client_spec'] = self.spec
     return res
 
-  def _get_avg_document_size(self):
+  def _get_split_keys(self, desired_chunk_size, start_pos, end_pos):
+    # if desired chunk size smaller than 1mb, use mongodb default split size of
+    # 1mb
+    if desired_chunk_size < 1:

Review comment: What are the units of desired_chunk_size? The comment implies MB?

Issue Time Tracking
---
Worklog Id: (was: 288224)
Time Spent: 2h (was: 1h 50m)
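Assuming the units are indeed megabytes (as the code comment under review implies), the byte-to-megabyte conversion with MongoDB's 1 MB minimum split size could be sketched as follows; `to_split_chunk_mb` is a hypothetical name for illustration:

```python
def to_split_chunk_mb(desired_bundle_size_bytes):
    # Convert Beam's desired bundle size (bytes) into the megabyte units the
    # split-key computation appears to expect, flooring at MongoDB's 1 MB
    # minimum split size (an assumption based on the comment under review).
    return max(desired_bundle_size_bytes // 1024 // 1024, 1)
```

Making the unit part of the parameter name, as the reviewer hints, avoids the ambiguity entirely.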
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288219&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288219 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 20:12
Start Date: 02/Aug/19 20:12
Worklog Time Spent: 10m

Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r310280815

## File path: sdks/python/apache_beam/io/mongodbio_test.py

@@ -30,38 +32,102 @@
 from apache_beam.io.mongodbio import _BoundedMongoSource
 from apache_beam.io.mongodbio import _GenerateObjectIdFn
 from apache_beam.io.mongodbio import _MongoSink
+from apache_beam.io.mongodbio import _ObjectIdRangeTracker
 from apache_beam.io.mongodbio import _WriteMongoFn
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 
 
+class _MockMongoColl(object):
+  def __init__(self, docs):
+    self.docs = docs
+
+  def _filter(self, filter):
+    match = []
+    if not filter:
+      return self
+    start = filter['_id'].get('$gte')

Review comment: Please assert that start and end are both not None.

Issue Time Tracking
---
Worklog Id: (was: 288219)
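The reviewer's suggestion - failing fast when a range bound is missing instead of silently ignoring it - could look like the following. This is a sketch of the mock's `_filter` logic with the assertion applied, not the actual test code; `filter_by_id_range` is a hypothetical name:

```python
def filter_by_id_range(docs, query):
    # Sketch of the mock collection's _filter with the suggested check:
    # a split filter is expected to carry a closed-open [$gte, $lt) range.
    if not query:
        return list(docs)
    start = query['_id'].get('$gte')
    end = query['_id'].get('$lt')
    assert start is not None and end is not None, 'expected both range bounds'
    return [doc for doc in docs if start <= doc['_id'] < end]
```

The assertion also removes the original's subtle bug that a falsy bound (e.g. `0`) would be treated the same as a missing one.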
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288220&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288220 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 20:12
Start Date: 02/Aug/19 20:12
Worklog Time Spent: 10m

Work Description: jkff commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#discussion_r310264499

## File path: sdks/python/apache_beam/io/mongodbio.py

@@ -139,50 +143,77 @@ def __init__(self,
     self.filter = filter
     self.projection = projection
     self.spec = extra_client_params
-    self.doc_count = self._get_document_count()
-    self.avg_doc_size = self._get_avg_document_size()
-    self.client = None
 
   def estimate_size(self):
-    return self.avg_doc_size * self.doc_count
+    with MongoClient(self.uri, **self.spec) as client:
+      size = client[self.db].command('collstats', self.coll).get('size')
+      if size is None or size <= 0:
+        raise ValueError('Collection %s not found or total doc size is '
+                         'incorrect' % self.coll)
+      return size
 
   def split(self, desired_bundle_size, start_position=None, stop_position=None):
     # use document cursor index as the start and stop positions
     if start_position is None:
-      start_position = 0
+      epoch = datetime.datetime(1970, 1, 1)

Review comment:
* Please add a comment that this is an object id smaller than any possible actual object id.
* Does Mongo actually guarantee this property? Maybe it makes sense to explicitly query for the smallest and largest object id in the database, if it can be done quickly?
* Using start and end positions that are far removed from the actual ids of present objects risks having most of the splits be empty, or at least having a couple of splits at the edges that are difficult to liquid-shard. This is a known issue with e.g. bigtable and shuffle sources in Dataflow. In this sense too, querying for the smallest and largest id would be better.

Issue Time Tracking
---
Worklog Id: (was: 288220)
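The reviewer's alternative - deriving split bounds from the actual smallest and largest `_id` (an indexed sort plus a limit of 1, so it is cheap) rather than from the Unix epoch - might be sketched as below. `FakeColl` is a minimal stand-in for a pymongo collection and `id_bounds` a hypothetical helper; neither is from the PR:

```python
class FakeColl(object):
    # Minimal stand-in for a pymongo collection: find() accepts sort/limit
    # keyword arguments, much like pymongo's Collection.find.
    def __init__(self, docs):
        self.docs = docs

    def find(self, sort=None, limit=0):
        key, direction = sort[0]
        ordered = sorted(self.docs, key=lambda d: d[key],
                         reverse=(direction == -1))
        return ordered[:limit] if limit else ordered


def id_bounds(coll):
    # Tight split bounds: the real min and max _id, instead of the epoch and
    # "last id generation time + 1s", so edge splits are not mostly empty.
    smallest = coll.find(sort=[('_id', 1)], limit=1)[0]['_id']
    largest = coll.find(sort=[('_id', -1)], limit=1)[0]['_id']
    return smallest, largest
```

Because `_id` is always indexed, both queries resolve from the index without scanning the collection.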
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288133&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288133 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 18:03
Start Date: 02/Aug/19 18:03
Worklog Time Spent: 10m

Work Description: y1chi commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-517794332

R: @chamikaramj @jkff

Issue Time Tracking
---
Worklog Id: (was: 288133)
Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288081&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288081 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 17:05
Start Date: 02/Aug/19 17:05
Worklog Time Spent: 10m

Work Description: y1chi commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-517776103

Run Python MongoDBIO_IT

Issue Time Tracking
---
Worklog Id: (was: 288081)
Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288074&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288074 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 16:57
Start Date: 02/Aug/19 16:57
Worklog Time Spent: 10m

Work Description: y1chi commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-517773354

Run Python MongoDBIO Integration Test

Issue Time Tracking
---
Worklog Id: (was: 288074)
Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288073&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288073 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 16:57
Start Date: 02/Aug/19 16:57
Worklog Time Spent: 10m

Work Description: y1chi commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-517773639

Run Python MongoDBIO_IT

Issue Time Tracking
---
Worklog Id: (was: 288073)
Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288070&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288070 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 16:56
Start Date: 02/Aug/19 16:56
Worklog Time Spent: 10m

Work Description: y1chi commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-517773354

Run Python MongoDBIO Integration Test

Issue Time Tracking
---
Worklog Id: (was: 288070)
Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288069=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288069 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 16:55
Start Date: 02/Aug/19 16:55
Worklog Time Spent: 10m

Work Description: y1chi commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-517772935

Python MongoDBIO Integration Test

Issue Time Tracking
---
Worklog Id: (was: 288069)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288068=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288068 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 16:55
Start Date: 02/Aug/19 16:55
Worklog Time Spent: 10m

Work Description: y1chi commented on issue #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233#issuecomment-517772935

Python MongoDBIO Integration Test

Issue Time Tracking
---
Worklog Id: (was: 288068)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-7866) Python MongoDB IO performance and correctness issues
[ https://issues.apache.org/jira/browse/BEAM-7866?focusedWorklogId=288048=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288048 ]

ASF GitHub Bot logged work on BEAM-7866:

Author: ASF GitHub Bot
Created on: 02/Aug/19 16:31
Start Date: 02/Aug/19 16:31
Worklog Time Spent: 10m

Work Description: y1chi commented on pull request #9233: [BEAM-7866] Fix python ReadFromMongoDB potential data loss issue
URL: https://github.com/apache/beam/pull/9233

Change the liquid sharding split strategy to use the splitVector command in MongoDB, so that the split ranges are deterministic and non-overlapping. Remove all IO operations from class initialization to avoid slow startup or unavailability issues.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
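The key property the splitVector-based strategy relies on is that shards are defined by non-overlapping `_id` ranges rather than index offsets. In the real fix the split keys come from MongoDB's splitVector command; this minimal pure-Python sketch (helper names are hypothetical) shows how a list of split keys yields disjoint range filters that stay correct under concurrent inserts:

```python
def filters_from_split_keys(split_keys):
    """Turn sorted split keys into non-overlapping half-open _id ranges:
    (-inf, k0], (k0, k1], ..., (k_last, +inf)."""
    bounds = [None] + list(split_keys) + [None]
    filters = []
    for lo, hi in zip(bounds, bounds[1:]):
        f = {}
        if lo is not None:
            f['$gt'] = lo
        if hi is not None:
            f['$lte'] = hi
        filters.append(f)
    return filters

def matches(doc_id, f):
    """Evaluate a range filter against a single _id (stand-in for find())."""
    return (('$gt' not in f or doc_id > f['$gt']) and
            ('$lte' not in f or doc_id <= f['$lte']))

filters = filters_from_split_keys([30])   # two shards: _id <= 30, _id > 30
ids = [10, 20, 30, 40, 50, 25]            # 25 inserted mid-read
shards = [[i for i in ids if matches(i, f)] for f in filters]
# Every id, including the late insert, lands in exactly one shard.
```

Unlike the index-range approach, the filter boundaries do not shift when documents are inserted or deleted, so no document is duplicated or silently dropped.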
[Post-Commit Tests Status (on master branch): table of Jenkins build-status badges for the Go, Java, and Python SDKs across runners, truncated in the original message.]