slinkydeveloper commented on a change in pull request #17652:
URL: https://github.com/apache/flink/pull/17652#discussion_r741682841
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSourceTest.scala
##########
@@ -55,17 +55,39 @@ class PartitionableSourceTest(
|)
|""".stripMargin
+ // test when PushDownFilter can consume all filters including fields
partitionKeys
+ val partitionableAndFilterableTable =
+ """
+ |CREATE TABLE PartitionableAndFilterableTable (
+ | id int,
+ | name string,
+ | part1 string,
+ | part2 int,
+ | virtualField as part2 + 1)
Review comment:
Perhaps rename `virtualField` to `computedField`, to reuse the same term used in the interfaces?
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSourceTest.scala
##########
@@ -76,30 +98,41 @@ class PartitionableSourceTest(
val catalogPartitionSpec = new CatalogPartitionSpec(partition)
val catalogPartition = new CatalogPartitionImpl(
new java.util.HashMap[String, String](), "")
- catalog.createPartition(mytablePath, catalogPartitionSpec,
catalogPartition, true)
+ catalog.createPartition(
+ partitionableTablePath, catalogPartitionSpec, catalogPartition, true)
+ catalog.createPartition(
+ partitionableAndFilterableTablePath, catalogPartitionSpec,
catalogPartition, true)
})
}
}
@Test
def testSimplePartitionFieldPredicate1(): Unit = {
- util.verifyExecPlan("SELECT * FROM MyTable WHERE part1 = 'A'")
+ util.verifyExecPlan("SELECT * FROM PartitionableTable WHERE part1 = 'A'")
}
@Test
def testPartialPartitionFieldPredicatePushDown(): Unit = {
- util.verifyExecPlan("SELECT * FROM MyTable WHERE (id > 2 OR part1 = 'A')
AND part2 > 1")
+ util.verifyExecPlan(
+ "SELECT * FROM PartitionableTable WHERE (id > 2 OR part1 = 'A') AND
part2 > 1")
}
@Test
def testWithUdfAndVirtualColumn(): Unit = {
util.addFunction("MyUdf", Func1)
- util.verifyExecPlan("SELECT * FROM MyTable WHERE id > 2 AND MyUdf(part2) <
3")
+ util.verifyExecPlan("SELECT * FROM PartitionableTable WHERE id > 2 AND
MyUdf(part2) < 3")
}
@Test
def testUnconvertedExpression(): Unit = {
- util.verifyExecPlan("select * from MyTable where trim(part1) = 'A' and
part2 > 1")
+ util.verifyExecPlan("select * from PartitionableTable where trim(part1) =
'A' and part2 > 1")
+ }
+
+ @Test
+ def testPushDownPartitionAndFiltersContainPartitionKeys(): Unit = {
+ util.verifyExecPlan(
+ "select * from PartitionableAndFilterableTable " +
Review comment:
Can you add a test case here with the very same query, except that it
projects only `name`? You should then see only `name` and `id` in the projected
fields of the source. This is the particular case we were hitting.
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/PartitionableSourceTest.scala
##########
@@ -76,30 +98,41 @@ class PartitionableSourceTest(
val catalogPartitionSpec = new CatalogPartitionSpec(partition)
val catalogPartition = new CatalogPartitionImpl(
new java.util.HashMap[String, String](), "")
- catalog.createPartition(mytablePath, catalogPartitionSpec,
catalogPartition, true)
+ catalog.createPartition(
+ partitionableTablePath, catalogPartitionSpec, catalogPartition, true)
+ catalog.createPartition(
+ partitionableAndFilterableTablePath, catalogPartitionSpec,
catalogPartition, true)
})
}
}
@Test
def testSimplePartitionFieldPredicate1(): Unit = {
- util.verifyExecPlan("SELECT * FROM MyTable WHERE part1 = 'A'")
+ util.verifyExecPlan("SELECT * FROM PartitionableTable WHERE part1 = 'A'")
}
@Test
def testPartialPartitionFieldPredicatePushDown(): Unit = {
- util.verifyExecPlan("SELECT * FROM MyTable WHERE (id > 2 OR part1 = 'A')
AND part2 > 1")
+ util.verifyExecPlan(
+ "SELECT * FROM PartitionableTable WHERE (id > 2 OR part1 = 'A') AND
part2 > 1")
}
@Test
def testWithUdfAndVirtualColumn(): Unit = {
util.addFunction("MyUdf", Func1)
- util.verifyExecPlan("SELECT * FROM MyTable WHERE id > 2 AND MyUdf(part2) <
3")
+ util.verifyExecPlan("SELECT * FROM PartitionableTable WHERE id > 2 AND
MyUdf(part2) < 3")
}
@Test
def testUnconvertedExpression(): Unit = {
- util.verifyExecPlan("select * from MyTable where trim(part1) = 'A' and
part2 > 1")
+ util.verifyExecPlan("select * from PartitionableTable where trim(part1) =
'A' and part2 > 1")
+ }
+
+ @Test
+ def testPushDownPartitionAndFiltersContainPartitionKeys(): Unit = {
+ util.verifyExecPlan(
+ "select * from PartitionableAndFilterableTable " +
Review comment:
`select name from PartitionableAndFilterableTable where part1 = 'A' and
part2 > 1 and id > 1`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]