[ https://issues.apache.org/jira/browse/BEAM-8664?focusedWorklogId=352226&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-352226 ]
ASF GitHub Bot logged work on BEAM-8664: ---------------------------------------- Author: ASF GitHub Bot Created on: 02/Dec/19 21:24 Start Date: 02/Dec/19 21:24 Worklog Time Spent: 10m Work Description: 11moon11 commented on pull request #10095: [BEAM-8664] [SQL] MongoDb project push down URL: https://github.com/apache/beam/pull/10095#discussion_r351538013 ########## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java ########## @@ -187,7 +199,68 @@ public void testWriteAndRead() { PCollection<Row> output = BeamSqlRelUtils.toPCollection(readPipeline, sqlEnv.parseQuery("select * from TEST")); - assertEquals(output.getSchema(), SOURCE_SCHEMA); + assertEquals(SOURCE_SCHEMA, output.getSchema()); + + PAssert.that(output).containsInAnyOrder(testRow); + + readPipeline.run().waitUntilFinish(); + } + + @Test + public void testProjectPushDown() { + final Schema expectedSchema = + Schema.builder() + .addNullableField("c_varchar", STRING) + .addNullableField("c_boolean", BOOLEAN) + .addNullableField("c_integer", INT32) + .build(); + Row testRow = row(expectedSchema, "varchar", true, 2147483647); + + String createTableStatement = + "CREATE EXTERNAL TABLE TEST( \n" + + " c_bigint BIGINT, \n " + + " c_tinyint TINYINT, \n" + + " c_smallint SMALLINT, \n" + + " c_integer INTEGER, \n" + + " c_float FLOAT, \n" + + " c_double DOUBLE, \n" + + " c_boolean BOOLEAN, \n" + + " c_varchar VARCHAR, \n " + + " c_arr ARRAY<VARCHAR> \n" + + ") \n" + + "TYPE 'mongodb' \n" + + "LOCATION '" + + mongoSqlUrl + + "'"; + sqlEnv.executeDdl(createTableStatement); + + String insertStatement = + "INSERT INTO TEST VALUES (" + + "9223372036854775807, " + + "127, " + + "32767, " + + "2147483647, " + + "1.0, " + + "1.0, " + + "TRUE, " + + "'varchar', " + + "ARRAY['123', '456']" + + ")"; + + BeamRelNode insertRelNode = sqlEnv.parseQuery(insertStatement); + BeamSqlRelUtils.toPCollection(writePipeline, insertRelNode); + writePipeline.run().waitUntilFinish(); + + BeamRelNode node = sqlEnv.parseQuery("select c_varchar, c_boolean, c_integer from TEST"); + // Calc should be dropped, since MongoDb supports project push-down and field reordering. + assertThat(node, instanceOf(BeamPushDownIOSourceRel.class)); Review comment: Fields retrieved from MongoDB are returned as a `PCollection<Document>` from `MongoDBIO.expand()`, which is called by `MongoDbTable.buildIOReader(...)`. An `IOSourceRel` does not do any data manipulations (just wraps an IO and does type conversion). I am not exactly sure that checking values returned by `MongoDBIO.expand()` would be much different, unless I am misunderstanding something. I also [looked](https://stackoverflow.com/questions/41550450/mongodb-list-previous-queries) into retrieving some sort of log (from the embedded database) to check what query actually got executed, but I am not sure that we can easily retrieve that data. Alternatively, we could validate that `withQueryFn` on `MongoDbIO.Read` does indeed have projected fields set. But that would require somehow exposing `MongoDbIO.Read` to the tests. What do you think? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 352226) Time Spent: 4h (was: 3h 50m) > [SQL] MongoDb should use project push-down > ------------------------------------------ > > Key: BEAM-8664 > URL: https://issues.apache.org/jira/browse/BEAM-8664 > Project: Beam > Issue Type: Improvement > Components: dsl-sql > Reporter: Kirill Kozlov > Assignee: Kirill Kozlov > Priority: Major > Time Spent: 4h > Remaining Estimate: 0h > > MongoDbTable should implement the following methods: > {code:java} > public PCollection<Row> buildIOReader( > PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames); > public ProjectSupport supportsProjects(); > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)