[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=219496=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-219496 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 27/Mar/19 17:46
Start Date: 27/Mar/19 17:46
Worklog Time Spent: 10m
Work Description: apilloud commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-477276227

ping @iemejia

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------
Worklog Id: (was: 219496)
Time Spent: 7h (was: 6h 50m)

> MongoDbIO - Add Limit and Aggregates Support
>
> Key: BEAM-6241
> URL: https://issues.apache.org/jira/browse/BEAM-6241
> Project: Beam
> Issue Type: Improvement
> Components: io-java-mongodb
> Affects Versions: 2.9.0
> Reporter: Ahmed El.Hussaini
> Assignee: Ahmed El.Hussaini
> Priority: Major
> Labels: easyfix, triaged
> Fix For: 2.12.0
>
> Time Spent: 7h
> Remaining Estimate: 0h
>
> h2. Adds Support to Limit Results
>
> {code:java}
> MongoDbIO.read()
>     .withUri("mongodb://localhost:" + port)
>     .withDatabase(DATABASE)
>     .withCollection(COLLECTION)
>     .withFilter("{\"scientist\":\"Einstein\"}")
>     .withLimit(5);
> {code}
>
> h2. Adds Support to Use Aggregates
>
> {code:java}
> List<BsonDocument> aggregates = new ArrayList<>();
> aggregates.add(
>     new BsonDocument(
>         "$match",
>         new BsonDocument("country", new BsonDocument("$eq", new BsonString("England")))));
> PCollection<Document> output =
>     pipeline.apply(
>         MongoDbIO.read()
>             .withUri("mongodb://localhost:" + port)
>             .withDatabase(DATABASE)
>             .withCollection(COLLECTION)
>             .withAggregate(aggregates));
> {code}
>
> --
> This message was sent by Atlassian JIRA
> (v7.6.3#76005)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=210620=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-210620 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 10/Mar/19 01:39
Start Date: 10/Mar/19 01:39
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-471238356

@iemejia would you mind taking a final look? I've updated the other PR and merged it into this one.

Issue Time Tracking
-------------------
Worklog Id: (was: 210620)
Time Spent: 6h 50m (was: 6h 40m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=205583=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-205583 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 28/Feb/19 02:30
Start Date: 28/Feb/19 02:30
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-468112017

Just for info, @kennknowles: this is not stalled (or forgotten). The discussion moved to a different PR (in another repo) and should move back to this one soon.

Issue Time Tracking
-------------------
Worklog Id: (was: 205583)
Time Spent: 6.5h (was: 6h 20m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=196319=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196319 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 08/Feb/19 17:15
Start Date: 08/Feb/19 17:15
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-461876884

Hello @sandboxws, I haven't forgotten about this one. I was just cleaning up my review queue, so now it's your PR's turn for review; expect comments quickly.

Issue Time Tracking
-------------------
Worklog Id: (was: 196319)
Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=195209=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195209 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 06/Feb/19 16:15
Start Date: 06/Feb/19 16:15
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-461083571

@iemejia a friendly reminder.

Issue Time Tracking
-------------------
Worklog Id: (was: 195209)
Time Spent: 6h 10m (was: 6h)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=193626=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193626 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 02/Feb/19 06:33
Start Date: 02/Feb/19 06:33
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-459940313

@iemejia I think I nailed it. https://github.com/sandboxws/beam/pull/1

I've added the following:

```
- Support for using splitVector with AggregationQuery, i.e. MongoDB aggregations
- Support for using $bucketAuto when splitVector is not supported (Fixes https://issues.apache.org/jira/browse/BEAM-4567)
```

You can read more about `$bucketAuto` here: https://docs.mongodb.com/manual/reference/operator/aggregation/bucketAuto/

If we agree on this approach, I can cherry-pick your commit, then apply the changes from the temporary PR above.

Issue Time Tracking
-------------------
Worklog Id: (was: 193626)
Time Spent: 6h (was: 5h 50m)
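The `$bucketAuto` fallback described above can be pictured with a small in-memory model. The sketch below is hypothetical and is not MongoDbIO code. It assumes the collection's `_id` values are unique, numeric, and already sorted, and it picks bucket lower bounds so that each bucket holds roughly the same number of documents. MongoDB's real `$bucketAuto` stage runs on the server and has its own boundary rules (for example, the optional `granularity` setting), so treat this only as an illustration of the idea.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of $bucketAuto-style splitting; not MongoDbIO code.
// Assumes the _id values are unique and already sorted in ascending order.
class BucketAutoSketch {

  // Returns the lower bound of each bucket. Bucket i covers [bounds[i], bounds[i + 1]),
  // and the last bucket is unbounded above.
  static List<Long> bucketLowerBounds(List<Long> sortedIds, int bucketCount) {
    List<Long> bounds = new ArrayList<>();
    int n = sortedIds.size();
    if (n == 0 || bucketCount <= 0) {
      return bounds;
    }
    // Never create more buckets than there are documents.
    int buckets = Math.min(bucketCount, n);
    for (int i = 0; i < buckets; i++) {
      // Index of the first document in bucket i; spreads n documents as evenly as possible.
      int start = (int) ((long) i * n / buckets);
      bounds.add(sortedIds.get(start));
    }
    return bounds;
  }

  public static void main(String[] args) {
    List<Long> ids = new ArrayList<>();
    for (long id = 1; id <= 10; id++) {
      ids.add(id);
    }
    // Buckets [1, 4), [4, 7) and [7, +inf) holding 3, 3 and 4 documents.
    System.out.println(bucketLowerBounds(ids, 3)); // [1, 4, 7]
  }
}
```

Each returned bound starts a half-open `_id` range. Those ranges are what a reader would turn into per-split filters, whether the split points come from `splitVector` or from a bucketing stage.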
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=193625=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193625 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 02/Feb/19 06:31
Start Date: 02/Feb/19 06:31
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-459940313

@iemejia I think I nailed it. https://github.com/sandboxws/beam/pull/1

I've added the following:

```
- Support for using splitVector with AggregationQuery, i.e. MongoDB aggregations
- Support for using $bucketAuto when splitVector is not supported (Fixes https://issues.apache.org/jira/browse/BEAM-4567)
```

You can read more about `$bucketAuto` here: https://docs.mongodb.com/manual/reference/operator/aggregation/bucketAuto/

Issue Time Tracking
-------------------
Worklog Id: (was: 193625)
Time Spent: 5h 50m (was: 5h 40m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=193436=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193436 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 01/Feb/19 16:11
Start Date: 01/Feb/19 16:11
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-459775222

:+1: Thanks for that comment :). The important thing is to have fun. Thanks to you too for your patience. This PR was a bit tricky because it touched some unexpected issues (API inconsistencies to agree on and fix), but hopefully we will be done soon.

Issue Time Tracking
-------------------
Worklog Id: (was: 193436)
Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=193430=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193430 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 01/Feb/19 16:01
Start Date: 01/Feb/19 16:01
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-459771763

@iemejia I'll take a look at your commit today. I will also toy around and see how we can use splitting for aggregates. On a side note, I'm learning so much about Beam by working on this PR with you, so thanks for that!

Issue Time Tracking
-------------------
Worklog Id: (was: 193430)
Time Spent: 5.5h (was: 5h 20m)
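One way to make an aggregation splittable, in the spirit of the splitting work discussed in this thread, is to give each split its own copy of the pipeline with an extra `$match` stage on an `_id` range placed first. The sketch below is a hypothetical illustration, not the merged MongoDbIO implementation. Stages are plain JSON strings rather than `BsonDocument` objects, and the split points are assumed to come from something like `splitVector` or `$bucketAuto`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the merged MongoDbIO implementation: stages are JSON
// strings instead of BsonDocument objects, and split points are given as input.
class SplitAggregationSketch {

  // $match stage selecting _id in [lower, upper); a null bound means "unbounded".
  static String rangeMatch(String lower, String upper) {
    if (lower == null && upper == null) {
      return "{\"$match\": {}}"; // an empty $match keeps every document
    }
    List<String> conditions = new ArrayList<>();
    if (lower != null) {
      conditions.add("\"$gte\": " + lower);
    }
    if (upper != null) {
      conditions.add("\"$lt\": " + upper);
    }
    return "{\"$match\": {\"_id\": {" + String.join(", ", conditions) + "}}}";
  }

  // Builds one pipeline per split: the range $match first, then the user's stages.
  // k split points produce k + 1 pipelines that together cover the whole _id space.
  static List<List<String>> splitPipelines(List<String> splitPoints, List<String> userStages) {
    List<List<String>> pipelines = new ArrayList<>();
    for (int i = 0; i <= splitPoints.size(); i++) {
      String lower = i == 0 ? null : splitPoints.get(i - 1);
      String upper = i == splitPoints.size() ? null : splitPoints.get(i);
      List<String> pipeline = new ArrayList<>();
      pipeline.add(rangeMatch(lower, upper));
      pipeline.addAll(userStages);
      pipelines.add(pipeline);
    }
    return pipelines;
  }

  public static void main(String[] args) {
    List<String> userStages = new ArrayList<>();
    userStages.add("{\"$match\": {\"country\": {\"$eq\": \"England\"}}}");
    List<String> splitPoints = new ArrayList<>();
    splitPoints.add("4");
    splitPoints.add("7");
    for (List<String> pipeline : splitPipelines(splitPoints, userStages)) {
      System.out.println(pipeline);
    }
  }
}
```

This keeps results correct only when the user's stages act on each document independently, such as `$match` or `$project`. A grouping stage like `$group` would run once per split and return partial results.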
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=193390=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193390 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 01/Feb/19 14:42
Start Date: 01/Feb/19 14:42
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-459744437

Thanks @sandboxws. Now we are going to do a reverse review :) Can you please take a look at this commit and give me your feedback? https://github.com/iemejia/beam/commit/f2d92f26039ebe73e6e7ef26a2c258de515200f1

I changed the builders to use `AutoValue` to be more consistent with Beam practices. I renamed `QueryBuilder` to `QueryFn`, but I am even considering calling it just `Query`. I also made some minor fixes for basic things that static analysis tools complained about, like constrained access levels.

The most important subject we need to address now is partitioning (aka splitting). I am keeping the partitioning approach we had and doing the filtering per shard if it is a FindQuery. I assume this is correct. Do you see any possible issue? The question I still have is what happens with aggregates. Are they partitionable? If I take the same approach and run the aggregate per shard, the user will get incorrect results. Do you know how it works?

We are really close! My idea is, once we agree on these last points, to reopen a PR with both commits and get it re-reviewed and merged.

Issue Time Tracking
-------------------
Worklog Id: (was: 193390)
Time Spent: 5h 10m (was: 5h)
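The partitioning question above can be checked on a toy model. In the sketch below, which is hypothetical and not MongoDbIO code, documents are integers and the collection is split into contiguous shards, as key-range splitting would do. Filtering each shard and concatenating the results gives the same documents as filtering the whole collection. A count (standing in for a `$group` or `$count` stage) or a limit applied per shard does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model (hypothetical, not MongoDbIO code): integers stand in for documents,
// and the collection is split into contiguous shards, as key-range splitting would.
class PartitioningSketch {

  // Splits docs into consecutive chunks of at most shardSize elements.
  static List<List<Integer>> shard(List<Integer> docs, int shardSize) {
    List<List<Integer>> shards = new ArrayList<>();
    for (int i = 0; i < docs.size(); i += shardSize) {
      shards.add(new ArrayList<>(docs.subList(i, Math.min(i + shardSize, docs.size()))));
    }
    return shards;
  }

  // Applies the filter to each shard and concatenates the results.
  static List<Integer> filterPerShard(List<List<Integer>> shards, Predicate<Integer> filter) {
    List<Integer> out = new ArrayList<>();
    for (List<Integer> shard : shards) {
      for (Integer doc : shard) {
        if (filter.test(doc)) {
          out.add(doc);
        }
      }
    }
    return out;
  }

  // Runs a "count" on each shard: one partial count per shard, not one total.
  static List<Integer> countPerShard(List<List<Integer>> shards) {
    List<Integer> counts = new ArrayList<>();
    for (List<Integer> shard : shards) {
      counts.add(shard.size());
    }
    return counts;
  }

  // Applies a limit of n to each shard and concatenates the results.
  static List<Integer> limitPerShard(List<List<Integer>> shards, int n) {
    List<Integer> out = new ArrayList<>();
    for (List<Integer> shard : shards) {
      out.addAll(shard.subList(0, Math.min(n, shard.size())));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> docs = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
      docs.add(i);
    }
    List<List<Integer>> shards = shard(docs, 4); // [1..4], [5..8], [9, 10]
    Predicate<Integer> even = d -> d % 2 == 0;

    // Same as filtering the unsplit collection: [2, 4, 6, 8, 10]
    System.out.println(filterPerShard(shards, even));
    // Three partial counts [4, 4, 2] instead of a single total of 10
    System.out.println(countPerShard(shards));
    // Up to 2 documents from every shard: [1, 2, 5, 6, 9, 10], not [1, 2]
    System.out.println(limitPerShard(shards, 2));
  }
}
```

A filter commutes with splitting because each document is judged on its own. Anything that looks across documents (counts, groups, limits) needs either a single unsplit read or an explicit combine step after the splits.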
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=193391=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193391 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 01/Feb/19 14:43
Start Date: 01/Feb/19 14:43
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-459744437

Thanks @sandboxws. Now we are going to do a reverse review :) Can you please take a look at this commit and give me your feedback? https://github.com/iemejia/beam/commit/f2d92f26039ebe73e6e7ef26a2c258de515200f1

I changed the builders to use `AutoValue` to be more consistent with Beam practices. I renamed `withQueryBuilder` to `withQueryFn`, but I am even considering calling it just `withQuery`. I also made some minor fixes for basic things that static analysis tools complained about, like constrained access levels.

The most important subject we need to address now is partitioning (aka splitting). I am keeping the partitioning approach we had and doing the filtering per shard if it is a FindQuery. I assume this is correct. Do you see any possible issue? The question I still have is what happens with aggregates. Are they partitionable? If I take the same approach and run the aggregate per shard, the user will get incorrect results. Do you know how it works?

We are really close! My idea is, once we agree on these last points, to reopen a PR with both commits and get it re-reviewed and merged.

Issue Time Tracking
-------------------
Worklog Id: (was: 193391)
Time Spent: 5h 20m (was: 5h 10m)
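The `withQueryFn` idea and the move to AutoValue-style immutable builders can be sketched without Beam or the MongoDB driver. The model below is hypothetical. Documents are `Map<String, String>`, and the read would accept a single query function. `FindQuery` and `AggregationQuery` (names taken from this thread) are toy implementations whose `withX` methods return new instances instead of mutating, mirroring the immutable style that AutoValue-generated classes encourage. None of these signatures are Beam's actual API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical sketch of the "query function" design; these are not Beam's classes.
class QueryFnSketch {

  // A query maps the documents of a collection to the documents the read emits.
  interface QueryFn extends Function<List<Map<String, String>>, List<Map<String, String>>> {}

  // Equality filter on one field plus an optional limit (0 means "no limit").
  static final class FindQuery implements QueryFn {
    private final String field; // null means "match every document"
    private final String value;
    private final int limit;

    private FindQuery(String field, String value, int limit) {
      this.field = field;
      this.value = value;
      this.limit = limit;
    }

    static FindQuery create() {
      return new FindQuery(null, null, 0);
    }

    // Each withX method returns a new instance; the receiver is never mutated.
    FindQuery withFilter(String field, String value) {
      return new FindQuery(field, value, limit);
    }

    FindQuery withLimit(int limit) {
      return new FindQuery(field, value, limit);
    }

    @Override
    public List<Map<String, String>> apply(List<Map<String, String>> docs) {
      List<Map<String, String>> out = new ArrayList<>();
      for (Map<String, String> doc : docs) {
        if (limit > 0 && out.size() >= limit) {
          break;
        }
        if (field == null || Objects.equals(value, doc.get(field))) {
          out.add(doc);
        }
      }
      return out;
    }
  }

  // An ordered list of stages, each fed the previous stage's output.
  static final class AggregationQuery implements QueryFn {
    private final List<QueryFn> stages;

    private AggregationQuery(List<QueryFn> stages) {
      this.stages = Collections.unmodifiableList(new ArrayList<>(stages));
    }

    static AggregationQuery create() {
      return new AggregationQuery(new ArrayList<>());
    }

    AggregationQuery withStage(QueryFn stage) {
      List<QueryFn> next = new ArrayList<>(stages);
      next.add(stage);
      return new AggregationQuery(next);
    }

    @Override
    public List<Map<String, String>> apply(List<Map<String, String>> docs) {
      List<Map<String, String>> current = docs;
      for (QueryFn stage : stages) {
        current = stage.apply(current);
      }
      return current;
    }
  }

  public static void main(String[] args) {
    List<Map<String, String>> docs = new ArrayList<>();
    for (String name : new String[] {"Einstein", "Curie", "Einstein", "Bohr", "Einstein"}) {
      docs.add(Collections.singletonMap("scientist", name));
    }
    FindQuery einstein = FindQuery.create().withFilter("scientist", "Einstein");
    System.out.println(einstein.apply(docs).size()); // 3
    System.out.println(einstein.withLimit(2).apply(docs).size()); // 2
    AggregationQuery pipeline =
        AggregationQuery.create().withStage(einstein).withStage(FindQuery.create().withLimit(1));
    System.out.println(pipeline.apply(docs).size()); // 1
  }
}
```

Because `withLimit` returns a copy, the base `einstein` query still returns all three matches afterwards. One configured query can therefore safely serve as the starting point for several variants.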
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=193132=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193132 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 01/Feb/19 01:14
Start Date: 01/Feb/19 01:14
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-459569401

@iemejia Done, the branch is rebased now.

Issue Time Tracking
-------------------
Worklog Id: (was: 193132)
Time Spent: 5h (was: 4h 50m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=193114=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193114 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 01/Feb/19 00:45
Start Date: 01/Feb/19 00:45
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-459563415

@iemejia On it.

Issue Time Tracking
-------------------
Worklog Id: (was: 193114)
Time Spent: 4h 50m (was: 4h 40m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=193100=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-193100 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 01/Feb/19 00:18
Start Date: 01/Feb/19 00:18
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-459557902

Hi @sandboxws, I have not forgotten about this one at all. I was working on improving the MongoDbIO tests (which are now much faster and more robust). Can you please rebase so I can add the other changes on top of the rebased version? (Sorry for the inconvenience.)

Issue Time Tracking
-------------------
Worklog Id: (was: 193100)
Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=19=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-19 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 28/Jan/19 16:35
Start Date: 28/Jan/19 16:35
Worklog Time Spent: 10m
Work Description: sandboxws commented on pull request #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#discussion_r251493455

## File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##
@@ -426,9 +408,10 @@ private long getEstimatedSizeBytes(
 }
 LOG.debug("Number of splits is {}", splitKeys.size());
-for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
-  sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
-}
+// TODO: What should be done here?
+// for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {

Review comment: I'll take a look and let you know.

Issue Time Tracking
---
Worklog Id: (was: 19)
Time Spent: 4h 20m (was: 4h 10m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=190195=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-190195 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 25/Jan/19 21:12
Start Date: 25/Jan/19 21:12
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-457730324

@iemejia Thank you so much for reviewing the PR. Looking forward to reviewing your changes. Also, I really enjoyed your comment about making a few changes to the API to make it more `Beamish`.

Issue Time Tracking
---
Worklog Id: (was: 190195)
Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=190196=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-190196 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 25/Jan/19 21:13
Start Date: 25/Jan/19 21:13
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-457730668

@kennknowles, if you have some time soon, can you review the changes you requested?

Issue Time Tracking
---
Worklog Id: (was: 190196)
Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=190192=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-190192 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 25/Jan/19 21:10
Start Date: 25/Jan/19 21:10
Worklog Time Spent: 10m
Work Description: sandboxws commented on pull request #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#discussion_r251137132

## File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##
@@ -426,9 +408,10 @@ private long getEstimatedSizeBytes(
 }
 LOG.debug("Number of splits is {}", splitKeys.size());
-for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
-  sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
-}
+// TODO: What should be done here?
+// for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {

Review comment: Now that you mention this: in a project I'm using internally, I faced issues with splitting on a MongoDB cluster hosted by Atlas. The MongoDB version I'm using is 3.4. I will share the error/exception thrown by MongoDB shortly for more context on this issue.

Issue Time Tracking
---
Worklog Id: (was: 190192)
Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=190070=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-190070 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 25/Jan/19 16:02
Start Date: 25/Jan/19 16:02
Worklog Time Spent: 10m
Work Description: iemejia commented on pull request #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#discussion_r251036534

## File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
##
@@ -426,9 +408,10 @@ private long getEstimatedSizeBytes(
 }
 LOG.debug("Number of splits is {}", splitKeys.size());
-for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
-  sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
-}
+// TODO: What should be done here?
+// for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {

Review comment: It seems `spec.filter()` is pretty ineffective here. Let me take care of this part, because I want to validate the way splitting works.

Issue Time Tracking
---
Worklog Id: (was: 190070)
Time Spent: 3h 10m (was: 3h)
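The review comment above concerns the loop that turns `splitKeys` into per-source filters via `splitKeysToFilters(splitKeys, spec.filter())`. One common approach converts the sorted split keys into contiguous, non-overlapping `_id` ranges and ANDs each range with the user's filter so the filter is not dropped. The following is a minimal sketch under stated assumptions: the keys are already sorted and rendered as JSON literals, and `SplitFilterSketch` is hypothetical, not Beam's actual `splitKeysToFilters`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Beam's implementation): turn sorted split keys into
 * contiguous _id ranges and AND each range with the user's filter.
 * Keys are assumed to be sorted and already rendered as JSON literals.
 */
final class SplitFilterSketch {
  private SplitFilterSketch() {}

  static List<String> splitKeysToFilters(List<String> splitKeys, String userFilter) {
    List<String> ranges = new ArrayList<>();
    if (splitKeys.isEmpty()) {
      ranges.add("{}"); // no split points: one range covering the whole collection
    } else {
      // First range: everything up to and including the first split key.
      ranges.add("{\"_id\": {\"$lte\": " + splitKeys.get(0) + "}}");
      // Middle ranges: (previous key, current key].
      for (int i = 1; i < splitKeys.size(); i++) {
        ranges.add("{\"_id\": {\"$gt\": " + splitKeys.get(i - 1)
            + ", \"$lte\": " + splitKeys.get(i) + "}}");
      }
      // Last range: everything after the last split key.
      ranges.add("{\"_id\": {\"$gt\": " + splitKeys.get(splitKeys.size() - 1) + "}}");
    }
    List<String> filters = new ArrayList<>();
    for (String range : ranges) {
      // Keep the user's filter instead of replacing it with the range.
      filters.add(userFilter == null || userFilter.isEmpty()
          ? range
          : "{\"$and\": [" + range + ", " + userFilter + "]}");
    }
    return filters;
  }
}
```

The ranges use `$gt` for the lower bound and `$lte` for the upper bound, so every document falls into exactly one range. `n` split keys therefore yield `n + 1` sources.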
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=190071=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-190071 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 25/Jan/19 16:02
Start Date: 25/Jan/19 16:02
Worklog Time Spent: 10m
Work Description: iemejia commented on pull request #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#discussion_r250649855

## File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/FindQueryBuilder.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.model.Projections;
+import java.util.List;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.bson.codecs.BsonValueCodecProvider;
+import org.bson.codecs.IterableCodecProvider;
+import org.bson.codecs.ValueCodecProvider;
+import org.bson.codecs.configuration.CodecRegistries;
+import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Builds a MongoDB FindIterable object. */
+public class FindQueryBuilder
+    implements SerializableFunction<MongoCollection<Document>, MongoCursor<Document>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FindQueryBuilder.class);
+
+  private List<String> projection;
+  private Integer limit;
+  private BsonDocument filters;
+  private String id;
+
+  public FindQueryBuilder withId(String id) {
+    this.id = id;
+    return this;
+  }
+
+  public FindQueryBuilder withLimit(Integer limit) {
+    this.limit = limit;
+    return this;
+  }
+
+  public FindQueryBuilder withProjection(List<String> projection) {
+    this.projection = projection;
+    return this;
+  }
+
+  public FindQueryBuilder withFilters(Bson filters) {

Review comment: Can we just make people pass a `BsonDocument` here and leave that logic to the client?

Issue Time Tracking
---
Worklog Id: (was: 190071)
Time Spent: 3h 20m (was: 3h 10m)
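The quoted `FindQueryBuilder` implements Beam's `SerializableFunction`, which is described elsewhere in this thread as a Java `Function` that is guaranteed to serialize. That means every field it holds must survive Java serialization when the query is shipped to workers. The JDK-only sketch below illustrates that pattern under several assumptions:

- `SerializableFn` is a hypothetical local stand-in for Beam's interface.
- A `List<String>` stands in for a MongoDB collection.
- `LimitedFindSketch` is illustrative only.
- Unlike the mutating setters in the quoted builder, each `withX` here returns a new instance.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for Beam's SerializableFunction: a Function that is also Serializable. */
interface SerializableFn<InputT, OutputT> extends Function<InputT, OutputT>, Serializable {}

/**
 * Hypothetical immutable query: every field is serializable, and withX() returns
 * a copy instead of mutating this. A List<String> stands in for a collection.
 */
final class LimitedFindSketch implements SerializableFn<List<String>, List<String>> {
  private static final long serialVersionUID = 1L;

  private final String mustContain; // stand-in for a BsonDocument filter
  private final Integer limit; // null means "no limit"

  private LimitedFindSketch(String mustContain, Integer limit) {
    this.mustContain = mustContain;
    this.limit = limit;
  }

  static LimitedFindSketch create() {
    return new LimitedFindSketch(null, null);
  }

  LimitedFindSketch withFilter(String mustContain) {
    return new LimitedFindSketch(mustContain, limit);
  }

  LimitedFindSketch withLimit(int limit) {
    return new LimitedFindSketch(mustContain, limit);
  }

  /** Returns matching "documents" in order, stopping once the limit is reached. */
  @Override
  public List<String> apply(List<String> collection) {
    List<String> out = new ArrayList<>();
    for (String doc : collection) {
      if (limit != null && out.size() >= limit) {
        break;
      }
      if (mustContain == null || doc.contains(mustContain)) {
        out.add(doc);
      }
    }
    return out;
  }

  /** Serializes and deserializes a value, roughly as a runner would when shipping it. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }
}
```

If any field were a non-serializable type, `roundTrip` would fail with a `NotSerializableException`. Round-tripping like this is a quick local check before relying on a runner to ship the function.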
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=190072=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-190072 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 25/Jan/19 16:02
Start Date: 25/Jan/19 16:02
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-457621713

Please don't merge this PR until I add some cleanup commits.

Issue Time Tracking
---
Worklog Id: (was: 190072)
Time Spent: 3.5h (was: 3h 20m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=189065=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-189065 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 23/Jan/19 16:49
Start Date: 23/Jan/19 16:49
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-456877907

Sure, sorry for the delay. This PR is next in the queue.

Issue Time Tracking
---
Worklog Id: (was: 189065)
Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=188167=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188167 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 22/Jan/19 13:13
Start Date: 22/Jan/19 13:13
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-456394291

@iemejia do you mind taking a look at this PR sometime this week?

Issue Time Tracking
---
Worklog Id: (was: 188167)
Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=187053=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187053 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 18/Jan/19 21:44
Start Date: 18/Jan/19 21:44
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-455697617

Yes, no issues, just too busy with unexpected stuff. I will take a look during the weekend (sorry).

Issue Time Tracking
---
Worklog Id: (was: 187053)
Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=186462=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186462 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 17/Jan/19 17:46
Start Date: 17/Jan/19 17:46
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-455264180

@iemejia no problem. Hope everything is alright.

Issue Time Tracking
---
Worklog Id: (was: 186462)
Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=186436=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186436 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 17/Jan/19 17:23
Start Date: 17/Jan/19 17:23
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-455256490

Arrghh, unexpected event. Will review tomorrow, sorry again.

Issue Time Tracking
---
Worklog Id: (was: 186436)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=185865=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185865 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 16/Jan/19 16:42
Start Date: 16/Jan/19 16:42
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-454849996

Thanks. I have been busy with other stuff but will take a look at this tomorrow. Sorry for the delay.

Issue Time Tracking
---
Worklog Id: (was: 185865)
Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=184841=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184841 ]

ASF GitHub Bot logged work on BEAM-6241:

Author: ASF GitHub Bot
Created on: 14/Jan/19 15:31
Start Date: 14/Jan/19 15:31
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-454045945

@iemejia Done. The branch is rebased now. I have one thing left on this PR, which is this [part](https://github.com/apache/beam/pull/7293/files#diff-e97e16ad18d6a411d9035da70219acf5R408).

Issue Time Tracking
---
Worklog Id: (was: 184841)
Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=184142=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184142 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 11/Jan/19 09:54
Start Date: 11/Jan/19 09:54
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-453458873

Excellent! It is simple: it is just like a Java `Function` (`OutputT apply(InputT input);`), except that it is guaranteed to serialize fully. I think you will understand it immediately, and I hope that all the `QueryBuilder` types are fully serializable, so it will be easy :crossed_fingers:.

Issue Time Tracking
-------------------
Worklog Id: (was: 184142)
Time Spent: 1h 40m (was: 1.5h)

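A minimal, JDK-only sketch of the property described above: a lambda whose target type also extends `java.io.Serializable` survives a Java-serialization round trip, while the same lambda typed as a plain `java.util.function.Function` does not. The `SerializableFunction` interface here is a hypothetical stand-in that only mirrors the `OutputT apply(InputT input)` shape quoted in the comment; it is not Beam's own class, and `SerializationCheck` is an illustrative helper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

// Hypothetical stand-in: the apply(InputT) shape of Function, plus Serializable.
interface SerializableFunction<InputT, OutputT> extends Serializable {
  OutputT apply(InputT input);
}

final class SerializationCheck {
  private SerializationCheck() {}

  // Writes the object with Java serialization and reads it back, roughly what
  // happens when a transform's configuration is shipped to workers.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  // True if the object survives a serialization round trip.
  static boolean isSerializable(Object value) {
    try {
      roundTrip(value);
      return true;
    } catch (UncheckedIOException e) {
      if (e.getCause() instanceof NotSerializableException) {
        return false;
      }
      throw e;
    }
  }

  // Same lambda body, but the target type is a plain (non-serializable) Function.
  static boolean plainFunctionSerializes() {
    Function<Integer, Integer> plain = x -> x * 2;
    return isSerializable(plain);
  }

  // Same lambda body, typed as the serializable stand-in, applied after a round trip.
  static int applyAfterRoundTrip(int input) {
    SerializableFunction<Integer, Integer> doubler = x -> x * 2;
    return roundTrip(doubler).apply(input);
  }
}
```

Both lambdas have identical bodies; only the target type decides whether serialization succeeds. Anything a lambda captures must itself be serializable as well, which is why the comment hopes that all the `QueryBuilder` types are.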
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=183875=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183875 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 10/Jan/19 19:20
Start Date: 10/Jan/19 19:20
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-453219896

@iemejia I see your point more clearly now; I'm sold. I will hack on this during the weekend. In the meantime, is there a good resource on using `SerializableFunction`, or should I just poke around the source code?

Issue Time Tracking
-------------------
Worklog Id: (was: 183875)
Time Spent: 1.5h (was: 1h 20m)

[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=183759=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183759 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 10/Jan/19 15:52
Start Date: 10/Jan/19 15:52
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-453145486

Hehe, that Zelda game is pure dope. I understand your point: this looks like a trade-off between usability (`QueryBuilder`) and extensibility (`SerializableFunction`). It does not change much, except that if in the future there is a new filter, predicate, or special condition that the user wants to add and that `QueryBuilder` does not cover, the user can supply their own version of the function without waiting for a change upstream. What I would like to avoid is adding more `withXXX`-like methods to `Read` to configure a "query". I think it is much better to have a common function/object that wraps all that configuration, as you did with `QueryBuilder` :+1:, rather than more `withXXX` methods (motivated by maintainability).

I see your point on the pipeline/aggregation API, but if this is covered by `withQuery` and parameterized via `QueryBuilder`, we avoid a duplicate method. So, if you agree, let's refactor this to have `withQuery` as I proposed, and mention in the docs that we provide a standard `QueryBuilder` implementation to ease its use. We should use that object in the tests too, as an example. I think we could even get to the point where we deprecate the other query-related `withXXX` configuration methods and unify the query API on `withQuery(SerializableFunction)`, with the public `QueryBuilder` as a user-friendly version of that function. WDYT? Am I missing some particular use case or issue with this idea?

Issue Time Tracking
-------------------
Worklog Id: (was: 183759)
Time Spent: 1h 10m (was: 1h)

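To illustrate the usability-versus-extensibility trade-off, here is a hedged sketch under simplifying assumptions. A collection is modelled as an in-memory `List<Map<String, Object>>`, and `QueryFn`, `Read`, `QueryBuilder`, and `Examples` are hypothetical classes, not the MongoDbIO API. `Read` exposes a single `withQuery(...)` setting; `QueryBuilder` is one convenient way to produce a query function, and a hand-written lambda covers a condition the builder does not offer. The sample documents are illustrative only.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-in for SerializableFunction<Collection, Results>;
// a "collection" is modelled as an in-memory list of documents.
interface QueryFn extends Serializable {
  List<Map<String, Object>> apply(List<Map<String, Object>> collection);
}

// Hypothetical Read with a single query setting instead of many withXXX methods.
final class Read implements Serializable {
  private final QueryFn query;

  private Read(QueryFn query) {
    this.query = query;
  }

  static Read create() {
    return new Read(collection -> collection); // default: read everything
  }

  Read withQuery(QueryFn query) {
    return new Read(Objects.requireNonNull(query, "query"));
  }

  // What a reader would do with the configured query at execution time.
  List<Map<String, Object>> readFrom(List<Map<String, Object>> collection) {
    return query.apply(collection);
  }
}

// Hypothetical convenience builder: the user-friendly way to produce a QueryFn.
final class QueryBuilder {
  private String field;
  private String value;
  private long limit = Long.MAX_VALUE;

  static QueryBuilder create() {
    return new QueryBuilder();
  }

  QueryBuilder withEquals(String field, String value) {
    this.field = field;
    this.value = value;
    return this;
  }

  QueryBuilder withLimit(long limit) {
    this.limit = limit;
    return this;
  }

  QueryFn build() {
    // Copy settings into locals so the lambda captures values, not the builder.
    final String f = field;
    final String v = value;
    final long n = limit;
    return collection ->
        collection.stream()
            .filter(doc -> f == null || Objects.equals(doc.get(f), v))
            .limit(n)
            .collect(Collectors.toList());
  }
}

final class Examples {
  private Examples() {}

  // A condition the builder does not offer, supplied directly as a function.
  static QueryFn bornBefore(int year) {
    return collection ->
        collection.stream()
            .filter(doc -> (Integer) doc.get("born") < year)
            .collect(Collectors.toList());
  }

  static List<Map<String, Object>> sample() {
    List<Map<String, Object>> docs = new ArrayList<>();
    Object[][] rows = {{"Einstein", 1879}, {"Curie", 1867}, {"Feynman", 1918}, {"Bohr", 1885}};
    for (Object[] row : rows) {
      Map<String, Object> doc = new LinkedHashMap<>();
      doc.put("scientist", row[0]);
      doc.put("born", row[1]);
      docs.add(doc);
    }
    return docs;
  }
}
```

The builder copies its settings into local variables before creating the lambda, so the function that would be serialized captures only immutable values rather than the mutable builder itself.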
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=183760=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183760 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 10/Jan/19 15:55
Start Date: 10/Jan/19 15:55
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-453145486

Hehe, that Zelda game is pure dope. I understand your point: this looks like a trade-off between usability (`QueryBuilder`) and extensibility (`SerializableFunction`). It does not change much, except that if in the future there is a new filter, predicate, or special condition that the user wants to add and that `QueryBuilder` does not cover, the user can supply their own version of the function without waiting for a change upstream. What I would like to avoid is adding more `withXXX`-like methods to `Read` to configure a "query". I think it is much better to have a common function/object that wraps all that configuration, as you did with `QueryBuilder` :+1:, rather than more `withXXX` methods (motivated by maintainability).

I see your point on the pipeline/aggregation API, but if this is covered by `withQuery` and parameterized via `QueryBuilder`, we avoid a duplicate method. So, if you agree, let's refactor this to have `withQuery` as I proposed, and mention in the docs that we provide a standard `QueryBuilder` implementation to ease its use. We should use that object in the tests too, as an example. I think we could even get to the point where we deprecate the other query-related configuration methods, e.g. `withProjection` and `withFilter`, and unify the query API on `withQuery(SerializableFunction)`, with the public `QueryBuilder` as a user-friendly version of that function. WDYT? Am I missing some particular use case or issue with this idea?

Issue Time Tracking
-------------------
Worklog Id: (was: 183760)
Time Spent: 1h 20m (was: 1h 10m)

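One possible deprecation path for the existing setters, sketched with in-memory stand-ins: each legacy `withXXX` method becomes a thin wrapper that composes onto the current query function and delegates to `withQuery`. All names here are hypothetical. In particular, this `withFilter` takes a field and a value, whereas the real `MongoDbIO.Read.withFilter` shown in the issue description takes a JSON filter string.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical query-function type over an in-memory "collection".
interface QueryFn extends Serializable {
  List<Map<String, Object>> apply(List<Map<String, Object>> collection);
}

// Hypothetical Read whose only real query setting is withQuery(...).
final class Read implements Serializable {
  private final QueryFn query;

  private Read(QueryFn query) {
    this.query = query;
  }

  static Read create() {
    return new Read(collection -> collection);
  }

  Read withQuery(QueryFn query) {
    return new Read(Objects.requireNonNull(query, "query"));
  }

  /** Legacy-style setter kept as a thin wrapper: composes a filter onto the current query. */
  @Deprecated
  Read withFilter(String field, String value) {
    final QueryFn previous = query;
    return withQuery(
        collection ->
            previous.apply(collection).stream()
                .filter(doc -> Objects.equals(doc.get(field), value))
                .collect(Collectors.toList()));
  }

  /** Legacy-style setter kept as a thin wrapper: keeps only the named fields. */
  @Deprecated
  Read withProjection(String... fields) {
    final QueryFn previous = query;
    final List<String> keep = new ArrayList<>(Arrays.asList(fields));
    return withQuery(
        collection ->
            previous.apply(collection).stream()
                .map(
                    doc -> {
                      Map<String, Object> projected = new LinkedHashMap<>();
                      for (String key : keep) {
                        if (doc.containsKey(key)) {
                          projected.put(key, doc.get(key));
                        }
                      }
                      return projected;
                    })
                .collect(Collectors.toList()));
  }

  List<Map<String, Object>> readFrom(List<Map<String, Object>> collection) {
    return query.apply(collection);
  }
}

final class Examples {
  private Examples() {}

  static List<Map<String, Object>> sample() {
    List<Map<String, Object>> docs = new ArrayList<>();
    Object[][] rows = {{"Einstein", 1879}, {"Curie", 1867}, {"Bohr", 1885}};
    for (Object[] row : rows) {
      Map<String, Object> doc = new LinkedHashMap<>();
      doc.put("scientist", row[0]);
      doc.put("born", row[1]);
      docs.add(doc);
    }
    return docs;
  }
}
```

One caveat of this composition: the calls apply in order, so filtering on a field that an earlier `withProjection` already dropped yields no documents. A real MongoDB find query evaluates the filter against the full document and applies the projection independently.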
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=182827=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-182827 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 09/Jan/19 03:23
Start Date: 09/Jan/19 03:23
Worklog Time Spent: 10m
Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-452558297

@iemejia No need to apologize; I hope you had a great break. I spent almost my entire break playing RDR2 (Red Dead Redemption 2) and BOTW (The Legend of Zelda: Breath of the Wild).

I like your suggestion of using a `SerializableFunction`, although, if I understood it correctly, I don't think anything will change from a user's perspective, aside from interacting more with the `QueryBuilder`. Based on the snippet you added above, we will end up with something similar to the following:

```java
MongoDbIO.read()
    // server, etc.
    .withQuery(
        QueryBuilder.create()
            .withLimit(10)
            .withProjection("foo", "bar")
            .build());
```

I honestly think that `MongoDbIO.Read` can be further enhanced to be more developer-friendly, but the underlying challenge will remain: MongoDB offers two distinct ways of querying a database. There is the commonly used [find](https://docs.mongodb.com/manual/reference/method/db.collection.find/), and there is the more advanced [aggregation](https://docs.mongodb.com/manual/aggregation/). For most developers, especially those using streaming pipelines, the following will suffice:

```java
MongoDbIO.read()
    .withUri(mongodbUri)
    .withDatabase(database)
    .withCollection(collection)
    .withDocumentIdStr("52cc8f6254c432784307")
```

However, when using batch processing to backfill historical data into, say, a data warehouse, MongoDB aggregation pipelines are crucial.

Let me know what you think. In the meantime, I'll address the remaining comments from you and @kennknowles.

Issue Time Tracking
-------------------
Worklog Id: (was: 182827)
Time Spent: 1h (was: 50m)

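The find-versus-aggregation distinction can be sketched as two implementations of one query-function type. Everything here is hypothetical and in-memory: `FindQuery` filters and limits documents without changing their shape, roughly like a `find` call, while `AggregationQuery` runs an ordered list of stages that may reshape documents, with `match` and `groupCount` loosely imitating a `$match` stage and a `$group` stage with a `$sum: 1` counter. The sample documents are illustrative only.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical query-function type over an in-memory "collection".
interface QueryFn extends Serializable {
  List<Map<String, Object>> apply(List<Map<String, Object>> docs);
}

// find-style: keep matching documents unchanged, up to a limit.
final class FindQuery implements QueryFn {
  private final String field;
  private final String value;
  private final long limit;

  FindQuery(String field, String value, long limit) {
    this.field = field;
    this.value = value;
    this.limit = limit;
  }

  @Override
  public List<Map<String, Object>> apply(List<Map<String, Object>> docs) {
    return docs.stream()
        .filter(doc -> Objects.equals(doc.get(field), value))
        .limit(limit)
        .collect(Collectors.toList());
  }
}

// aggregation-style: an ordered pipeline of stages that may reshape documents.
final class AggregationQuery implements QueryFn {
  private final List<QueryFn> stages;

  AggregationQuery(QueryFn... stages) {
    this.stages = new ArrayList<>(Arrays.asList(stages));
  }

  @Override
  public List<Map<String, Object>> apply(List<Map<String, Object>> docs) {
    List<Map<String, Object>> current = docs;
    for (QueryFn stage : stages) {
      current = stage.apply(current);
    }
    return current;
  }

  // Loosely imitates {$match: {<field>: <value>}}.
  static QueryFn match(String field, String value) {
    return docs ->
        docs.stream()
            .filter(doc -> Objects.equals(doc.get(field), value))
            .collect(Collectors.toList());
  }

  // Loosely imitates {$group: {_id: "$<field>", count: {$sum: 1}}}.
  static QueryFn groupCount(String field) {
    return docs -> {
      Map<Object, Integer> counts = new LinkedHashMap<>();
      for (Map<String, Object> doc : docs) {
        counts.merge(doc.get(field), 1, Integer::sum);
      }
      List<Map<String, Object>> rows = new ArrayList<>();
      for (Map.Entry<Object, Integer> entry : counts.entrySet()) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("_id", entry.getKey());
        row.put("count", entry.getValue());
        rows.add(row);
      }
      return rows;
    };
  }
}

final class Examples {
  private Examples() {}

  static List<Map<String, Object>> sample() {
    List<Map<String, Object>> docs = new ArrayList<>();
    String[][] rows = {{"Einstein", "Germany"}, {"Newton", "England"}, {"Darwin", "England"}, {"Curie", "Poland"}};
    for (String[] row : rows) {
      Map<String, Object> doc = new LinkedHashMap<>();
      doc.put("name", row[0]);
      doc.put("country", row[1]);
      docs.add(doc);
    }
    return docs;
  }
}
```

Because both classes implement the same interface, a single `withQuery`-style entry point could accept either one. This is one way to reconcile the two querying styles without keeping separate `withFilter`/`withLimit` and `withAggregate` methods.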
[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=182822=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-182822 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 09/Jan/19 03:08
Start Date: 09/Jan/19 03:08
Worklog Time Spent: 10m
Work Description: sandboxws commented on pull request #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#discussion_r246247644

## File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/QueryBuilder.java ##

    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.beam.sdk.io.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import com.mongodb.client.model.Aggregates;
    +import com.mongodb.client.model.Filters;
    +import com.mongodb.client.model.Projections;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import org.bson.BsonDocument;
    +import org.bson.Document;
    +import org.bson.codecs.BsonValueCodecProvider;
    +import org.bson.codecs.IterableCodecProvider;
    +import org.bson.codecs.ValueCodecProvider;
    +import org.bson.codecs.configuration.CodecRegistries;
    +import org.bson.conversions.Bson;
    +import org.bson.types.ObjectId;
    +
    +/**
    + * Builds an AggregateIterable pipeline using multiple options.
    + *
    + * @author Ahmed Elhossaini
    + */
    +public class QueryBuilder {
    +  private Integer limit;
    +  private List projection;
    +  private List pipeline;
    +  MongoCollection collection;
    +  private String filter;
    +  private String documentIdStr;
    +  private ObjectId documentId;
    +
    +  public static QueryBuilder create(MongoCollection collection) {
    +    QueryBuilder builder = new QueryBuilder();
    +    builder.collection = collection;
    +

Review comment:
Will trim.

Issue Time Tracking
-------------------
Worklog Id: (was: 182822)
Time Spent: 40m (was: 0.5h)

[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=182823=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-182823 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 09/Jan/19 03:09
Start Date: 09/Jan/19 03:09
Worklog Time Spent: 10m
Work Description: sandboxws commented on pull request #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#discussion_r246247746

## File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java ##

    @@ -154,6 +157,17 @@ private MongoDbIO() {}
         abstract int numSplits();
    +    abstract int limit();
    +
    +    @Nullable
    +    abstract List pipeline();
    +
    +    @Nullable
    +    abstract String documentIdStr();

Review comment:
Different how?

Issue Time Tracking
-------------------
Worklog Id: (was: 182823)
Time Spent: 50m (was: 40m)

[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=182819=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-182819 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 09/Jan/19 03:08
Start Date: 09/Jan/19 03:08
Worklog Time Spent: 10m
Work Description: sandboxws commented on pull request #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#discussion_r246247599

## File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java ##

    @@ -180,6 +194,14 @@ private MongoDbIO() {}
           abstract Builder setNumSplits(int numSplits);
    +      abstract Builder setLimit(int limit);
    +
    +      abstract Builder setPipeline(List pipeline);
    +
    +      abstract Builder setDocumentIdStr(String documentId);
    +
    +      abstract Builder setDocumentId(ObjectId documentId);

Review comment:
Agreed.

Issue Time Tracking
-------------------
Worklog Id: (was: 182819)
Time Spent: 10m
Remaining Estimate: 0h

[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=182821=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-182821 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 09/Jan/19 03:08
Start Date: 09/Jan/19 03:08
Worklog Time Spent: 10m
Work Description: sandboxws commented on pull request #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#discussion_r246247639

## File path: sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java ##

    @@ -266,6 +272,46 @@ public void testReadWithFilter() throws Exception {
         pipeline.run();
       }
    +  @Test
    +  public void testReadWithFilterAndLimit() throws Exception {
    +

Review comment:
Will trim.

Issue Time Tracking
-------------------
Worklog Id: (was: 182821)
Time Spent: 0.5h (was: 20m)

[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support
[ https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=182820=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-182820 ]

ASF GitHub Bot logged work on BEAM-6241:
Author: ASF GitHub Bot
Created on: 09/Jan/19 03:08
Start Date: 09/Jan/19 03:08
Worklog Time Spent: 10m
Work Description: sandboxws commented on pull request #7293: [BEAM-6241] Added limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#discussion_r246247611

## File path: sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/QueryBuilder.java ##

    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.beam.sdk.io.mongodb;
    +
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.MongoCursor;
    +import com.mongodb.client.model.Aggregates;
    +import com.mongodb.client.model.Filters;
    +import com.mongodb.client.model.Projections;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import org.bson.BsonDocument;
    +import org.bson.Document;
    +import org.bson.codecs.BsonValueCodecProvider;
    +import org.bson.codecs.IterableCodecProvider;
    +import org.bson.codecs.ValueCodecProvider;
    +import org.bson.codecs.configuration.CodecRegistries;
    +import org.bson.conversions.Bson;
    +import org.bson.types.ObjectId;
    +
    +/**
    + * Builds an AggregateIterable pipeline using multiple options.
    + *
    + * @author Ahmed Elhossaini

Review comment:
Will do.

Issue Time Tracking
-------------------
Worklog Id: (was: 182820)
Time Spent: 20m (was: 10m)