[GitHub] [beam] y1chi commented on a change in pull request #13350: [BEAM-11266] Python IO MongoDB: add bucket_auto aggregation option for bundling in Atlas.
y1chi commented on a change in pull request #13350:
URL: https://github.com/apache/beam/pull/13350#discussion_r524486089

## File path: sdks/python/apache_beam/io/mongodbio.py ##
@@ -241,6 +275,27 @@ def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
           max={'_id': end_pos},
           maxChunkSize=desired_chunk_size_in_mb)['splitKeys'])

+  def _get_buckets(self, desired_chunk_size, start_pos, end_pos):
+    if start_pos >= end_pos:
+      # single document not splittable
+      return []
+    size = self.estimate_size()
+    bucket_count = size // desired_chunk_size

Review comment:
The split function will likely be called recursively for dynamic rebalancing, so a range between start_pos and end_pos can be split further upon backend request. It therefore might not be reasonable to always use the total collection size divided by desired_chunk_size to calculate the bucket count. Is it possible to get only the buckets within the given _id range? We could probably use the average document size times the number of documents in the range to calculate the size of the range being split.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
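To make the suggestion above concrete, here is a minimal sketch of sizing a range from the average document size and the document count, then deriving a bucket count from it. The helper names `estimate_range_size` and `bucket_count_for_range` are hypothetical and not part of the PR; the sketch also assumes the caller has already obtained the average document size (for example from collection statistics) and the number of documents whose _id falls in the range.

```python
import math


def estimate_range_size(avg_doc_size_bytes, doc_count_in_range):
    """Approximate the byte size of an _id range (hypothetical helper).

    Assumes both inputs were fetched by the caller; nothing here talks
    to MongoDB.
    """
    return avg_doc_size_bytes * doc_count_in_range


def bucket_count_for_range(range_size_bytes, desired_chunk_size_bytes):
    """Number of buckets needed so each holds at most the desired size."""
    if desired_chunk_size_bytes <= 0:
        raise ValueError('desired_chunk_size_bytes must be positive')
    if range_size_bytes <= 0:
        # Empty range: nothing to split.
        return 0
    # Round up so a partial chunk still gets its own bucket.
    return math.ceil(range_size_bytes / desired_chunk_size_bytes)


# Example: 1000 documents of ~1 KiB each, split into 256 KiB chunks.
range_size = estimate_range_size(1024, 1000)
print(bucket_count_for_range(range_size, 256 * 1024))
```

Because the estimate depends only on the range being split, a recursive split of a sub-range would yield a proportionally smaller bucket count instead of reusing the count for the whole collection.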
[GitHub] [beam] y1chi commented on a change in pull request #13350: [BEAM-11266] Python IO MongoDB: add bucket_auto aggregation option for bundling in Atlas.
y1chi commented on a change in pull request #13350:
URL: https://github.com/apache/beam/pull/13350#discussion_r524486089

## File path: sdks/python/apache_beam/io/mongodbio.py ##
@@ -241,6 +275,27 @@ def _get_split_keys(self, desired_chunk_size_in_mb, start_pos, end_pos):
           max={'_id': end_pos},
           maxChunkSize=desired_chunk_size_in_mb)['splitKeys'])

+  def _get_buckets(self, desired_chunk_size, start_pos, end_pos):
+    if start_pos >= end_pos:
+      # single document not splittable
+      return []
+    size = self.estimate_size()
+    bucket_count = size // desired_chunk_size
+    if size % desired_chunk_size != 0:
+      bucket_count += 1
+    with beam.io.mongodbio.MongoClient(self.uri, **self.spec) as client:
+      buckets = list(

Review comment:
The returned buckets should be guaranteed to stay within the _id range between start_pos and end_pos; otherwise the same document could be read multiple times.
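One way to keep the buckets inside the requested range is to filter on _id before bucketing. The sketch below only builds the aggregation pipeline as plain Python data; `build_bucket_pipeline` is a hypothetical helper, not code from the PR, and the half-open interval (start_pos inclusive, end_pos exclusive) is an assumption about how the range is meant to be interpreted.

```python
def build_bucket_pipeline(start_pos, end_pos, bucket_count):
    """Build an aggregation pipeline restricted to one _id range.

    Hypothetical helper. Assumes a half-open range: start_pos is
    included and end_pos is excluded.
    """
    if bucket_count < 1:
        raise ValueError('bucket_count must be at least 1')
    return [
        # Drop every document outside the range being split, so the
        # buckets cannot overlap with neighbouring ranges.
        {'$match': {'_id': {'$gte': start_pos, '$lt': end_pos}}},
        # Let the server pick boundaries that spread the remaining
        # documents across the requested number of buckets.
        {'$bucketAuto': {'groupBy': '$_id', 'buckets': bucket_count}},
    ]


pipeline = build_bucket_pipeline(10, 20, 3)
print(pipeline)
```

With PyMongo, a pipeline like this would typically be passed to `collection.aggregate(pipeline)`. How the resulting bucket boundaries map back onto read ranges is left to the caller.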