[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929815#comment-15929815 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user tonycox closed the pull request at:

https://github.com/apache/flink/pull/3166

> Add FilterableTableSource interface and translation rule
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Kurt Young
> Fix For: 1.3.0
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak
> the cost model as well to push the optimizer in the right direction.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929690#comment-15929690 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3166

Hi @tonycox, the follow-up PR #3520 was merged and includes the changes of this PR.
Can you close this PR?

Thanks, Fabian
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929684#comment-15929684 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3520
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929614#comment-15929614 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106609749

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---
@@ -36,36 +43,34 @@ trait PushFilterIntoTableSourceScanRuleBase {
       filterableSource: FilterableTableSource[_],
       description: String): Unit = {
 
-    if (filterableSource.isFilterPushedDown) {
-      // The rule can get triggered again due to the transformed "scan => filter"
-      // sequence created by the earlier execution of this rule when we could not
-      // push all the conditions into the scan
-      return
-    }
+    Preconditions.checkArgument(!filterableSource.isFilterPushedDown)
 
     val program = calc.getProgram
+    val functionCatalog = FunctionCatalog.withBuiltIns
     val (predicates, unconvertedRexNodes) =
       RexProgramExtractor.extractConjunctiveConditions(
         program,
         call.builder().getRexBuilder,
-        tableSourceTable.tableEnv.getFunctionCatalog)
+        functionCatalog)
     if (predicates.isEmpty) {
       // no condition can be translated to expression
       return
     }
 
-    val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates)
-    // trying to apply filter push down, set the flag to true no matter whether
-    // we actually push any filters down.
-    newTableSource.setFilterPushedDown(true)
+    val remainingPredicates = new util.LinkedList[Expression]()
+    predicates.foreach(e => remainingPredicates.add(e))
+
+    val newTableSource = filterableSource.applyPredicate(remainingPredicates)
--- End diff --

OK, you're right. Thanks for the example.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929596#comment-15929596 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106606071

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---
@@ -36,36 +43,34 @@ trait PushFilterIntoTableSourceScanRuleBase {
       filterableSource: FilterableTableSource[_],
       description: String): Unit = {
 
-    if (filterableSource.isFilterPushedDown) {
-      // The rule can get triggered again due to the transformed "scan => filter"
-      // sequence created by the earlier execution of this rule when we could not
-      // push all the conditions into the scan
-      return
-    }
+    Preconditions.checkArgument(!filterableSource.isFilterPushedDown)
 
     val program = calc.getProgram
+    val functionCatalog = FunctionCatalog.withBuiltIns
     val (predicates, unconvertedRexNodes) =
       RexProgramExtractor.extractConjunctiveConditions(
         program,
         call.builder().getRexBuilder,
-        tableSourceTable.tableEnv.getFunctionCatalog)
+        functionCatalog)
     if (predicates.isEmpty) {
       // no condition can be translated to expression
       return
     }
 
-    val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates)
-    // trying to apply filter push down, set the flag to true no matter whether
-    // we actually push any filters down.
-    newTableSource.setFilterPushedDown(true)
+    val remainingPredicates = new util.LinkedList[Expression]()
+    predicates.foreach(e => remainingPredicates.add(e))
+
+    val newTableSource = filterableSource.applyPredicate(remainingPredicates)
--- End diff --

For example, suppose the condition is "a OR b" and the table source knows that `a` is always `false` but cannot tell for `b`. It can then rewrite the condition to `b`, which reduces the work left to the framework.
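The "a OR b" case from the comment above can be sketched on a toy expression tree. This is only an illustration: `SimplifyDemo`, `Expr`, `Pred`, `Or`, `False` and `simplify` are hypothetical names and are not Flink's `Expression` classes, and the set of predicates "known to be false" is assumed to come from the table source's own metadata.

```scala
object SimplifyDemo {
  // Toy expression tree (hypothetical, NOT org.apache.flink.table.expressions.Expression).
  sealed trait Expr
  final case class Pred(name: String) extends Expr
  final case class Or(left: Expr, right: Expr) extends Expr
  case object False extends Expr

  /** Replaces predicates the source knows to be false and folds `False OR x` to `x`. */
  def simplify(e: Expr, knownFalse: Set[String]): Expr = e match {
    case Pred(n) if knownFalse.contains(n) => False
    case Or(l, r) =>
      (simplify(l, knownFalse), simplify(r, knownFalse)) match {
        case (False, x) => x
        case (x, False) => x
        case (x, y)     => Or(x, y)
      }
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // The source knows `a` is always false, so only `b` is left for the framework.
    println(simplify(Or(Pred("a"), Pred("b")), knownFalse = Set("a"))) // prints Pred(b)
  }
}
```

The remaining term `b` is not evaluated by the source in this sketch; it is merely a smaller expression that a later Calc would still have to check.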
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929592#comment-15929592 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3520

yes
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929589#comment-15929589 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3520

Sure. So the result will be two commits in the master repository?
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929584#comment-15929584 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3520

Can you squash the commits of the PR into two commits before merging? One for @tonycox and one for your changes?

Thanks!
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929583#comment-15929583 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106603629

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---
@@ -36,36 +43,34 @@ trait PushFilterIntoTableSourceScanRuleBase {
       filterableSource: FilterableTableSource[_],
       description: String): Unit = {
 
-    if (filterableSource.isFilterPushedDown) {
-      // The rule can get triggered again due to the transformed "scan => filter"
-      // sequence created by the earlier execution of this rule when we could not
-      // push all the conditions into the scan
-      return
-    }
+    Preconditions.checkArgument(!filterableSource.isFilterPushedDown)
 
     val program = calc.getProgram
+    val functionCatalog = FunctionCatalog.withBuiltIns
     val (predicates, unconvertedRexNodes) =
       RexProgramExtractor.extractConjunctiveConditions(
         program,
         call.builder().getRexBuilder,
-        tableSourceTable.tableEnv.getFunctionCatalog)
+        functionCatalog)
     if (predicates.isEmpty) {
       // no condition can be translated to expression
       return
     }
 
-    val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates)
-    // trying to apply filter push down, set the flag to true no matter whether
-    // we actually push any filters down.
-    newTableSource.setFilterPushedDown(true)
+    val remainingPredicates = new util.LinkedList[Expression]()
+    predicates.foreach(e => remainingPredicates.add(e))
+
+    val newTableSource = filterableSource.applyPredicate(remainingPredicates)
--- End diff --

That was actually one of the reasons for converting the predicate to CNF. In CNF you can either evaluate a complete term or skip it and let a Calc evaluate it later. I don't see a case where it would make sense to modify a conjunctive term without evaluating it.
Of course, the table source could reorganize the expressions, but what would be the purpose of that? IMO, this could be error-prone and introduce bugs. Do you have a concrete use case in mind where a `FilterableTableSource` would modify the expressions that it does not evaluate?
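The "evaluate a complete term or skip it" contract described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code of the PR: `PushDownDemo`, `Term` and `ToySource` are hypothetical, and the only thing taken from the quoted diff is the shape of the call, where the rule hands a mutable `java.util.List` of conjunctive terms to `applyPredicate` and gets a new source back. That the source removes the terms it accepts from that list is an assumption made for this sketch.

```scala
import java.util.{LinkedList, List => JList}
import scala.collection.mutable.ListBuffer

object PushDownDemo {
  // Hypothetical stand-in for one conjunctive term of a CNF predicate.
  final case class Term(field: String, text: String)

  // Hypothetical source that can only evaluate terms on the "amount" field.
  final case class ToySource(pushed: List[Term] = Nil) {
    private def supports(t: Term): Boolean = t.field == "amount"

    /** Takes whole terms it can evaluate out of `predicates`; skipped terms stay untouched. */
    def applyPredicate(predicates: JList[Term]): ToySource = {
      val accepted = ListBuffer.empty[Term]
      val it = predicates.iterator()
      while (it.hasNext) {
        val t = it.next()
        if (supports(t)) {
          accepted += t
          it.remove() // assumption: accepted terms are removed from the caller's list
        }
      }
      copy(pushed = pushed ++ accepted)
    }
  }

  def main(args: Array[String]): Unit = {
    val remaining = new LinkedList[Term]()
    remaining.add(Term("amount", "amount > 2"))
    remaining.add(Term("price", "price * 2 < 32"))
    val newSource = ToySource().applyPredicate(remaining)
    println(newSource.pushed) // terms the source evaluates itself
    println(remaining)        // terms left for a later Calc
  }
}
```

Because every term is either taken whole or left alone, the terms remaining in the list can be re-joined with AND and evaluated downstream without changing the query result.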
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929314#comment-15929314 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3520

Hi @fhueske, thanks for the review. I addressed all your comments and will rebase onto master to let Travis run the checks. I will merge this once the build succeeds.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929298#comment-15929298 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106568641

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala ---
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{CsvTableSource, TableSource}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.expressions.utils._
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource}
+import org.junit.{Assert, Test}
+
+class TableSourceTest extends TableTestBase {
+
+  private val projectedFields: Array[String] = Array("last", "id", "score")
+  private val noCalcFields: Array[String] = Array("id", "score", "first")
+
+  // batch plan
+
+  @Test
+  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('last.upperCase(), 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchSourceTableNode(tableName, projectedFields),
+      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanPlanSQL(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+
+    util.tEnv.registerTableSource(tableName, tableSource)
+
+    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchSourceTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = batchSourceTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterableWithoutPushDown(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+        .scan(tableName)
+        .select('price, 'id, 'amount)
+        .where("price * 2 < 32")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price")),
+      term("select", "price", "id", "amount"),
+      term("where", "<(*(price, 2), 32)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterablePartialPushDown(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .where("amount > 2
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929280#comment-15929280 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106566787

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramTestBase.scala ---
@@ -16,106 +16,96 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.plan.rules.util
+package org.apache.flink.table.plan.util
 
 import java.math.BigDecimal
+import java.util
 
 import org.apache.calcite.adapter.java.JavaTypeFactory
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
 import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
-import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR}
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR, BOOLEAN}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
-import org.junit.Assert.{assertArrayEquals, assertTrue}
-import org.junit.{Before, Test}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
-/**
-  * This class is responsible for testing RexProgramProjectExtractor.
-  */
-class RexProgramProjectExtractorTest {
-  private var typeFactory: JavaTypeFactory = _
-  private var rexBuilder: RexBuilder = _
-  private var allFieldTypes: Seq[RelDataType] = _
-  private val allFieldNames = List("name", "id", "amount", "price")
-
-  @Before
-  def setUp(): Unit = {
-    typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
-    rexBuilder = new RexBuilder(typeFactory)
-    allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_))
-  }
+abstract class RexProgramTestBase {
 
-  @Test
-  def testExtractRefInputFields(): Unit = {
-    val usedFields = extractRefInputFields(buildRexProgram())
-    assertArrayEquals(usedFields, Array(2, 3, 1))
-  }
+  val typeFactory: JavaTypeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
+
+  val allFieldNames: util.List[String] = List("name", "id", "amount", "price", "flag").asJava
+
+  val allFieldTypes: util.List[RelDataType] =
+    List(VARCHAR, BIGINT, INTEGER, DOUBLE, BOOLEAN).map(typeFactory.createSqlType).asJava
+
+  var rexBuilder: RexBuilder = new RexBuilder(typeFactory)
 
-  @Test
-  def testRewriteRexProgram(): Unit = {
-    val originRexProgram = buildRexProgram()
-    assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
-      "$0",
-      "$1",
-      "$2",
-      "$3",
-      "*($t2, $t3)",
-      "100",
-      "<($t4, $t5)",
-      "6",
-      ">($t1, $t7)",
-      "AND($t6, $t8)")))
-    // use amount, id, price fields to create a new RexProgram
-    val usedFields = Array(2, 3, 1)
-    val types = usedFields.map(allFieldTypes(_)).toList.asJava
-    val names = usedFields.map(allFieldNames(_)).toList.asJava
-    val inputRowType = typeFactory.createStructType(types, names)
-    val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder)
-    assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
-      "$0",
-      "$1",
-      "$2",
-      "*($t0, $t1)",
-      "100",
-      "<($t3, $t4)",
-      "6",
-      ">($t2, $t6)",
-      "AND($t5, $t7)")))
+  /**
+    * extract all expression string list from input RexProgram expression lists
+    *
+    * @param rexProgram input RexProgram instance to analyze
+    * @return all expression string list of input RexProgram expression lists
+    */
+  protected def extractExprStrList(rexProgram: RexProgram): mutable.Buffer[String] = {
+    rexProgram.getExprList.asScala.map(_.toString)
   }
 
-  private def buildRexProgram(): RexProgram = {
-    val types = allFieldTypes.asJava
-    val names = allFieldNames.asJava
-    val inputRowType = typeFactory.createStructType(types, names)
+  // select amount, amount * price as total where amount * price < 100 and id > 6
+  protected def buildSimpleRexProgram1(): RexProgram = {
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
     val builder = new RexProgramBuilder(inputRowType, rexBuilder)
 
-    val t0 = rexBuilder.makeInputRef(types.get(2), 2)
-    val t1 = rexBuilder.makeInputRef(types.get(1), 1)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929276#comment-15929276 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106566599

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala ---
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import java.math.BigDecimal
+
+import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder}
+import org.apache.calcite.sql.SqlPostfixOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.junit.Assert.{assertArrayEquals, assertEquals}
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class RexProgramExtractorTest extends RexProgramTestBase {
+
+  private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
+
+  @Test
+  def testExtractRefInputFields(): Unit = {
+    val usedFields = RexProgramExtractor.extractRefInputFields(buildSimpleRexProgram1())
+    assertArrayEquals(usedFields, Array(2, 3, 1))
+  }
+
+  @Test
+  def testExtractSimpleCondition(): Unit = {
+    val builder: RexBuilder = new RexBuilder(typeFactory)
+    val program = buildSimpleRexProgram2()
+
+    val firstExp = ExpressionParser.parseExpression("id > 6")
+    val secondExp = ExpressionParser.parseExpression("amount * price < 100")
+    val expected: Array[Expression] = Array(firstExp, secondExp)
+
+    val (convertedExpressions, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        builder,
+        functionCatalog)
+
+    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  @Test
+  def testExtractSingleCondition(): Unit = {
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    // amount
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+    // id
+    val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+
+    // a = amount >= id
+    val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, t0, t1))
+    builder.addCondition(a)
+
+    val program = builder.getProgram
+    val relBuilder: RexBuilder = new RexBuilder(typeFactory)
+    val (convertedExpressions, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        relBuilder,
+        functionCatalog)
+
+    val expected: Array[Expression] = Array(ExpressionParser.parseExpression("amount >= id"))
+    assertExpressionArrayEquals(expected, convertedExpressions)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  // ((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d)
+  @Test
+  def testExtractCnfCondition(): Unit = {
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    // amount
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+    // id
+    val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+    // price
+    val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3)
+    // 100
+    val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+
+    // a = amount < 100
+    val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0,
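The rewrite named in the comment on `testExtractCnfCondition` above, `((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d)`, can be reproduced on a toy tree. This is a sketch only: `CnfDemo`, `Expr`, `Atom`, `toCnf` and `conjuncts` are hypothetical names, the real extraction works on Calcite `RexNode`s, and the sketch assumes NOT is already applied to atoms only (no De Morgan step).

```scala
object CnfDemo {
  // Toy boolean expression tree (hypothetical, not Calcite's RexNode).
  sealed trait Expr
  final case class Atom(name: String) extends Expr
  final case class Not(e: Expr) extends Expr
  final case class And(l: Expr, r: Expr) extends Expr
  final case class Or(l: Expr, r: Expr) extends Expr

  /** Distributes OR over AND until no AND is nested below an OR. */
  def toCnf(e: Expr): Expr = e match {
    case And(l, r) => And(toCnf(l), toCnf(r))
    case Or(l, r) =>
      (toCnf(l), toCnf(r)) match {
        case (And(a, b), c) => And(toCnf(Or(a, c)), toCnf(Or(b, c)))
        case (a, And(b, c)) => And(toCnf(Or(a, b)), toCnf(Or(a, c)))
        case (a, b)         => Or(a, b)
      }
    case other => other
  }

  /** Flattens the top-level AND chain into the list of conjunctive terms. */
  def conjuncts(e: Expr): List[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => List(other)
  }

  def main(args: Array[String]): Unit = {
    val (a, b, c, d) = (Atom("a"), Atom("b"), Atom("c"), Atom("d"))
    val input = And(Or(And(a, b), c), Not(d))
    conjuncts(toCnf(input)).foreach(println)
  }
}
```

Each element of the resulting list is one conjunctive term, which is the unit a table source would either take over completely or leave for a later Calc.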
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929277#comment-15929277 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106566606

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala ---
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929267#comment-15929267 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106566042 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.utils + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.api.Types._ +import org.apache.flink.table.expressions._ +import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource, TableSource} +import org.apache.flink.types.Row +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.tools.nsc.interpreter.JList + +/** + * This source can only handle simple comparision with field "amount". + * Supports ">, <, >=, <=, =, <>" with an integer. + */ +class TestFilterableTableSource( +val recordNum: Int = 33) +extends BatchTableSource[Row] +with StreamTableSource[Row] +with FilterableTableSource[Row] { + + var filterPushedDown: Boolean = false + + val fieldNames: Array[String] = Array("name", "id", "amount", "price") + + val fieldTypes: Array[TypeInformation[_]] = Array(STRING, LONG, INT, DOUBLE) + + // all predicates with filed "amount" + private var filterPredicates = new mutable.ArrayBuffer[Expression] + + // all comparing values for field "amount" + private val filterValues = new mutable.ArrayBuffer[Int] + + override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { +execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType) + } + + override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { +execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType) + } + + override def explainSource(): String = { +if (filterPredicates.nonEmpty) { + s"filter=[${filterPredicates.reduce((l, r) => And(l, r)).toString}]" +} else { + "" +} + } + + override def 
getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames) + + override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = { +val newSource = new TestFilterableTableSource(recordNum) +newSource.filterPushedDown = true + +val iterator = predicates.iterator() +while (iterator.hasNext) { + iterator.next() match { +case expr: BinaryComparison => + (expr.left, expr.right) match { +case (f: ResolvedFieldReference, v: Literal) if f.name.equals("amount") => + newSource.filterPredicates += expr + newSource.filterValues += v.value.asInstanceOf[Number].intValue() + iterator.remove() +case (_, _) => + } + } +} + +newSource + } + + override def isFilterPushedDown: Boolean = filterPushedDown + + private def generateDynamicCollection(): Seq[Row] = { +Preconditions.checkArgument(filterPredicates.length == filterValues.length) + +for { + cnt <- 0 until recordNum + if shouldCreateRow(cnt) +} yield { + val row = new Row(fieldNames.length) --- End diff --
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929261#comment-15929261 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106565668 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala --- @@ -36,36 +43,34 @@ trait PushFilterIntoTableSourceScanRuleBase { filterableSource: FilterableTableSource[_], description: String): Unit = { -if (filterableSource.isFilterPushedDown) { - // The rule can get triggered again due to the transformed "scan => filter" - // sequence created by the earlier execution of this rule when we could not - // push all the conditions into the scan - return -} +Preconditions.checkArgument(!filterableSource.isFilterPushedDown) val program = calc.getProgram +val functionCatalog = FunctionCatalog.withBuiltIns val (predicates, unconvertedRexNodes) = RexProgramExtractor.extractConjunctiveConditions( program, call.builder().getRexBuilder, -tableSourceTable.tableEnv.getFunctionCatalog) +functionCatalog) if (predicates.isEmpty) { // no condition can be translated to expression return } -val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates) -// trying to apply filter push down, set the flag to true no matter whether -// we actually push any filters down. -newTableSource.setFilterPushedDown(true) +val remainingPredicates = new util.LinkedList[Expression]() +predicates.foreach(e => remainingPredicates.add(e)) + +val newTableSource = filterableSource.applyPredicate(remainingPredicates) --- End diff -- I think we don't have to restrict this. If the user, for some reason, does want to change the predicates that are returned and then executed by the framework, we should allow them to do so.
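The contract discussed in the comment above can be illustrated with a small, dependency-free sketch. It is not the Flink API: `Pred`, `GreaterThan`, `LessThan` and `AmountSource` are hypothetical stand-ins for Flink's `Expression` types and for a `FilterableTableSource` implementation. The only thing taken from the diff is the shape of the call: the caller passes a mutable `java.util.List`, the source removes the predicates it can evaluate itself, and whatever remains in the list is left for the framework.

```scala
import java.util.{LinkedList, List => JList}

// Hypothetical stand-ins for Flink's Expression types; not the real API.
sealed trait Pred
case class GreaterThan(field: String, value: Int) extends Pred
case class LessThan(field: String, value: Int) extends Pred

// Hypothetical source that can only evaluate predicates on "amount".
class AmountSource(val pushed: List[Pred] = Nil) {
  // Removes every predicate it accepts from `predicates`; whatever is
  // still in the list afterwards has to be evaluated by the caller.
  def applyPredicate(predicates: JList[Pred]): AmountSource = {
    var accepted = List.empty[Pred]
    val it = predicates.iterator()
    while (it.hasNext) {
      it.next() match {
        case p @ GreaterThan("amount", _) => accepted :+= p; it.remove()
        case p @ LessThan("amount", _)    => accepted :+= p; it.remove()
        case _                            => // not supported, leave it for the caller
      }
    }
    new AmountSource(pushed ++ accepted)
  }
}

object ApplyPredicateSketch {
  def main(args: Array[String]): Unit = {
    // The caller hands over a mutable copy, as the rule in the diff does.
    val remaining = new LinkedList[Pred]()
    remaining.add(GreaterThan("amount", 2))
    remaining.add(LessThan("price", 32))

    val newSource = new AmountSource().applyPredicate(remaining)
    println(s"pushed down: ${newSource.pushed}")
    println(s"left for the framework: $remaining")
  }
}
```

Because the list is mutable and owned by the caller, nothing in this shape stops an implementation from rewriting the remaining predicates as well as removing them, which is the freedom the comment argues should be kept.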
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929262#comment-15929262 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106565688 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala --- @@ -20,47 +20,40 @@ package org.apache.flink.table.sources import org.apache.flink.table.expressions.Expression +import scala.tools.nsc.interpreter.JList --- End diff -- sure
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928400#comment-15928400 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106447875 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala --- @@ -18,20 +18,20 @@ package org.apache.flink.table.utils -import org.apache.calcite.rel.RelWriter import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api.Types._ import org.apache.flink.table.expressions._ -import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource} +import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource, TableSource} import org.apache.flink.types.Row import org.apache.flink.util.Preconditions import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.tools.nsc.interpreter.JList --- End diff -- change to `import java.util.{List => JList}`?
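A minimal sketch of the suggested import, using Scala's rename-on-import syntax to alias `java.util.List` directly. As far as I can tell, `scala.tools.nsc.interpreter.JList` is itself only a type alias exposed by the Scala compiler's REPL package, so importing it presumably ties the code to the compiler artifact for no benefit; treat that reading as an assumption. `JListAliasSketch` and `firstOrElse` are hypothetical names used only for illustration.

```scala
// Alias java.util.List directly instead of relying on an alias that
// happens to be exported by scala.tools.nsc.interpreter.
import java.util.{ArrayList, List => JList}

object JListAliasSketch {
  // Hypothetical helper, only here to show the alias in a signature.
  def firstOrElse(values: JList[String], default: String): String =
    if (values.isEmpty) default else values.get(0)

  def main(args: Array[String]): Unit = {
    val names: JList[String] = new ArrayList[String]()
    names.add("amount")
    println(firstOrElse(names, "none"))
  }
}
```

Signatures written against `JList[...]` stay unchanged by the swap; only the import line differs.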
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928393#comment-15928393 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106463685 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala --- @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import java.math.BigDecimal + +import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder} +import org.apache.calcite.sql.SqlPostfixOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.validate.FunctionCatalog +import org.junit.Assert.{assertArrayEquals, assertEquals} +import org.junit.Test + +import scala.collection.JavaConverters._ + +class RexProgramExtractorTest extends RexProgramTestBase { + + private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns + + @Test + def testExtractRefInputFields(): Unit = { +val usedFields = RexProgramExtractor.extractRefInputFields(buildSimpleRexProgram1()) +assertArrayEquals(usedFields, Array(2, 3, 1)) + } + + @Test + def testExtractSimpleCondition(): Unit = { +val builder: RexBuilder = new RexBuilder(typeFactory) +val program = buildSimpleRexProgram2() + +val firstExp = ExpressionParser.parseExpression("id > 6") +val secondExp = ExpressionParser.parseExpression("amount * price < 100") +val expected: Array[Expression] = Array(firstExp, secondExp) + +val (convertedExpressions, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +builder, +functionCatalog) + +assertExpressionArrayEquals(expected, convertedExpressions) +assertEquals(0, unconvertedRexNodes.length) + } + + @Test + def testExtractSingleCondition(): Unit = { +val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames) +val builder = new RexProgramBuilder(inputRowType, rexBuilder) + +// amount +val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2) +// id +val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1) + +// a = amount >= id +val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, t0, t1)) +builder.addCondition(a) + +val program = builder.getProgram +val relBuilder: RexBuilder = new 
RexBuilder(typeFactory) +val (convertedExpressions, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +relBuilder, +functionCatalog) + +val expected: Array[Expression] = Array(ExpressionParser.parseExpression("amount >= id")) +assertExpressionArrayEquals(expected, convertedExpressions) +assertEquals(0, unconvertedRexNodes.length) + } + + // ((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d) + @Test + def testExtractCnfCondition(): Unit = { +val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames) +val builder = new RexProgramBuilder(inputRowType, rexBuilder) + +// amount +val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2) +// id +val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1) +// price +val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3) +// 100 +val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L)) + +// a = amount < 100 +val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0,
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928402#comment-15928402 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106464689 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala --- @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import java.math.BigDecimal + +import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder} +import org.apache.calcite.sql.SqlPostfixOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.validate.FunctionCatalog +import org.junit.Assert.{assertArrayEquals, assertEquals} +import org.junit.Test + +import scala.collection.JavaConverters._ + +class RexProgramExtractorTest extends RexProgramTestBase { + + private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns + + @Test + def testExtractRefInputFields(): Unit = { +val usedFields = RexProgramExtractor.extractRefInputFields(buildSimpleRexProgram1()) +assertArrayEquals(usedFields, Array(2, 3, 1)) + } + + @Test + def testExtractSimpleCondition(): Unit = { +val builder: RexBuilder = new RexBuilder(typeFactory) +val program = buildSimpleRexProgram2() + +val firstExp = ExpressionParser.parseExpression("id > 6") +val secondExp = ExpressionParser.parseExpression("amount * price < 100") +val expected: Array[Expression] = Array(firstExp, secondExp) + +val (convertedExpressions, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +builder, +functionCatalog) + +assertExpressionArrayEquals(expected, convertedExpressions) +assertEquals(0, unconvertedRexNodes.length) + } + + @Test + def testExtractSingleCondition(): Unit = { +val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames) +val builder = new RexProgramBuilder(inputRowType, rexBuilder) + +// amount +val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2) +// id +val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1) + +// a = amount >= id +val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, t0, t1)) +builder.addCondition(a) + +val program = builder.getProgram +val relBuilder: RexBuilder = new 
RexBuilder(typeFactory) +val (convertedExpressions, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +relBuilder, +functionCatalog) + +val expected: Array[Expression] = Array(ExpressionParser.parseExpression("amount >= id")) +assertExpressionArrayEquals(expected, convertedExpressions) +assertEquals(0, unconvertedRexNodes.length) + } + + // ((a AND b) OR c) AND (NOT d) => (a OR c) AND (b OR c) AND (NOT d) + @Test + def testExtractCnfCondition(): Unit = { +val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames) +val builder = new RexProgramBuilder(inputRowType, rexBuilder) + +// amount +val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2) +// id +val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1) +// price +val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3) +// 100 +val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L)) + +// a = amount < 100 +val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0,
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928394#comment-15928394 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106468487 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.utils + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.api.Types._ +import org.apache.flink.table.expressions._ +import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource, TableSource} +import org.apache.flink.types.Row +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.tools.nsc.interpreter.JList + +/** + * This source can only handle simple comparision with field "amount". + * Supports ">, <, >=, <=, =, <>" with an integer. + */ +class TestFilterableTableSource( +val recordNum: Int = 33) +extends BatchTableSource[Row] +with StreamTableSource[Row] +with FilterableTableSource[Row] { + + var filterPushedDown: Boolean = false + + val fieldNames: Array[String] = Array("name", "id", "amount", "price") + + val fieldTypes: Array[TypeInformation[_]] = Array(STRING, LONG, INT, DOUBLE) + + // all predicates with filed "amount" + private var filterPredicates = new mutable.ArrayBuffer[Expression] + + // all comparing values for field "amount" + private val filterValues = new mutable.ArrayBuffer[Int] + + override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { +execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType) + } + + override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { +execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType) + } + + override def explainSource(): String = { +if (filterPredicates.nonEmpty) { + s"filter=[${filterPredicates.reduce((l, r) => And(l, r)).toString}]" +} else { + "" +} + } + + override def 
getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames) + + override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = { +val newSource = new TestFilterableTableSource(recordNum) +newSource.filterPushedDown = true + +val iterator = predicates.iterator() +while (iterator.hasNext) { + iterator.next() match { +case expr: BinaryComparison => + (expr.left, expr.right) match { +case (f: ResolvedFieldReference, v: Literal) if f.name.equals("amount") => + newSource.filterPredicates += expr + newSource.filterValues += v.value.asInstanceOf[Number].intValue() + iterator.remove() +case (_, _) => + } + } +} + +newSource + } + + override def isFilterPushedDown: Boolean = filterPushedDown + + private def generateDynamicCollection(): Seq[Row] = { +Preconditions.checkArgument(filterPredicates.length == filterValues.length) + +for { + cnt <- 0 until recordNum + if shouldCreateRow(cnt) +} yield { + val row = new Row(fieldNames.length) --- End diff -- can
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928403#comment-15928403 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106451995 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala --- @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table + +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.sources.{CsvTableSource, TableSource} +import org.apache.flink.table.utils.TableTestUtil._ +import org.apache.flink.table.expressions.utils._ +import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource} +import org.junit.{Assert, Test} + +class TableSourceTest extends TableTestBase { + + private val projectedFields: Array[String] = Array("last", "id", "score") + private val noCalcFields: Array[String] = Array("id", "score", "first") + + // batch plan + + @Test + def testBatchProjectableSourceScanPlanTableApi(): Unit = { +val (tableSource, tableName) = csvTable +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registerTableSource(tableName, tableSource) + +val result = tEnv + .scan(tableName) + .select('last.upperCase(), 'id.floor(), 'score * 2) + +val expected = unaryNode( + "DataSetCalc", + batchSourceTableNode(tableName, projectedFields), + term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2") +) + +util.verifyTable(result, expected) + } + + @Test + def testBatchProjectableSourceScanPlanSQL(): Unit = { +val (tableSource, tableName) = csvTable +val util = batchTestUtil() + +util.tEnv.registerTableSource(tableName, tableSource) + +val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName" + +val expected = unaryNode( + "DataSetCalc", + batchSourceTableNode(tableName, projectedFields), + term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2") +) + +util.verifySql(sqlQuery, expected) + } + + @Test + def testBatchProjectableSourceScanNoIdentityCalc(): Unit = { +val (tableSource, tableName) = csvTable +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registerTableSource(tableName, tableSource) + +val result = tEnv + .scan(tableName) + .select('id, 'score, 'first) + +val expected = batchSourceTableNode(tableName, 
noCalcFields) +util.verifyTable(result, expected) + } + + @Test + def testBatchFilterableWithoutPushDown(): Unit = { +val (tableSource, tableName) = filterableTableSource +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registerTableSource(tableName, tableSource) + +val result = tEnv +.scan(tableName) +.select('price, 'id, 'amount) +.where("price * 2 < 32") + +val expected = unaryNode( + "DataSetCalc", + batchSourceTableNode( +tableName, +Array("name", "id", "amount", "price")), + term("select", "price", "id", "amount"), + term("where", "<(*(price, 2), 32)") +) + +util.verifyTable(result, expected) + } + + @Test + def testBatchFilterablePartialPushDown(): Unit = { +val (tableSource, tableName) = filterableTableSource +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registerTableSource(tableName, tableSource) + +val result = tEnv + .scan(tableName) + .where("amount > 2
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928398#comment-15928398 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106467291

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.utils
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    +import org.apache.flink.table.api.Types._
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource, TableSource}
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Preconditions
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.tools.nsc.interpreter.JList
    +
    +/**
    + * This source can only handle simple comparision with field "amount".
    + * Supports ">, <, >=, <=, =, <>" with an integer.
    + */
    +class TestFilterableTableSource(
    +    val recordNum: Int = 33)
    +    extends BatchTableSource[Row]
    +    with StreamTableSource[Row]
    +    with FilterableTableSource[Row] {
    +
    +  var filterPushedDown: Boolean = false
    +
    +  val fieldNames: Array[String] = Array("name", "id", "amount", "price")
    +
    +  val fieldTypes: Array[TypeInformation[_]] = Array(STRING, LONG, INT, DOUBLE)
    +
    +  // all predicates with filed "amount"
    --- End diff --

    filed -> field
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928397#comment-15928397 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106445598

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---
    @@ -36,36 +43,34 @@ trait PushFilterIntoTableSourceScanRuleBase {
           filterableSource: FilterableTableSource[_],
           description: String): Unit = {

    -    if (filterableSource.isFilterPushedDown) {
    -      // The rule can get triggered again due to the transformed "scan => filter"
    -      // sequence created by the earlier execution of this rule when we could not
    -      // push all the conditions into the scan
    -      return
    -    }
    +    Preconditions.checkArgument(!filterableSource.isFilterPushedDown)

         val program = calc.getProgram
    +    val functionCatalog = FunctionCatalog.withBuiltIns
         val (predicates, unconvertedRexNodes) =
           RexProgramExtractor.extractConjunctiveConditions(
             program,
             call.builder().getRexBuilder,
    -        tableSourceTable.tableEnv.getFunctionCatalog)
    +        functionCatalog)
         if (predicates.isEmpty) {
           // no condition can be translated to expression
           return
         }

    -    val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates)
    -    // trying to apply filter push down, set the flag to true no matter whether
    -    // we actually push any filters down.
    -    newTableSource.setFilterPushedDown(true)
    +    val remainingPredicates = new util.LinkedList[Expression]()
    +    predicates.foreach(e => remainingPredicates.add(e))
    +
    +    val newTableSource = filterableSource.applyPredicate(remainingPredicates)
    --- End diff --

    Add a check that `remainingPredicates` is a subset of `predicates`? The table source should not touch those predicates that it cannot evaluate or add new predicates.
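The subset check requested in this review comment could be sketched roughly as follows. This is only an illustration, not the code that was merged: `Expr` is a hypothetical stand-in for Flink's `Expression` so that the snippet compiles without Flink on the classpath, and `PredicateCheck.isSubset` is an invented helper name. It uses structural equality, so a source that rebuilds an equal expression would still pass.

```scala
import java.util.{LinkedList => JLinkedList, List => JList}

// Hypothetical stand-in for org.apache.flink.table.expressions.Expression.
final case class Expr(text: String)

object PredicateCheck {
  /** Returns true if every predicate left in `remaining` also occurs in `original`. */
  def isSubset(original: Seq[Expr], remaining: JList[Expr]): Boolean = {
    val it = remaining.iterator()
    var ok = true
    while (ok && it.hasNext) {
      // A predicate that was added or rewritten by the source fails this lookup.
      ok = original.contains(it.next())
    }
    ok
  }
}
```

A rule could call such a helper right after `applyPredicate` returns and fail fast (for example via a precondition) when the source has added or altered predicates.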
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928401#comment-15928401 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106446820

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
    @@ -20,47 +20,40 @@ package org.apache.flink.table.sources

     import org.apache.flink.table.expressions.Expression

    +import scala.tools.nsc.interpreter.JList
    --- End diff --

    Can we make this `import java.util.{List => JList}`?
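For readers unfamiliar with Scala import renaming, the suggested import can be illustrated with a small sketch. The motivation is an inference rather than something stated in the thread: the original import takes `JList` from a Scala compiler package, whereas the renamed import needs only the JDK. `JListAlias` and `firstOrElse` are hypothetical names used for the example.

```scala
import java.util.{ArrayList => JArrayList, List => JList}

object JListAlias {
  // JList[String] is only a local name for java.util.List[String];
  // the rename does not introduce a new type.
  def firstOrElse(xs: JList[String], default: String): String =
    if (xs.isEmpty) default else xs.get(0)
}
```

Any `java.util.List[String]` value can be passed to `firstOrElse` unchanged, which is why the rename is a drop-in replacement at the use sites.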
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928396#comment-15928396 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106460094

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramTestBase.scala ---
    @@ -16,106 +16,96 @@
      * limitations under the License.
      */

    -package org.apache.flink.table.plan.rules.util
    +package org.apache.flink.table.plan.util

     import java.math.BigDecimal
    +import java.util

     import org.apache.calcite.adapter.java.JavaTypeFactory
     import org.apache.calcite.jdbc.JavaTypeFactoryImpl
     import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
     import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
    -import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR}
    +import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR, BOOLEAN}
     import org.apache.calcite.sql.fun.SqlStdOperatorTable
    -import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
    -import org.junit.Assert.{assertArrayEquals, assertTrue}
    -import org.junit.{Before, Test}

     import scala.collection.JavaConverters._
    +import scala.collection.mutable

    -/**
    -  * This class is responsible for testing RexProgramProjectExtractor.
    -  */
    -class RexProgramProjectExtractorTest {
    -  private var typeFactory: JavaTypeFactory = _
    -  private var rexBuilder: RexBuilder = _
    -  private var allFieldTypes: Seq[RelDataType] = _
    -  private val allFieldNames = List("name", "id", "amount", "price")
    -
    -  @Before
    -  def setUp(): Unit = {
    -    typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
    -    rexBuilder = new RexBuilder(typeFactory)
    -    allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_))
    -  }
    +abstract class RexProgramTestBase {
    --- End diff --

    good idea to make this an abstract class
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928395#comment-15928395 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106452031 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala --- @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table + +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.sources.{CsvTableSource, TableSource} +import org.apache.flink.table.utils.TableTestUtil._ +import org.apache.flink.table.expressions.utils._ +import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource} +import org.junit.{Assert, Test} + +class TableSourceTest extends TableTestBase { + + private val projectedFields: Array[String] = Array("last", "id", "score") + private val noCalcFields: Array[String] = Array("id", "score", "first") + + // batch plan + + @Test + def testBatchProjectableSourceScanPlanTableApi(): Unit = { +val (tableSource, tableName) = csvTable +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registerTableSource(tableName, tableSource) + +val result = tEnv + .scan(tableName) + .select('last.upperCase(), 'id.floor(), 'score * 2) + +val expected = unaryNode( + "DataSetCalc", + batchSourceTableNode(tableName, projectedFields), + term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2") +) + +util.verifyTable(result, expected) + } + + @Test + def testBatchProjectableSourceScanPlanSQL(): Unit = { +val (tableSource, tableName) = csvTable +val util = batchTestUtil() + +util.tEnv.registerTableSource(tableName, tableSource) + +val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName" + +val expected = unaryNode( + "DataSetCalc", + batchSourceTableNode(tableName, projectedFields), + term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2") +) + +util.verifySql(sqlQuery, expected) + } + + @Test + def testBatchProjectableSourceScanNoIdentityCalc(): Unit = { +val (tableSource, tableName) = csvTable +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registerTableSource(tableName, tableSource) + +val result = tEnv + .scan(tableName) + .select('id, 'score, 'first) + +val expected = batchSourceTableNode(tableName, 
noCalcFields) +util.verifyTable(result, expected) + } + + @Test + def testBatchFilterableWithoutPushDown(): Unit = { +val (tableSource, tableName) = filterableTableSource +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registerTableSource(tableName, tableSource) + +val result = tEnv +.scan(tableName) +.select('price, 'id, 'amount) +.where("price * 2 < 32") + +val expected = unaryNode( + "DataSetCalc", + batchSourceTableNode( +tableName, +Array("name", "id", "amount", "price")), + term("select", "price", "id", "amount"), + term("where", "<(*(price, 2), 32)") +) + +util.verifyTable(result, expected) + } + + @Test + def testBatchFilterablePartialPushDown(): Unit = { +val (tableSource, tableName) = filterableTableSource +val util = batchTestUtil() +val tEnv = util.tEnv + +tEnv.registerTableSource(tableName, tableSource) + +val result = tEnv + .scan(tableName) + .where("amount > 2
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928399#comment-15928399 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106459926 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramTestBase.scala --- @@ -16,106 +16,96 @@ * limitations under the License. */ -package org.apache.flink.table.plan.rules.util +package org.apache.flink.table.plan.util import java.math.BigDecimal +import java.util import org.apache.calcite.adapter.java.JavaTypeFactory import org.apache.calcite.jdbc.JavaTypeFactoryImpl import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem} import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder} -import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR} +import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR, BOOLEAN} import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._ -import org.junit.Assert.{assertArrayEquals, assertTrue} -import org.junit.{Before, Test} import scala.collection.JavaConverters._ +import scala.collection.mutable -/** - * This class is responsible for testing RexProgramProjectExtractor. 
- */ -class RexProgramProjectExtractorTest { - private var typeFactory: JavaTypeFactory = _ - private var rexBuilder: RexBuilder = _ - private var allFieldTypes: Seq[RelDataType] = _ - private val allFieldNames = List("name", "id", "amount", "price") - - @Before - def setUp(): Unit = { -typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT) -rexBuilder = new RexBuilder(typeFactory) -allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_)) - } +abstract class RexProgramTestBase { - @Test - def testExtractRefInputFields(): Unit = { -val usedFields = extractRefInputFields(buildRexProgram()) -assertArrayEquals(usedFields, Array(2, 3, 1)) - } + val typeFactory: JavaTypeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT) + + val allFieldNames: util.List[String] = List("name", "id", "amount", "price", "flag").asJava + + val allFieldTypes: util.List[RelDataType] = +List(VARCHAR, BIGINT, INTEGER, DOUBLE, BOOLEAN).map(typeFactory.createSqlType).asJava + + var rexBuilder: RexBuilder = new RexBuilder(typeFactory) - @Test - def testRewriteRexProgram(): Unit = { -val originRexProgram = buildRexProgram() -assertTrue(extractExprStrList(originRexProgram).sameElements(Array( - "$0", - "$1", - "$2", - "$3", - "*($t2, $t3)", - "100", - "<($t4, $t5)", - "6", - ">($t1, $t7)", - "AND($t6, $t8)"))) -// use amount, id, price fields to create a new RexProgram -val usedFields = Array(2, 3, 1) -val types = usedFields.map(allFieldTypes(_)).toList.asJava -val names = usedFields.map(allFieldNames(_)).toList.asJava -val inputRowType = typeFactory.createStructType(types, names) -val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder) -assertTrue(extractExprStrList(newRexProgram).sameElements(Array( - "$0", - "$1", - "$2", - "*($t0, $t1)", - "100", - "<($t3, $t4)", - "6", - ">($t2, $t6)", - "AND($t5, $t7)"))) + /** +* extract all expression string list from input RexProgram expression lists +* +* @param 
rexProgram input RexProgram instance to analyze +* @return all expression string list of input RexProgram expression lists +*/ + protected def extractExprStrList(rexProgram: RexProgram): mutable.Buffer[String] = { +rexProgram.getExprList.asScala.map(_.toString) } - private def buildRexProgram(): RexProgram = { -val types = allFieldTypes.asJava -val names = allFieldNames.asJava -val inputRowType = typeFactory.createStructType(types, names) + // select amount, amount * price as total where amount * price < 100 and id > 6 + protected def buildSimpleRexProgram1(): RexProgram = { +val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames) val builder = new RexProgramBuilder(inputRowType, rexBuilder) -val t0 = rexBuilder.makeInputRef(types.get(2), 2) -val t1 = rexBuilder.makeInputRef(types.get(1), 1)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928191#comment-15928191 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3520

    OK, I thought about this. Let's stick to your mutable list approach. As you said, it is safe, i.e., in the worst case the table source does not remove expressions and we'll have a bit of overhead due to the unnecessary filters.
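The safety argument can be checked on a toy example: applying the same predicate a second time does not change the result, it only costs an extra pass. The sketch below uses plain Scala collections in place of a real table source; `Item`, `RedundantFilter`, and the sample rows are hypothetical.

```scala
// Hypothetical row type and data, for illustration only.
final case class Item(id: Long, amount: Int)

object RedundantFilter {
  val rows: List[Item] = List(Item(1L, 1), Item(2L, 3), Item(3L, 5))

  // The predicate that was offered to the source.
  val amountGt2: Item => Boolean = _.amount > 2

  // Case 1: the source applies the predicate and removes it from the list,
  // so no further filter remains in the plan.
  val removed: List[Item] = rows.filter(amountGt2)

  // Case 2: the source applies the predicate but leaves it in the list,
  // so the already-filtered rows are filtered a second time.
  val notRemoved: List[Item] = rows.filter(amountGt2).filter(amountGt2)
}
```

Both cases yield the same rows; the second only adds the redundant filter step mentioned in the comment.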
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15928012#comment-15928012 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3520

    Sure, a `FilterableTableSource` can implement its accepted predicates as it likes. However, I think we could expect that `getPredicates()` returns the accepted filter expressions as it got them. This makes it easier to check the correct behavior of the `FilterableTableSource`. Internally, the accepted expressions can be decomposed or otherwise changed.

    Similarly, we would need to check that the Expressions which remain in the mutable list are also present in the input list.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927919#comment-15927919 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on the issue:

    https://github.com/apache/flink/pull/3520

    Hi @fhueske, you just pointed out a question I had when tonycox was implementing the first version. Why are we preventing `FilterableTableSource` from modifying the expressions? I think it's entirely up to the source whether it changes the expressions it takes or keeps them just as they were. We have no reason to restrict the `FilterableTableSource`'s behavior as long as it does the right thing.

    Passing in a Java list and telling users who extend this interface to pick and remove the expressions they support is not super nice. But even if a user just picks expressions and does not remove them, we still get the correct answer.

    What do you think?
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927841#comment-15927841 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3520

    Thanks for the update @KurtYoung! I think you have a good point about not extending `TableSourceScan`. The filter pushdown flag might not be the only one. Let's keep this information in `FilterableTableScan`.

    I'm not so sure about the interface of `applyPredicate(predicates: JList[Expression])`. I think we should not ask users to remove the accepted Expressions from the `predicates` list. How about we add a method `getPredicates(): JList[Expression]` which returns the accepted expressions? This method could also replace `isFilterPushedDown` if we expect `null` if `getPredicates()` was not called and an empty list if it was called but no predicate was accepted.

    IMO, `getPredicates()` also has the advantage that we can check that the `FilterableTableSource` did not modify the accepted predicates.

    What do you think?
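One way to read the `getPredicates()` proposal is sketched below. It is an interpretation, not the interface that was eventually merged: `Pred` and `SketchSource` are hypothetical stand-ins, and the sketch assumes that "not called" refers to `applyPredicate` never having been invoked on the source. The source leaves the input list untouched and reports what it accepted through `getPredicates`.

```scala
import java.util.{ArrayList => JArrayList, Collections, List => JList}

// Hypothetical predicate: `onAmount` marks predicates this source can evaluate.
final case class Pred(text: String, onAmount: Boolean)

// Hypothetical source; `accepted` is null until applyPredicate has run.
final class SketchSource private (accepted: JList[Pred]) {
  def this() = this(null)

  /** Returns a new source holding the accepted predicates; the input list is not modified. */
  def applyPredicate(predicates: JList[Pred]): SketchSource = {
    val acc = new JArrayList[Pred]()
    val it = predicates.iterator()
    while (it.hasNext) {
      val p = it.next()
      if (p.onAmount) acc.add(p)
    }
    new SketchSource(Collections.unmodifiableList(acc))
  }

  /** null: push-down never attempted; empty: attempted, nothing accepted. */
  def getPredicates: JList[Pred] = accepted
}
```

With this shape the planner can compare the returned list against its input to verify that accepted predicates were passed back unmodified, at the cost of a three-state return value (null, empty, non-empty).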
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927672#comment-15927672 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106362409

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---
    @@ -42,63 +41,40 @@ class DataSetCalc(
         traitSet: RelTraitSet,
         input: RelNode,
         rowRelDataType: RelDataType,
    -    private[flink] val calcProgram: RexProgram, // for tests
    +    calcProgram: RexProgram,
         ruleDescription: String)
    -  extends SingleRel(cluster, traitSet, input)
    +  extends Calc(cluster, traitSet, input, calcProgram)
    --- End diff --

    OK, sounds good to me. I was just curious, because I remembered we had some issues when `DataSetCalc` extended `Calc`. If everything works fine, I'm OK with this change.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927520#comment-15927520 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on the issue:

    https://github.com/apache/flink/pull/3520

    Hi @fhueske @godfreyhe, thanks for the review. I addressed almost all of your comments.

    @fhueske The exception is letting `TableSourceScan` be aware of whether a filter has been pushed down. I'm not sure `TableSourceScan` should hold this kind of information; I'd prefer to keep it within the various `TableSource` implementations. One drawback of letting `TableSourceScan` hold such information is that when we copy a `TableSourceScan`, we need to take care of all of it and make sure it is copied correctly as well. In the future, if we add more extensions to `TableSource`, for example pushing part of the query down, we will face this problem again.

    Regarding the interface of `FilterableTableSource`, I agree with you that a trait containing logic is not friendly to Java extensions. So I removed the default implementation of `isFilterPushedDown`; the inheriting class should take care of this method. Regarding the `Tuple2` issue, how about we pass in a mutable Java list and let the table source *pick out* expressions from it and return a copy of the table source which contains these pushed-down predicates?
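The mutable-list contract proposed here can be sketched as follows. This is a simplified illustration under stated assumptions, not Flink's `FilterableTableSource`: `Cmp` and `AmountSource` are hypothetical, and the source is assumed to understand only comparisons on the field "amount". Accepted predicates are removed from the caller's list through the iterator, and a copy of the source carrying them is returned.

```scala
import java.util.{ArrayList => JArrayList, List => JList}

// Hypothetical, simplified predicate of the form `field op value`.
final case class Cmp(field: String, op: String, value: Int)

// Hypothetical immutable source; `pushed` holds the predicates it will evaluate itself.
final case class AmountSource(pushed: List[Cmp] = Nil, filterPushedDown: Boolean = false) {

  /** Picks supported predicates out of `predicates` (mutating it) and returns a copy of this source. */
  def applyPredicate(predicates: JList[Cmp]): AmountSource = {
    var accepted = List.empty[Cmp]
    val it = predicates.iterator()
    while (it.hasNext) {
      val p = it.next()
      if (p.field == "amount") {
        accepted = p :: accepted
        it.remove() // whatever stays in the list must still be evaluated by the planner
      }
    }
    // The flag is set even if nothing was accepted, marking that push-down was attempted.
    copy(pushed = pushed ++ accepted.reverse, filterPushedDown = true)
  }
}
```

After the call, the caller's list contains exactly the predicates that still need a filter operator, so no `Tuple2` return value is required.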
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927457#comment-15927457 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106335718

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.table.expressions.Expression
    +
    +/**
    + * Adds support for filtering push-down to a [[TableSource]].
    + * A [[TableSource]] extending this interface is able to filter records before returning.
    + */
    +trait FilterableTableSource[T] extends TableSource[T] {
    --- End diff --

    Changed.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927456#comment-15927456 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106335709 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. +* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. 
+* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef: RexInputRef):
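The CNF step described in the comments of `extractConjunctiveConditions` above can be shown on a toy condition tree. This is a minimal sketch under stated assumptions: `Cond`, `Atom`, `And`, `Or`, `toCnf` and `conjunctions` are hypothetical names defined here, they do not call Calcite's `RexUtil.toCnf` or `RelOptUtil.conjunctions`, and negation is left out for brevity.

```scala
object CnfSketch {
  // Hypothetical condition tree (not Calcite's RexNode).
  sealed trait Cond
  case class Atom(name: String) extends Cond
  case class And(left: Cond, right: Cond) extends Cond
  case class Or(left: Cond, right: Cond) extends Cond

  // Converts a condition to conjunctive normal form by distributing OR over AND.
  def toCnf(c: Cond): Cond = c match {
    case And(l, r) => And(toCnf(l), toCnf(r))
    case Or(l, r)  => distribute(toCnf(l), toCnf(r))
    case atom      => atom
  }

  // Builds the CNF of "l OR r" where l and r are already in CNF.
  private def distribute(l: Cond, r: Cond): Cond = (l, r) match {
    case (And(a, b), _) => And(distribute(a, r), distribute(b, r))
    case (_, And(a, b)) => And(distribute(l, a), distribute(l, b))
    case _              => Or(l, r)
  }

  // Splits a CNF condition at its top-level ANDs into independent conjuncts.
  def conjunctions(c: Cond): List[Cond] = c match {
    case And(l, r) => conjunctions(l) ++ conjunctions(r)
    case other     => List(other)
  }
}
```

With this sketch, `(a AND b) OR c` becomes `(a OR c) AND (b OR c)`, and `conjunctions` then yields the two OR terms as separate candidates for push-down. Each conjunct can be offered to the source on its own because a row must satisfy all of them.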
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927379#comment-15927379 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329219 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc} +import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule( + operand(classOf[DataSetCalc], +operand(classOf[BatchTableSourceScan], none)), + "PushFilterIntoBatchTableSourceScanRule") + with PushFilterIntoTableSourceScanRuleBase { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc] +val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource[_] => +calc.getProgram.getCondition != null --- End diff -- Will add > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927380#comment-15927380 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329225 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan} +import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule( + operand(classOf[DataStreamCalc], +operand(classOf[StreamTableSourceScan], none)), + "PushFilterIntoStreamTableSourceScanRule") + with PushFilterIntoTableSourceScanRuleBase { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource[_] => +calc.getProgram.getCondition != null --- End diff -- Will add > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927377#comment-15927377 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106329079 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.core.Calc +import org.apache.calcite.rex.RexProgram +import org.apache.flink.table.plan.nodes.TableSourceScan +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.util.RexProgramExtractor +import org.apache.flink.table.sources.FilterableTableSource + +trait PushFilterIntoTableSourceScanRuleBase { + + private[flink] def pushFilterIntoScan( + call: RelOptRuleCall, + calc: Calc, + scan: TableSourceScan, + tableSourceTable: TableSourceTable[_], + filterableSource: FilterableTableSource[_], + description: String): Unit = { + +if (filterableSource.isFilterPushedDown) { --- End diff -- Will change this. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927371#comment-15927371 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106328751 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala --- @@ -25,6 +25,7 @@ import org.apache.flink.table.sources.TableSource /** Table which defines an external table via a [[TableSource]] */ class TableSourceTable[T]( val tableSource: TableSource[T], +val tableEnv: TableEnvironment, --- End diff -- Yes, you are right. Since UDFs are currently registered as objects rather than classes, it's really impossible for a TableSource to support this. I will remove this field and only use built-in functions when extracting expressions from the RexProgram. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927353#comment-15927353 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106327707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.core.Calc +import org.apache.calcite.rex.RexProgram +import org.apache.flink.table.plan.nodes.TableSourceScan +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.util.RexProgramExtractor +import org.apache.flink.table.sources.FilterableTableSource + +trait PushFilterIntoTableSourceScanRuleBase { + + private[flink] def pushFilterIntoScan( + call: RelOptRuleCall, + calc: Calc, + scan: TableSourceScan, + tableSourceTable: TableSourceTable[_], + filterableSource: FilterableTableSource[_], + description: String): Unit = { + +if (filterableSource.isFilterPushedDown) { + // The rule can get triggered again due to the transformed "scan => filter" + // sequence created by the earlier execution of this rule when we could not + // push all the conditions into the scan + return +} + +val program = calc.getProgram +val (predicates, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +call.builder().getRexBuilder, +tableSourceTable.tableEnv.getFunctionCatalog) +if (predicates.isEmpty) { + // no condition can be translated to expression + return +} + +val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates) +// trying to apply filter push down, set the flag to true no matter whether +// we actually push any filters down. +newTableSource.setFilterPushedDown(true) + +// check whether framework still need to do a filter +val relBuilder = call.builder() +val remainingCondition = { + if (remainingPredicates.nonEmpty || unconvertedRexNodes.nonEmpty) { +relBuilder.push(scan) +(remainingPredicates.map(expr => expr.toRexNode(relBuilder)) ++ unconvertedRexNodes) +.reduce((l, r) => relBuilder.and(l, r)) + } else { +null + } +} + +// check whether we still need a RexProgram. An RexProgram is needed when either +// projection or filter exists. 
+val newScan = scan.copy(scan.getTraitSet, newTableSource) +val newRexProgram = { + if (remainingCondition != null || program.getProjectList.size() > 0) { --- End diff -- Thanks for the tips, will change this. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the
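The "remaining condition" logic in `pushFilterIntoScan` above (AND together whatever the source rejected and whatever could not be converted, or keep no filter at all) can be sketched without Calcite. This is an illustrative sketch only: `Cond`, `Atom`, `And` and `remainingCondition` are hypothetical names, and it returns an `Option` where the quoted diff uses `null`.

```scala
object RemainingConditionSketch {
  // Hypothetical condition tree (not Calcite's RexNode).
  sealed trait Cond
  case class Atom(name: String) extends Cond
  case class And(left: Cond, right: Cond) extends Cond

  // Combines predicates the source rejected with conditions that could not be
  // converted at all. None means no filter is left for the framework to apply.
  def remainingCondition(rejected: Seq[Cond], unconverted: Seq[Cond]): Option[Cond] =
    (rejected ++ unconverted).reduceOption((l, r) => And(l, r))
}
```

Using `reduceOption` makes the empty case explicit, so a caller deciding whether a Calc node is still needed can match on `Some`/`None` rather than compare against `null`.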
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927319#comment-15927319 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106324883 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala --- @@ -58,16 +60,24 @@ class BatchTableSourceScan( ) } + override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = { +new BatchTableSourceScan( + cluster, + traitSet, + getTable, + newTableSource.asInstanceOf[BatchTableSource[_]] +) + } + override def explainTerms(pw: RelWriter): RelWriter = { -super.explainTerms(pw) +val terms = super.explainTerms(pw) .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) +tableSource.explainTerms(terms) --- End diff -- will change this. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927318#comment-15927318 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106324868 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala --- @@ -39,4 +39,6 @@ trait TableSource[T] { /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */ def getReturnType: TypeInformation[T] + /** Describes the table source */ + def explainTerms(pw: RelWriter): RelWriter = pw --- End diff -- Makes sense to me, will change this. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927290#comment-15927290 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106322568

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---
    @@ -42,63 +41,40 @@ class DataSetCalc(
         traitSet: RelTraitSet,
         input: RelNode,
         rowRelDataType: RelDataType,
    -    private[flink] val calcProgram: RexProgram, // for tests
    +    calcProgram: RexProgram,
         ruleDescription: String)
    -  extends SingleRel(cluster, traitSet, input)
    +  extends Calc(cluster, traitSet, input, calcProgram)
    --- End diff --

    This is because I want to unify the PushFilterIntoScan rule's code for both batch and stream mode. While the rule executes, we may need to create a new copy of the DataSetCalc or DataStreamCalc. It is easier to let these two classes inherit from `Calc` and use `Calc.copy` to create the copied instance.

    I did encounter some problems after I changed the hierarchy: some unit tests failed because the plan changed. That happened because we did not calculate the cost for Calc correctly. I added some logic to `CommonCalc.computeSelfCost`, and everything works fine.
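The point about `Calc.copy` can be illustrated with a minimal, self-contained sketch. The classes below are simplified stand-ins that only borrow the names from the discussion; they are not the Calcite or Flink classes, and the `dropCondition` helper is hypothetical. The sketch shows why a shared base type with a `copy` method lets one piece of rule code serve both the batch and the stream node.

```scala
object SharedRuleSketch {
  // Stand-in for a common base class such as Calcite's Calc (assumption: heavily simplified).
  abstract class Calc(val condition: Option[String]) {
    // Each concrete node knows how to re-create itself with a new condition.
    def copy(newCondition: Option[String]): Calc
  }

  // Stand-ins for the batch and stream nodes named in the comment.
  final class DataSetCalc(c: Option[String]) extends Calc(c) {
    override def copy(newCondition: Option[String]): Calc = new DataSetCalc(newCondition)
  }

  final class DataStreamCalc(c: Option[String]) extends Calc(c) {
    override def copy(newCondition: Option[String]): Calc = new DataStreamCalc(newCondition)
  }

  // Hypothetical shared rule logic: it only sees Calc, so it works for both modes
  // and still returns a node of the original concrete type.
  def dropCondition(calc: Calc): Calc = calc.copy(None)
}
```

Because the shared code calls `copy` on the base type, it never needs to know whether it is rewriting a batch or a stream plan.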
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927277#comment-15927277 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106321885

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala ---
    @@ -24,7 +24,7 @@ package org.apache.flink.table.sources
       *
       * @tparam T The return type of the [[ProjectableTableSource]].
       */
    -trait ProjectableTableSource[T] {
    +trait ProjectableTableSource[T] extends TableSource[T] {
    --- End diff --

    It's because I want to unify the PushProjectIntoScan rule code for both batch and stream mode. Once we push a projection down into a table source, we should create not only a new TableScan instance but also a new TableSource instance. The code looks like:
    ```
    val newTableSource = originTableSource.projectFields(usedFields)
    // create a new scan with the new TableSource instance
    val newScan = scan.copy(scan.getTraitSet, newTableSource)
    ```
    At first the `projectFields` method returned `ProjectableTableSource`, which is not a `TableSource`, so I let `ProjectableTableSource` inherit from `TableSource`. But I just noticed that we can simply let `projectFields` return a `TableSource`, which resolves the problem. Will change this.
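The resolution described above (let `projectFields` return a `TableSource` instead of making `ProjectableTableSource` extend it) can be sketched as follows. All types here are simplified stand-ins written for this illustration, and `ExampleSource`, `Scan`, and `pushProjection` are hypothetical names, not Flink classes.

```scala
object ProjectionPushDownSketch {
  // Stand-in for TableSource (assumption: reduced to its field names).
  trait TableSource {
    def fieldNames: Array[String]
  }

  // The projection trait does not extend TableSource; its method returns one instead.
  trait ProjectableTableSource {
    def projectFields(fields: Array[Int]): TableSource
  }

  // Hypothetical source that keeps only the requested columns.
  final class ExampleSource(val fieldNames: Array[String])
      extends TableSource with ProjectableTableSource {
    override def projectFields(fields: Array[Int]): TableSource =
      new ExampleSource(fields.map(i => fieldNames(i)))
  }

  // Stand-in for a scan node that is copied with a new source.
  final class Scan(val source: TableSource) {
    def copy(newSource: TableSource): Scan = new Scan(newSource)
  }

  // Shared rule logic: create a new source and a new scan, leave the originals untouched.
  def pushProjection(scan: Scan, usedFields: Array[Int]): Scan = scan.source match {
    case p: ProjectableTableSource => scan.copy(p.projectFields(usedFields))
    case _                         => scan
  }
}
```

Since `projectFields` already yields a `TableSource`, the result can be handed straight to `scan.copy` without a cast and without changing the trait hierarchy.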
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926147#comment-15926147 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106140096

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala ---
    @@ -24,7 +24,7 @@ package org.apache.flink.table.sources
       *
       * @tparam T The return type of the [[ProjectableTableSource]].
       */
    -trait ProjectableTableSource[T] {
    +trait ProjectableTableSource[T] extends TableSource[T] {
    --- End diff --

    Why is this change necessary? A `ProjectableTableSource` always needs to implement either `BatchTableSource` or `StreamTableSource` (or both), which already implement `TableSource`.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926158#comment-15926158 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106135951

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.table.expressions.Expression
    +
    +/**
    + * Adds support for filtering push-down to a [[TableSource]].
    + * A [[TableSource]] extending this interface is able to filter records before returning.
    + */
    +trait FilterableTableSource[T] extends TableSource[T] {
    +
    +  /**
    +    * Indicates whether the filter push down has been applied. Note that even if we don't
    +    * actually push down any filters, we should also set this flag to true after the trying.
    +    */
    +  private var filterPushedDown: Boolean = false
    +
    +  /**
    +    * Check and pick all predicates this table source can support. The passed in predicates
    +    * have been translated in conjunctive form, and table source can only pick those predicates
    +    * that it supports.
    +    *
    +    * After trying to push predicates down, we should return a new [[FilterableTableSource]]
    +    * instance which holds all pushed down predicates. Even if we actually pushed nothing down,
    +    * it is recommended that we still return a new [[FilterableTableSource]] instance since we will
    +    * mark the returned instance as filter push down has been tried. Also we need to return all
    +    * unsupported predicates back to the framework to do further filtering.
    +    *
    +    * We also should note to not changing the form of the predicates passed in. It has been
    +    * organized in CNF conjunctive form, and we should only take or leave each element in the
    +    * array. Don't try to reorganize the predicates if you are absolutely confident with that.
    +    *
    +    * @param predicate An array contains conjunctive predicates.
    +    * @return A new cloned instance of [[FilterableTableSource]] as well as n array of Expression
    +    *         which contains all unsupported predicates.
    +    */
    +  def applyPredicate(predicate: Array[Expression]): (FilterableTableSource[_], Array[Expression])
    --- End diff --

    Maybe we should design this as:
    ```
    // returns a copy of the table source with pushed down predicates
    applyPredicate(predicate: Array[Expression]): FilterableTableSource[_]

    // returns pushed down predicates
    getPredicates(): Array[Expression]
    ```
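The two-method design proposed in this comment can be tried out with a small, self-contained sketch. `Expression` is a string-based stand-in rather than Flink's expression tree, and `AmountOnlySource` and `remaining` are hypothetical names introduced only for this example. The sketch assumes the framework derives the unsupported predicates itself by comparing what it passed in with what `getPredicates` reports.

```scala
object FilterPushDownSketch {
  // Stand-in for Flink's Expression (assumption: a real predicate is a tree, not a string).
  final case class Expression(text: String)

  trait FilterableTableSource {
    /** Returns a copy of the table source with the supported predicates pushed down. */
    def applyPredicate(predicates: Array[Expression]): FilterableTableSource

    /** Returns the predicates that were pushed down into this source. */
    def getPredicates: Array[Expression]
  }

  // Hypothetical source that can only evaluate predicates on the "amount" column.
  final class AmountOnlySource(pushed: Array[Expression] = Array.empty)
      extends FilterableTableSource {
    override def applyPredicate(predicates: Array[Expression]): FilterableTableSource =
      new AmountOnlySource(predicates.filter(_.text.startsWith("amount")))

    override def getPredicates: Array[Expression] = pushed
  }

  /** Predicates the framework still has to evaluate after the push-down. */
  def remaining(all: Array[Expression], source: FilterableTableSource): Array[Expression] = {
    val pushed = source.getPredicates.toSet
    all.filterNot(e => pushed.contains(e))
  }
}
```

Compared with returning a tuple, this keeps each method's return type simple, at the cost of requiring the caller to compute the leftover predicates.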
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926146#comment-15926146 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106031866

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala ---
    @@ -58,16 +60,24 @@ class BatchTableSourceScan(
         )
       }

    +  override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = {
    +    new BatchTableSourceScan(
    +      cluster,
    +      traitSet,
    +      getTable,
    +      newTableSource.asInstanceOf[BatchTableSource[_]]
    +    )
    +  }
    +
       override def explainTerms(pw: RelWriter): RelWriter = {
    -    super.explainTerms(pw)
    +    val terms = super.explainTerms(pw)
           .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
    +    tableSource.explainTerms(terms)
    --- End diff --

    Insert string provided by `TableSource.toString()` (or a dedicated method that returns an explain string) instead of calling `explainTerms()`?
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926153#comment-15926153 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106137700

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
    +import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataSetCalc],
    +    operand(classOf[BatchTableSourceScan], none)),
    +  "PushFilterIntoBatchTableSourceScanRule")
    +  with PushFilterIntoTableSourceScanRuleBase {
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource[_] =>
    +        calc.getProgram.getCondition != null
    --- End diff --

    Check also if we already tried to push a filter down.
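The extra guard requested in this comment can be sketched in isolation. The `Source` and `Calc` types below are hypothetical stand-ins, not the Flink or Calcite classes; the only assumption taken from the thread is that the source (or scan) exposes a flag saying that a push-down was already attempted.

```scala
object MatchGuardSketch {
  // Stand-in for a table source; `filterPushedDown` mirrors the flag discussed in the review.
  final case class Source(filterable: Boolean, filterPushedDown: Boolean)

  // Stand-in for a Calc node; `condition` is None when the program has no filter.
  final case class Calc(condition: Option[String])

  // The rule should fire only if the source supports filters, a condition exists,
  // and no push-down has been attempted on this source yet.
  def matches(calc: Calc, source: Source): Boolean =
    source.filterable && !source.filterPushedDown && calc.condition.isDefined
}
```

Rejecting the match early keeps the optimizer from re-triggering the rule on the plan that the rule itself produced.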
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926149#comment-15926149 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106159736 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. +* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. 
+* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef: RexInputRef):
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926157#comment-15926157 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106135176

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.table.expressions.Expression
    +
    +/**
    + * Adds support for filtering push-down to a [[TableSource]].
    + * A [[TableSource]] extending this interface is able to filter records before returning.
    + */
    +trait FilterableTableSource[T] extends TableSource[T] {
    +
    +  /**
    +    * Indicates whether the filter push down has been applied. Note that even if we don't
    +    * actually push down any filters, we should also set this flag to true after the trying.
    +    */
    +  private var filterPushedDown: Boolean = false
    +
    +  /**
    +    * Check and pick all predicates this table source can support. The passed in predicates
    +    * have been translated in conjunctive form, and table source can only pick those predicates
    +    * that it supports.
    +    *
    +    * After trying to push predicates down, we should return a new [[FilterableTableSource]]
    +    * instance which holds all pushed down predicates. Even if we actually pushed nothing down,
    +    * it is recommended that we still return a new [[FilterableTableSource]] instance since we will
    +    * mark the returned instance as filter push down has been tried. Also we need to return all
    +    * unsupported predicates back to the framework to do further filtering.
    +    *
    +    * We also should note to not changing the form of the predicates passed in. It has been
    +    * organized in CNF conjunctive form, and we should only take or leave each element in the
    +    * array. Don't try to reorganize the predicates if you are absolutely confident with that.
    +    *
    +    * @param predicate An array contains conjunctive predicates.
    +    * @return A new cloned instance of [[FilterableTableSource]] as well as n array of Expression
    +    *         which contains all unsupported predicates.
    +    */
    +  def applyPredicate(predicate: Array[Expression]): (FilterableTableSource[_], Array[Expression])
    --- End diff --

    We should not use Scala classes (`Tuple2`) here to make the interface better compatible with Java.
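One way to avoid `Tuple2` in the signature, sketched here under assumptions that the thread does not confirm, is to pass a mutable `java.util.List` and let the source remove the predicates it accepts, so that whatever stays in the list is left for the framework. `Expression` is again a string-based stand-in, and `PrefixSource` is a hypothetical implementation written for this example.

```scala
import java.util.{List => JList}

object JavaFriendlySketch {
  // Stand-in for Flink's Expression (assumption: simplified to a string).
  final case class Expression(text: String)

  trait FilterableTableSource {
    // Removes every accepted predicate from `predicates` (mutated in place)
    // and returns a new source instance that holds the accepted ones.
    def applyPredicate(predicates: JList[Expression]): FilterableTableSource

    // True once a push-down has been attempted, even if nothing was accepted.
    def isFilterPushedDown: Boolean
  }

  // Hypothetical source that accepts predicates starting with a given column name.
  final class PrefixSource(
      prefix: String,
      accepted: List[Expression] = Nil,
      tried: Boolean = false) extends FilterableTableSource {

    override def applyPredicate(predicates: JList[Expression]): FilterableTableSource = {
      val taken = scala.collection.mutable.ListBuffer.empty[Expression]
      val it = predicates.iterator()
      while (it.hasNext) {
        val e = it.next()
        if (e.text.startsWith(prefix)) {
          taken += e
          it.remove() // the framework keeps only what is left in the list
        }
      }
      new PrefixSource(prefix, taken.toList, tried = true)
    }

    override def isFilterPushedDown: Boolean = tried

    def pushed: List[Expression] = accepted
  }
}
```

Both the parameter and the return type are plain Java-visible types, so a Java implementation needs no Scala tuple; the trade-off is that the method mutates its argument.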
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926150#comment-15926150 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106123397 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. +* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. 
+* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef: RexInputRef):
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926155#comment-15926155 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106159389 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. +* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. 
+* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef: RexInputRef):
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926159#comment-15926159 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106136200

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.common
    +
    +import org.apache.calcite.plan.RelOptRuleCall
    +import org.apache.calcite.rel.core.Calc
    +import org.apache.calcite.rex.RexProgram
    +import org.apache.flink.table.plan.nodes.TableSourceScan
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.util.RexProgramExtractor
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +trait PushFilterIntoTableSourceScanRuleBase {
    +
    +  private[flink] def pushFilterIntoScan(
    +      call: RelOptRuleCall,
    +      calc: Calc,
    +      scan: TableSourceScan,
    +      tableSourceTable: TableSourceTable[_],
    +      filterableSource: FilterableTableSource[_],
    +      description: String): Unit = {
    +
    +    if (filterableSource.isFilterPushedDown) {
    +      // The rule can get triggered again due to the transformed "scan => filter"
    +      // sequence created by the earlier execution of this rule when we could not
    +      // push all the conditions into the scan
    +      return
    +    }
    +
    +    val program = calc.getProgram
    +    val (predicates, unconvertedRexNodes) =
    +      RexProgramExtractor.extractConjunctiveConditions(
    +        program,
    +        call.builder().getRexBuilder,
    +        tableSourceTable.tableEnv.getFunctionCatalog)
    +    if (predicates.isEmpty) {
    +      // no condition can be translated to expression
    +      return
    +    }
    +
    +    val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates)
    +    // trying to apply filter push down, set the flag to true no matter whether
    +    // we actually push any filters down.
    +    newTableSource.setFilterPushedDown(true)
    --- End diff --

    I think we can also set this flag on `TableSourceScan`.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926151#comment-15926151 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106034668

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.common
    +
    +import org.apache.calcite.plan.RelOptRuleCall
    +import org.apache.calcite.rel.core.Calc
    +import org.apache.calcite.rex.RexProgram
    +import org.apache.flink.table.plan.nodes.TableSourceScan
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.util.RexProgramExtractor
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +trait PushFilterIntoTableSourceScanRuleBase {
    +
    +  private[flink] def pushFilterIntoScan(
    +      call: RelOptRuleCall,
    +      calc: Calc,
    +      scan: TableSourceScan,
    +      tableSourceTable: TableSourceTable[_],
    +      filterableSource: FilterableTableSource[_],
    +      description: String): Unit = {
    +
    +    if (filterableSource.isFilterPushedDown) {
    --- End diff --

    I think we should check on the `TableSourceScan` whether we tried to push a filter down.
    This check should be done in the `matches()` method and not in `onMatch()`.
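A minimal sketch of the suggestion above, assuming simplified stand-ins for the planner classes (`Scan`, `Calc` and `PushFilterRule` are hypothetical names, not Flink or Calcite types). The scan node records that a push-down was attempted, and the rule consults that record in `matches()`, so `onMatch()` is not invoked a second time for the "scan => filter" result the rule itself produced.

```scala
// Hypothetical, simplified stand-ins for the planner nodes; not Flink/Calcite API.
object MatchesSketch {
  final case class Scan(source: String, filterPushDownTried: Boolean = false)
  final case class Calc(condition: Option[String], input: Scan)

  object PushFilterRule {
    // Cheap precondition: fire only if there is a condition and no
    // push-down has been attempted on this scan yet.
    def matches(calc: Calc): Boolean =
      calc.condition.isDefined && !calc.input.filterPushDownTried

    // The transformation marks the new scan as "tried", whether or not
    // any predicate was actually accepted by the source.
    def onMatch(calc: Calc): Calc =
      calc.copy(input = calc.input.copy(filterPushDownTried = true))
  }
}
```

With the check in `matches()`, the planner can discard the rule call before any work is done, instead of entering `onMatch()` and returning early.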
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926144#comment-15926144 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106137246

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.common
    +
    +import org.apache.calcite.plan.RelOptRuleCall
    +import org.apache.calcite.rel.core.Calc
    +import org.apache.calcite.rex.RexProgram
    +import org.apache.flink.table.plan.nodes.TableSourceScan
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.util.RexProgramExtractor
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +trait PushFilterIntoTableSourceScanRuleBase {
    +
    +  private[flink] def pushFilterIntoScan(
    +      call: RelOptRuleCall,
    +      calc: Calc,
    +      scan: TableSourceScan,
    +      tableSourceTable: TableSourceTable[_],
    +      filterableSource: FilterableTableSource[_],
    +      description: String): Unit = {
    +
    +    if (filterableSource.isFilterPushedDown) {
    +      // The rule can get triggered again due to the transformed "scan => filter"
    +      // sequence created by the earlier execution of this rule when we could not
    +      // push all the conditions into the scan
    +      return
    +    }
    +
    +    val program = calc.getProgram
    +    val (predicates, unconvertedRexNodes) =
    +      RexProgramExtractor.extractConjunctiveConditions(
    +        program,
    +        call.builder().getRexBuilder,
    +        tableSourceTable.tableEnv.getFunctionCatalog)
    +    if (predicates.isEmpty) {
    +      // no condition can be translated to expression
    +      return
    +    }
    +
    +    val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates)
    +    // trying to apply filter push down, set the flag to true no matter whether
    +    // we actually push any filters down.
    +    newTableSource.setFilterPushedDown(true)
    +
    +    // check whether framework still need to do a filter
    +    val relBuilder = call.builder()
    +    val remainingCondition = {
    +      if (remainingPredicates.nonEmpty || unconvertedRexNodes.nonEmpty) {
    +        relBuilder.push(scan)
    +        (remainingPredicates.map(expr => expr.toRexNode(relBuilder)) ++ unconvertedRexNodes)
    +            .reduce((l, r) => relBuilder.and(l, r))
    +      } else {
    +        null
    +      }
    +    }
    +
    +    // check whether we still need a RexProgram. An RexProgram is needed when either
    +    // projection or filter exists.
    +    val newScan = scan.copy(scan.getTraitSet, newTableSource)
    +    val newRexProgram = {
    +      if (remainingCondition != null || program.getProjectList.size() > 0) {
    --- End diff --

    `program.getProjectList.size() > 0` should be `program.projectsOnlyIdentity()`.
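A small sketch of why the size check is too weak, under stated assumptions: a projection is modelled here as the list of input field indices it emits, and `projectsOnlyIdentity` / `needsProgram` are hypothetical helpers that mirror the idea of Calcite's `RexProgram.projectsOnlyIdentity()` rather than calling it. A program that forwards every input field unchanged still has a non-empty project list, so `size() > 0` keeps the program even when it does nothing. The sketch also assumes the intended condition is the negation of the identity check, since a program is only needed when the projection does real work.

```scala
// Hypothetical model: a projection is the list of input field indices it emits.
object IdentitySketch {
  // True if the projection forwards every input field unchanged and in order.
  def projectsOnlyIdentity(projection: Seq[Int], inputFieldCount: Int): Boolean =
    projection.sameElements(0 until inputFieldCount)

  // A program is still needed if a filter remains or the projection
  // reorders, drops, or computes fields.
  def needsProgram(
      remainingCondition: Option[String],
      projection: Seq[Int],
      inputFieldCount: Int): Boolean =
    remainingCondition.isDefined || !projectsOnlyIdentity(projection, inputFieldCount)
}
```

For a three-field input, the projection `Seq(0, 1, 2)` with no remaining condition needs no program, whereas `Seq(0, 2)` does.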
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926152#comment-15926152 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106137875

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
    +import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataStreamCalc],
    +    operand(classOf[StreamTableSourceScan], none)),
    +  "PushFilterIntoStreamTableSourceScanRule")
    +  with PushFilterIntoTableSourceScanRuleBase {
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
    +    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource[_] =>
    +        calc.getProgram.getCondition != null
    --- End diff --

    Check if we already tried to push a filter down.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926156#comment-15926156 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106134112

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.table.expressions.Expression
    +
    +/**
    +  * Adds support for filtering push-down to a [[TableSource]].
    +  * A [[TableSource]] extending this interface is able to filter records before returning.
    +  */
    +trait FilterableTableSource[T] extends TableSource[T] {
    +
    +  /**
    +    * Indicates whether the filter push down has been applied. Note that even if we don't
    +    * actually push down any filters, we should also set this flag to true after the trying.
    +    */
    +  private var filterPushedDown: Boolean = false
    +
    +  /**
    +    * Check and pick all predicates this table source can support. The passed in predicates
    +    * have been translated in conjunctive form, and table source can only pick those predicates
    +    * that it supports.
    +    *
    +    * After trying to push predicates down, we should return a new [[FilterableTableSource]]
    +    * instance which holds all pushed down predicates. Even if we actually pushed nothing down,
    +    * it is recommended that we still return a new [[FilterableTableSource]] instance since we will
    +    * mark the returned instance as filter push down has been tried. Also we need to return all
    +    * unsupported predicates back to the framework to do further filtering.
    +    *
    +    * We also should note to not changing the form of the predicates passed in. It has been
    +    * organized in CNF conjunctive form, and we should only take or leave each element in the
    +    * array. Don't try to reorganize the predicates if you are absolutely confident with that.
    +    *
    +    * @param predicate An array contains conjunctive predicates.
    +    * @return A new cloned instance of [[FilterableTableSource]] as well as n array of Expression
    +    *         which contains all unsupported predicates.
    +    */
    +  def applyPredicate(predicate: Array[Expression]): (FilterableTableSource[_], Array[Expression])
    +
    +  /**
    +    * Return the flag to indicate whether filter push down has been tried.
    +    */
    +  def isFilterPushedDown: Boolean = filterPushedDown
    --- End diff --

    I'm not sure about the `isFilterPushedDown` and `setFilterPushedDown` methods. They seem to
    unnecessarily blow up the interface (at least for Java classes that implement the trait).
    Can't we track in `TableSourceScan` whether we tried to push a filter down?
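As an illustration of the contract described in the Scaladoc above (take or leave each conjunct as a whole, return a new source plus the unsupported predicates), here is a minimal sketch with the push-down flag removed from the trait. All types are hypothetical stand-ins: `Expr` replaces Flink's `Expression`, and `Filterable`, `NumericSource` and `Scan` are not Flink classes.

```scala
object FilterableSketch {
  // Hypothetical predicate model standing in for Flink's Expression.
  sealed trait Expr
  final case class GreaterThan(field: String, value: Int) extends Expr
  final case class Like(field: String, pattern: String) extends Expr

  // The trait carries no push-down flag; that bookkeeping is left to the scan node.
  trait Filterable {
    def applyPredicate(predicates: Array[Expr]): (Filterable, Array[Expr])
  }

  // A source that can evaluate only GreaterThan predicates.
  final case class NumericSource(pushed: Vector[Expr] = Vector.empty) extends Filterable {
    override def applyPredicate(predicates: Array[Expr]): (Filterable, Array[Expr]) = {
      // Take or leave each conjunct as a whole; never rewrite it.
      val (supported, unsupported) = predicates.partition(_.isInstanceOf[GreaterThan])
      (copy(pushed = pushed ++ supported), unsupported)
    }
  }

  // The (hypothetical) scan node remembers that a push-down was attempted.
  final case class Scan(source: Filterable, filterPushDownTried: Boolean = false)

  def pushDown(scan: Scan, predicates: Array[Expr]): (Scan, Array[Expr]) = {
    val (newSource, remaining) = scan.source.applyPredicate(predicates)
    (Scan(newSource, filterPushDownTried = true), remaining)
  }
}
```

An implementer then has a single method to write, and the framework keeps filtering on whatever `applyPredicate` hands back.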
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926154#comment-15926154 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106140654

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import org.apache.flink.table.expressions.Expression
    +
    +/**
    +  * Adds support for filtering push-down to a [[TableSource]].
    +  * A [[TableSource]] extending this interface is able to filter records before returning.
    +  */
    +trait FilterableTableSource[T] extends TableSource[T] {
    --- End diff --

    Why do we need to extend `TableSource`? (see also comment on `ProjectableTableSource`)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926148#comment-15926148 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106032363

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---
    @@ -42,63 +41,40 @@ class DataSetCalc(
         traitSet: RelTraitSet,
         input: RelNode,
         rowRelDataType: RelDataType,
    -    private[flink] val calcProgram: RexProgram, // for tests
    +    calcProgram: RexProgram,
         ruleDescription: String)
    -  extends SingleRel(cluster, traitSet, input)
    +  extends Calc(cluster, traitSet, input, calcProgram)
    --- End diff --

    Why are you changing this? I think we had this at some point in time, but changed `Calc` to
    `SingleRel`. Can't remember why though. Maybe @twalthr knows why.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926145#comment-15926145 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106123284

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ---
    @@ -25,6 +25,7 @@ import org.apache.flink.table.sources.TableSource
     /** Table which defines an external table via a [[TableSource]] */
     class TableSourceTable[T](
         val tableSource: TableSource[T],
    +    val tableEnv: TableEnvironment,
    --- End diff --

    Do we really need this reference here? I think it is only needed to provide the
    FunctionCatalog for translating RexNodes into Expressions for filter pushdown. Isn't the
    catalog of built-in functions (which is available as a static object) sufficient for that?
    I don't think a TableSource would be able to evaluate a predicate that includes a UDF, so
    the built-in functions should be enough and we do not need to add the `TableEnvironment` here.
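A rough sketch of the argument above, using hypothetical stand-ins only (`Call`, `builtIns` and `extract` are illustrative names, and the set of function names is an assumption, not Flink's actual built-in catalog). If translation looks functions up in a static set of built-ins, a conjunct that calls a user-defined function simply stays unconverted and is evaluated by the framework, so no `TableEnvironment` reference is required.

```scala
object CatalogSketch {
  // Hypothetical stand-in: a conjunct is a call to a named function.
  final case class Call(function: String, args: Seq[String])

  // Assumed set of built-in function names, available without a TableEnvironment.
  val builtIns: Set[String] = Set("=", ">", "<", "LIKE", "IS NULL")

  // Splits conjuncts into those that can be translated with built-ins only
  // and those that must stay with the framework (for example calls to UDFs).
  def extract(conjuncts: Seq[Call]): (Seq[Call], Seq[Call]) =
    conjuncts.partition(c => builtIns.contains(c.function))
}
```

The second element of the result plays the role of the unconverted `RexNode`s returned by `extractConjunctiveConditions`.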
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926143#comment-15926143 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r105726756

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---
    @@ -39,4 +39,6 @@ trait TableSource[T] {
       /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
       def getReturnType: TypeInformation[T]
     
    +  /** Describes the table source */
    +  def explainTerms(pw: RelWriter): RelWriter = pw
    --- End diff --

    Do we need this method? So far we tried to keep Calcite out of the TableSource interface.
    Can we change this to `explainSource(): String` and set the String in `BatchTableSourceScan`
    and `StreamTableSourceScan`?
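One way the proposed `explainSource(): String` could look, sketched with hypothetical types (`Source`, `CsvSource` and `explainScan` are illustrative and not part of Flink; the file path is made up). The source describes itself as a plain string, and only the scan node, which already depends on the planner, decides how to place that string in the plan description.

```scala
object ExplainSketch {
  // Hypothetical trait: the source describes itself as a plain String,
  // so no Calcite type appears in its interface.
  trait Source {
    def explainSource(): String = ""
  }

  final case class CsvSource(path: String, filters: Seq[String]) extends Source {
    override def explainSource(): String = {
      val filterList = filters.mkString(", ")
      s"CsvSource(path=$path, filters=[$filterList])"
    }
  }

  // Stand-in for the scan nodes, which own the interaction with the plan writer.
  def explainScan(tableName: String, source: Source): String = {
    val sourceTerm = Some(source.explainSource()).filter(_.nonEmpty).map("source" -> _)
    val terms = Seq("table" -> tableName) ++ sourceTerm
    terms.map { case (key, value) => s"$key=$value" }.mkString("Scan(", ", ", ")")
  }
}
```

A source that does not override `explainSource()` contributes nothing, so existing implementations would keep their current plan output.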
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926032#comment-15926032 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106074574

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala ---
    @@ -19,31 +19,34 @@
     package org.apache.flink.table.plan.nodes.dataset
     
     import org.apache.calcite.plan._
    +import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.rel.metadata.RelMetadataQuery
     import org.apache.calcite.rel.{RelNode, RelWriter}
     import org.apache.flink.api.java.DataSet
     import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
     import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.plan.nodes.TableSourceScan
     import org.apache.flink.table.plan.schema.TableSourceTable
    -import org.apache.flink.table.sources.BatchTableSource
    +import org.apache.flink.table.sources.{BatchTableSource, TableSource}
     import org.apache.flink.types.Row
     
     /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
     class BatchTableSourceScan(
         cluster: RelOptCluster,
         traitSet: RelTraitSet,
         table: RelOptTable,
    -    val tableSource: BatchTableSource[_])
    -  extends BatchScan(cluster, traitSet, table) {
    +    tableSource: BatchTableSource[_])
    +  extends TableSourceScan(cluster, traitSet, table, tableSource)
    +  with BatchScan {
     
    -  override def deriveRowType() = {
    +  override def deriveRowType(): RelDataType = {
    --- End diff --

    Move this method to `TableSourceScan`. The behavior is the same across the subclasses of
    `TableSourceScan`.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926033#comment-15926033 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106100087

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala ---
    @@ -57,17 +60,24 @@ class BatchTableSourceScan(
         )
       }
     
    +  override def copy(traitSet: RelTraitSet, newTableSource: TableSource[_]): TableSourceScan = {
    +    new BatchTableSourceScan(
    +      cluster,
    +      traitSet,
    +      getTable,
    +      newTableSource.asInstanceOf[BatchTableSource[_]]
    --- End diff --

    The `TableSource` instance held by `TableSourceTable`, which can be obtained via
    `getTable.unwrap(classOf[TableSourceTable[_]])`, is never updated, so it may differ from
    `newTableSource`.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926034#comment-15926034 ] ASF GitHub Bot commented on FLINK-3849: --- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106149747 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala --- @@ -34,14 +34,24 @@ trait FilterableTableSource { /** * Check and pick all predicates this table source can support. The passed in predicates -* have been translated in conjunctive form, and table source and only pick those predicates -* that it supports. All unsupported predicates should be give back to the framework to do -* further filtering. +* have been translated in conjunctive form, and table source can only pick those predicates +* that it supports. +* +* After trying to push predicates down, we should return a new [[FilterableTableSource]] +* instance which holds all pushed down predicates. Even if we actually pushed nothing down, +* it is recommended that we still return a new [[FilterableTableSource]] instance since we will +* mark the returned instance as filter push down has been tried. Also we need to return all +* unsupported predicates back to the framework to do further filtering. +* +* We also should note to not changing the form of the predicates passed in. It has been +* organized in CNF conjunctive form, and we should only take or leave each element in the +* array. Don't try to reorganize the predicates if you are absolutely confident with that. * * @param predicate An array contains conjunctive predicates. -* @return An array contains all unsupported predicates. +* @return A new cloned instance of [[FilterableTableSource]] as well as n array of Expression +* which contains all unsupported predicates. 
*/ - def applyPredicate(predicate: Array[Expression]): Array[Expression] + def applyPredicate(predicate: Array[Expression]): (FilterableTableSource[_], Array[Expression]) --- End diff -- Return `FilterableTableSource[T]` instead of `FilterableTableSource[_]`; otherwise, in Java, the table source instance must be cast to `FilterableTableSource`. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
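The contract described in the Scaladoc above (take or leave each conjunct, return a new source instance plus the unsupported predicates) can be sketched as follows. This is only an illustration under stated assumptions: `Expr`, `GreaterThan`, `Like` and `SketchSource` are hypothetical stand-ins invented here, not Flink's `Expression` or `FilterableTableSource` types.

```scala
object ApplyPredicateSketch {
  // Hypothetical, simplified predicate types (assumption, not Flink's Expression API).
  sealed trait Expr
  final case class GreaterThan(field: String, value: Int) extends Expr
  final case class Like(field: String, pattern: String) extends Expr

  // Hypothetical source that can evaluate only GreaterThan predicates itself.
  final case class SketchSource(
      pushed: Seq[Expr] = Seq.empty,
      filterPushedDown: Boolean = false) {

    // Each conjunct is taken or left unchanged. The result is a new instance
    // holding the pushed predicates, together with the unsupported ones.
    def applyPredicate(predicates: Array[Expr]): (SketchSource, Array[Expr]) = {
      val (supported, unsupported) = predicates.partition {
        case _: GreaterThan => true
        case _              => false
      }
      // The flag is set even if nothing was pushed, so the attempt is recorded.
      (copy(pushed = pushed ++ supported, filterPushedDown = true), unsupported)
    }
  }
}
```

Because the original instance is never mutated, a caller can keep using it for other scans while the returned copy carries the pushed-down predicates.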
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925442#comment-15925442 ] ASF GitHub Bot commented on FLINK-3849: --- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106072764 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. +* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. 
+* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() --- End diff -- `var` → `val` > Add FilterableTableSource interface and translation rule >
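The CNF step mentioned in the code comment above ("(a AND b) OR c" becomes "(a OR c) AND (b OR c)") can be illustrated with a small self-contained sketch. It does not use Calcite; `Pred`, `Atom`, `And`, `Or`, `toCnf` and `conjunctions` are hypothetical names chosen here, and negation is left out for brevity.

```scala
object CnfSketch {
  // Hypothetical minimal predicate tree (assumption, not Calcite's RexNode).
  sealed trait Pred
  final case class Atom(name: String) extends Pred
  final case class And(left: Pred, right: Pred) extends Pred
  final case class Or(left: Pred, right: Pred) extends Pred

  // Converts a negation-free predicate to conjunctive normal form.
  def toCnf(p: Pred): Pred = p match {
    case And(l, r) => And(toCnf(l), toCnf(r))
    case Or(l, r)  => distribute(toCnf(l), toCnf(r))
    case atom      => atom
  }

  // Distributes OR over AND; both arguments are already in CNF.
  private def distribute(l: Pred, r: Pred): Pred = (l, r) match {
    case (And(a, b), c) => And(distribute(a, c), distribute(b, c))
    case (a, And(b, c)) => And(distribute(a, b), distribute(a, c))
    case _              => Or(l, r)
  }

  // Flattens the top-level ANDs into a list of independent conjuncts.
  def conjunctions(p: Pred): List[Pred] = p match {
    case And(l, r) => conjunctions(l) ++ conjunctions(r)
    case other     => List(other)
  }
}
```

Each element of the resulting list can then be offered to the table source on its own, which is what makes partial push-down possible.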
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925441#comment-15925441 ] ASF GitHub Bot commented on FLINK-3849: --- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106073030 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. +* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. 
+* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef: RexInputRef):
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923710#comment-15923710 ] ASF GitHub Bot commented on FLINK-3849: --- Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3520 To address the problem of not reusing `TableSource` when we create a new scan for a table source, I changed the inheritance of the current `BatchScan`, `StreamScan`, `BatchTableSourceScan`, `StreamTableSourceScan`, and so on. The new structure is more like the relationship between `FlinkTable` and `TableSourceTable`, `DataSetTable`, `DataStreamTable`. The restructuring also makes it possible to unify the push-project-into-scan rule for both batch and stream mode. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923433#comment-15923433 ] ASF GitHub Bot commented on FLINK-3849: --- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r105812230 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.core.{Calc, TableScan} +import org.apache.calcite.rex.RexProgram +import org.apache.flink.table.plan.nodes.dataset.DataSetCalc +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.util.RexProgramExtractor +import org.apache.flink.table.sources.FilterableTableSource + +trait PushFilterIntoTableSourceScanRuleBase { + + private[flink] def pushFilterIntoScan( + call: RelOptRuleCall, + calc: Calc, + scan: TableScan, + tableSourceTable: TableSourceTable[_], + filterableSource: FilterableTableSource, + description: String): Unit = { + +if (filterableSource.isFilterPushedDown) { + // The rule can get triggered again due to the transformed "scan => filter" + // sequence created by the earlier execution of this rule when we could not + // push all the conditions into the scan + return +} + +val program = calc.getProgram +val (predicates, unconvertedRexNodes) = + RexProgramExtractor.extractConjunctiveConditions( +program, +call.builder().getRexBuilder, +tableSourceTable.tableEnv.getFunctionCatalog) +if (predicates.isEmpty) { + // no condition can be translated to expression + return +} + +// trying to apply filter push down, set the flag to true no matter whether +// we actually push any filters down. 
+filterableSource.setFilterPushedDown(true) +val remainingPredicates = filterableSource.applyPredicate(predicates) + +// check whether framework still need to do a filter +val relBuilder = call.builder() +val remainingCondition = { + if (remainingPredicates.length > 0 || unconvertedRexNodes.length > 0) { --- End diff -- I think `nonEmpty` is better than `length > 0` here. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
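The check under review decides whether the framework still has to filter after push-down. A minimal sketch of that decision, written with `nonEmpty` as suggested, might look like the following. Predicates are modelled as plain strings purely for illustration; `remainingCondition` and its parameters are hypothetical names, not the ones used in the PR.

```scala
object RemainingConditionSketch {
  // Combines predicates the source rejected with nodes that could not be
  // converted at all. Returns None when the framework has nothing left to filter.
  def remainingCondition(
      remainingPredicates: Seq[String],
      unconvertedNodes: Seq[String]): Option[String] = {
    val all = remainingPredicates ++ unconvertedNodes
    // nonEmpty states the intent directly and avoids comparing a length to zero.
    if (all.nonEmpty) Some(all.mkString(" AND ")) else None
  }
}
```

Returning an `Option` is a design choice of this sketch; the PR itself builds the condition through Calcite's builder instead.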
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923432#comment-15923432 ] ASF GitHub Bot commented on FLINK-3849: --- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r105811515 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala --- @@ -42,63 +41,40 @@ class DataSetCalc( traitSet: RelTraitSet, input: RelNode, rowRelDataType: RelDataType, -private[flink] val calcProgram: RexProgram, // for tests +calcProgram: RexProgram, ruleDescription: String) - extends SingleRel(cluster, traitSet, input) + extends Calc(cluster, traitSet, input, calcProgram) with CommonCalc with DataSetRel { - override def deriveRowType() = rowRelDataType + override def deriveRowType(): RelDataType = rowRelDataType - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { -new DataSetCalc( - cluster, - traitSet, - inputs.get(0), - getRowType, - calcProgram, - ruleDescription) + override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = { +new DataSetCalc(cluster, traitSet, child, getRowType, program, ruleDescription) } override def toString: String = calcToString(calcProgram, getExpressionString) override def explainTerms(pw: RelWriter): RelWriter = { -super.explainTerms(pw) - .item("select", selectionToString(calcProgram, getExpressionString)) - .itemIf("where", -conditionToString(calcProgram, getExpressionString), -calcProgram.getCondition != null) +pw.input("input", getInput) +.item("select", selectionToString(calcProgram, getExpressionString)) --- End diff -- Use two spaces for indentation. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young 
> > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923434#comment-15923434 ] ASF GitHub Bot commented on FLINK-3849: --- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r105814436 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.RelOptRuleCall +import org.apache.calcite.rel.core.{Calc, TableScan} +import org.apache.calcite.rex.RexProgram +import org.apache.flink.table.plan.nodes.dataset.DataSetCalc +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.util.RexProgramExtractor +import org.apache.flink.table.sources.FilterableTableSource + +trait PushFilterIntoTableSourceScanRuleBase { + + private[flink] def pushFilterIntoScan( + call: RelOptRuleCall, + calc: Calc, + scan: TableScan, + tableSourceTable: TableSourceTable[_], + filterableSource: FilterableTableSource, + description: String): Unit = { + +if (filterableSource.isFilterPushedDown) { --- End diff -- The tableSource should not be shared between TableScan instances. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
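The concern about sharing a source between scans can be made concrete with a small sketch. It assumes a mutable "filter pushed down" flag like the one in the diff above and contrasts it with an immutable copy. `MutableSource`, `ImmutableSource` and `Scan` are hypothetical types invented for this illustration, not Flink classes.

```scala
object SharedSourceSketch {
  // Hypothetical source with a mutable flag, similar in spirit to setFilterPushedDown.
  final class MutableSource { var filterPushedDown: Boolean = false }

  // Hypothetical immutable alternative: marking returns a fresh instance.
  final case class ImmutableSource(filterPushedDown: Boolean = false) {
    def withFilterPushedDown: ImmutableSource = copy(filterPushedDown = true)
  }

  // Hypothetical scan node that simply holds a source.
  final case class Scan[S](name: String, source: S)
}
```

With `MutableSource`, setting the flag through one scan is visible through every other scan that holds the same instance, so a second scan would wrongly skip push-down. With `ImmutableSource`, each scan keeps its own state.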
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15906984#comment-15906984 ] ASF GitHub Bot commented on FLINK-3849: --- GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/3520 [FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it This PR is based on #3166 and adds the following changes: 1. Refactor `RexProgramExpressionExtractor` and `RexProgramExpressionExtractor` to `RexProgramExtractor` and `RexProgramRewriter`. `RexProgramExtractor` is responsible for extracting either projection expressions or filter expressions. 2. Make sure we don't fail while extracting and converting filter RexNodes to expressions. Both the successfully translated expressions and the unconverted RexNodes are returned. 3. Add some tests for `RexProgramExtractor`. 4. Provide a unified `PushFilterIntoTableSourceScanRuleBase` to support filter push-down in both batch and stream mode. 5. Add some logical tests for filter push-down in different situations, such as full push-down and partial push-down. 6. Slightly change the testing class `TestFilterableTableSource` to make it less specialized. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-3849 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3520.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3520 commit 0a7af41509d9a0db3e791cb9f4dc5a1a8086f0b2 Author: tonycox Date: 2017-01-11T09:15:49Z [FLINK-3849] Add FilterableTableSource interface and Rules for pushing it commit 549b4e00e68d32f070e196fc6eb9a7f5f9e937c3 Author: tonycox Date: 2017-01-31T12:41:52Z fix filterable test commit 9aa82062832e0aabcb003e582c8130aeecc91a73 Author: tonycox Date: 2017-02-16T13:32:33Z rebase and trying fix rexnode parsing commit 646a6931224c7dcc58501ec014ab675925bb105d Author: tonycox Date: 2017-02-17T16:48:40Z create wrapper and update rules commit abfa38d894aa86b7a5c91dd29bf398b880c8bfe7 Author: Kurt Young Date: 2017-03-13T07:30:13Z [FLINK-3849] [table] Add FilterableTableSource interface and rules for pushing it > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Kurt Young > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902807#comment-15902807 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3166 OK, thanks for the quick response! I'll point the contributor of the partition pruning to this PR. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Anton Solovev > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902799#comment-15902799 ] ASF GitHub Bot commented on FLINK-3849: --- Github user tonycox commented on the issue: https://github.com/apache/flink/pull/3166 Hi @fhueske, I can't continue working on this PR; I don't have enough time for now. If you need an implementation of it immediately, I will unassign myself. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Anton Solovev > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882371#comment-15882371 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102912230 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan} +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._ +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule( + operand(classOf[DataStreamCalc], +operand(classOf[StreamTableSourceScan], none)), + "PushFilterIntoStreamTableSourceScanRule") { + + override def matches(call: RelOptRuleCall) = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource => +calc.calcProgram.getCondition != null + case _ => false +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] + +val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource] + +val program = calc.calcProgram +val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]]) +val predicates = extractPredicateExpressions( + program, + call.builder().getRexBuilder, + tst.tableEnv.getFunctionCatalog) + +if (predicates.length != 0) { + val remainingPredicate = filterableSource.setPredicate(predicates) --- End diff -- Right, but only if it does not do any projection. 
> Add FilterableTableSource interface and translation rule
>
>                 Key: FLINK-3849
>                 URL: https://issues.apache.org/jira/browse/FLINK-3849
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Anton Solovev
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak
> the cost model as well to push the optimizer in the right direction.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882058#comment-15882058 ] ASF GitHub Bot commented on FLINK-3849: --- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102886003 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan} +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._ +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule( + operand(classOf[DataStreamCalc], +operand(classOf[StreamTableSourceScan], none)), + "PushFilterIntoStreamTableSourceScanRule") { + + override def matches(call: RelOptRuleCall) = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource => +calc.calcProgram.getCondition != null + case _ => false +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] + +val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource] + +val program = calc.calcProgram +val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]]) +val predicates = extractPredicateExpressions( + program, + call.builder().getRexBuilder, + tst.tableEnv.getFunctionCatalog) + +if (predicates.length != 0) { + val remainingPredicate = filterableSource.setPredicate(predicates) --- End diff -- if remainingPredicate is empty, we should remove calc node also. 
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878663#comment-15878663 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102471926

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala ---
    @@ -54,20 +55,27 @@ class BatchTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      tableSource
    +      tableSource,
    +      filterCondition
         )
       }

       override def explainTerms(pw: RelWriter): RelWriter = {
    -    super.explainTerms(pw)
    +    val terms = super.explainTerms(pw)
           .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
    +    if (filterCondition != null) {
    +      import scala.collection.JavaConverters._
    --- End diff --

    Please move the import up

> Add FilterableTableSource interface and translation rule
>
>                 Key: FLINK-3849
>                 URL: https://issues.apache.org/jira/browse/FLINK-3849
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Anton Solovev
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak
> the cost model as well to push the optimizer in the right direction.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878670#comment-15878670 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102497345 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan} +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._ +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoStreamTableSourceScanRule extends RelOptRule( + operand(classOf[DataStreamCalc], +operand(classOf[StreamTableSourceScan], none)), + "PushFilterIntoStreamTableSourceScanRule") { + + override def matches(call: RelOptRuleCall) = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource => +calc.calcProgram.getCondition != null + case _ => false +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc] +val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan] + +val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource] + +val program = calc.calcProgram +val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]]) +val predicates = extractPredicateExpressions( + program, + call.builder().getRexBuilder, + tst.tableEnv.getFunctionCatalog) + +if (predicates.length != 0) { + val remainingPredicate = filterableSource.setPredicate(predicates) --- End diff -- Do not continue if `remainingPredicate == predicates` > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Anton Solovev > > Add a {{FilterableTableSource}} interface for {{TableSource}} 
implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
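The review comments on `onMatch` in this thread (do not continue if the source returns every term, remove the calc if nothing remains, but only if the calc does no projection) can be read as a small decision table. The following is only a sketch of that reading, not the code that was merged. `PushDownDecision`, `Action`, and `decide` are hypothetical names, and predicates are modelled as a generic `Seq[E]` rather than Flink's `Array[Expression]`.

```scala
object PushDownDecision {
  // Hypothetical outcome of the rule; not a Flink or Calcite type.
  sealed trait Action
  case object KeepPlan extends Action   // source accepted nothing: leave the plan untouched
  case object DropCalc extends Action   // source accepted everything and the calc only filtered
  case object ShrinkCalc extends Action // keep the calc for remaining terms and/or projection

  /** offered: conjunctive terms handed to the source.
    * remaining: terms the source returned as not accepted.
    * calcProjects: true if the calc also computes a projection.
    */
  def decide[E](offered: Seq[E], remaining: Seq[E], calcProjects: Boolean): Action =
    if (offered.isEmpty || remaining == offered) KeepPlan
    else if (remaining.isEmpty && !calcProjects) DropCalc
    else ShrinkCalc
}
```

For example, `PushDownDecision.decide(Seq("a > 1"), Seq.empty, calcProjects = true)` yields `ShrinkCalc`, because the projection still has to be computed even though no filter term is left.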
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878674#comment-15878674 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102486920 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.RexProgram +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc} +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._ +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.sources.FilterableTableSource + +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule( + operand(classOf[DataSetCalc], +operand(classOf[BatchTableSourceScan], none)), + "PushFilterIntoBatchTableSourceScanRule") { + + override def matches(call: RelOptRuleCall) = { +val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc] +val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] +scan.tableSource match { + case _: FilterableTableSource => +calc.calcProgram.getCondition != null + case _ => false +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc] +val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan] + +val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource] + +val program: RexProgram = calc.calcProgram +val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]]) +val predicate = extractPredicateExpressions( + program, + call.builder().getRexBuilder, + tst.tableEnv.getFunctionCatalog) + +if (predicate.length != 0) { + val remainingPredicate = filterableSource.setPredicate(predicate) + + if (verifyExpressions(predicate, remainingPredicate)) { --- End diff -- The `FilterableTableSource` violates the method contract if this is not true. I would log a WARN message here. 
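A rough sketch of the contract check and WARN suggested in the comment above, under stated assumptions: `ContractCheck`, `honorsContract`, and `remainingOrFallback` are hypothetical names, the logger is passed in as a plain `String => Unit`, and falling back to evaluating every offered term in the calc is one possible reaction to a violation, chosen here only because re-applying a filter is always safe. The PR's own `verifyExpressions` may behave differently.

```scala
object ContractCheck {
  /** True if every remaining term is one of the offered terms, unmodified. */
  def honorsContract[E](offered: Seq[E], remaining: Seq[E]): Boolean =
    remaining.forall(r => offered.contains(r))

  /** Returns the terms the calc still has to evaluate.
    * On a contract violation it warns and keeps all offered terms in the calc.
    */
  def remainingOrFallback[E](offered: Seq[E], remaining: Seq[E])(warn: String => Unit): Seq[E] =
    if (honorsContract(offered, remaining)) {
      remaining
    } else {
      warn("FilterableTableSource returned predicates that were not offered; " +
        "ignoring its answer and keeping all predicates in the calc.")
      offered
    }
}
```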
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878660#comment-15878660 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102473754 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala --- @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rex._ +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper} +import org.apache.flink.table.expressions._ +import org.apache.flink.table.validate.FunctionCatalog + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.immutable.IndexedSeq + +object RexProgramExpressionExtractor { --- End diff -- Rename to `RexProgramPredicateExtractor`? > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Anton Solovev > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878667#comment-15878667 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102506293 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractorTest.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import java.math.BigDecimal + +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.plan._ +import org.apache.calcite.plan.volcano.VolcanoPlanner +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem} +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder} +import org.apache.calcite.sql.`type`.SqlTypeName._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} +import org.apache.flink.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._ +import org.apache.flink.table.plan.schema.CompositeRelDataType +import org.apache.flink.table.utils.CommonTestData +import org.junit.Test +import org.junit.Assert._ + +import scala.collection.JavaConverters._ + +class RexProgramExpressionExtractorTest { --- End diff -- I think it would be good to add a few more corner cases to the tests such as unsupported RexNodes or functions, single predicates, etc. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Anton Solovev > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. 
We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878675#comment-15878675 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102484272 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala --- @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rex._ +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper} +import org.apache.flink.table.expressions._ +import org.apache.flink.table.validate.FunctionCatalog + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.immutable.IndexedSeq + +object RexProgramExpressionExtractor { + + /** +* converts a rexProgram condition into independent CNF expressions +* +* @param rexProgram The RexProgram to analyze +* @return converted expression +*/ + private[flink] def extractPredicateExpressions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): Array[Expression] = { + +val fieldNames = getInputsWithNames(rexProgram) + +val condition = rexProgram.getCondition +if (condition == null) { + return Array.empty +} +val call = rexProgram.expandLocalRef(condition) +val cnf = RexUtil.toCnf(rexBuilder, call) +val conjunctions = RelOptUtil.conjunctions(cnf) +val expressions = conjunctions.asScala.map( + RexNodeWrapper.wrap(_, catalog).toExpression(fieldNames) +) +expressions.toArray + } + + /** +* verify should we apply remained expressions on --- End diff -- Complete comment > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Anton Solovev > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. 
> The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878671#comment-15878671 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102473702 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala --- @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rex._ +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper} +import org.apache.flink.table.expressions._ +import org.apache.flink.table.validate.FunctionCatalog + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.immutable.IndexedSeq + +object RexProgramExpressionExtractor { + + /** +* converts a rexProgram condition into independent CNF expressions +* +* @param rexProgram The RexProgram to analyze +* @return converted expression +*/ + private[flink] def extractPredicateExpressions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): Array[Expression] = { + +val fieldNames = getInputsWithNames(rexProgram) + +val condition = rexProgram.getCondition +if (condition == null) { + return Array.empty +} +val call = rexProgram.expandLocalRef(condition) --- End diff -- Please add a few comments > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Anton Solovev > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. 
We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
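To illustrate what the steps after `expandLocalRef` do (convert the condition to CNF, then split it into independent conjunctive terms), here is a toy sketch on a hypothetical three-case condition type. It does not use Calcite: `Cond`, `toCnf`, and `conjunctions` below are stand-ins that only mirror the idea behind `RexUtil.toCnf` and `RelOptUtil.conjunctions`, and negation is deliberately left out.

```scala
object Conjunctions {
  // Hypothetical condition tree; Calcite's RexNode is far richer.
  sealed trait Cond
  final case class And(left: Cond, right: Cond) extends Cond
  final case class Or(left: Cond, right: Cond) extends Cond
  final case class Atom(name: String) extends Cond

  /** Distributes OR over AND until no AND is nested below an OR. */
  def toCnf(c: Cond): Cond = c match {
    case And(l, r) => And(toCnf(l), toCnf(r))
    case Or(l, r) =>
      (toCnf(l), toCnf(r)) match {
        case (And(a, b), other) => And(toCnf(Or(a, other)), toCnf(Or(b, other)))
        case (other, And(a, b)) => And(toCnf(Or(other, a)), toCnf(Or(other, b)))
        case (a, b)             => Or(a, b)
      }
    case atom => atom
  }

  /** Splits a condition at its top-level ANDs into independent terms. */
  def conjunctions(c: Cond): List[Cond] = c match {
    case And(l, r) => conjunctions(l) ++ conjunctions(r)
    case other     => List(other)
  }
}
```

With this sketch, `a OR (b AND c)` becomes the two terms `a OR b` and `a OR c`, each of which a source could accept or reject on its own.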
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878669#comment-15878669 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102492031 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala --- @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import org.apache.flink.table.expressions.Expression + +/** + * Adds support for filtering push-down to a [[TableSource]]. + * A [[TableSource]] extending this interface is able to filter the fields of the return table. + * + */ +trait FilterableTableSource { + + /** return an predicate expression that was set. */ + def getPredicate: Array[Expression] + + /** +* @param predicate a filter expression that will be applied to fields to return. --- End diff -- The method docs should be more extensive. We have to explain that the expressions in the array are conjunctive terms which have to be accepted completely or not at all. All non-accepted terms have to be returned unmodified. 
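The contract described in the comment above (the array holds conjunctive terms, each term is accepted completely or not at all, and non-accepted terms are returned unmodified) might look like this from an implementer's side. This is a minimal sketch: `Expr`, `GreaterThan`, `Like`, `FilterableSource`, and `NumericOnlySource` are hypothetical stand-ins, not Flink classes, and the trait only mirrors the array-based signature visible in the quoted diff.

```scala
// Hypothetical stand-ins for Flink's Expression hierarchy.
sealed trait Expr
final case class GreaterThan(field: String, value: Int) extends Expr
final case class Like(field: String, pattern: String) extends Expr

// Mirrors the shape of the trait under review; not the real interface.
trait FilterableSource {
  /** The conjunctive terms this source has accepted so far. */
  def getPredicate: Array[Expr]

  /** Offers conjunctive terms to the source.
    * Each term is accepted as a whole or not at all.
    * Returns the non-accepted terms, unmodified.
    */
  def setPredicate(predicate: Array[Expr]): Array[Expr]
}

// Example source that can only evaluate numeric comparisons itself.
class NumericOnlySource extends FilterableSource {
  private var accepted: Array[Expr] = Array.empty[Expr]

  override def getPredicate: Array[Expr] = accepted

  override def setPredicate(predicate: Array[Expr]): Array[Expr] = {
    val (supported, rejected) = predicate.partition {
      case _: GreaterThan => true
      case _              => false
    }
    accepted = supported
    rejected // handed back untouched so the planner can evaluate them
  }
}
```

Offering `amount > 2` and `name LIKE 'a%'` to `NumericOnlySource` would leave the first term in the source and return the second one to the planner.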
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878672#comment-15878672 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102485241

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.plan.RelOptUtil
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.validate.FunctionCatalog
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into independent CNF expressions
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  private[flink] def extractPredicateExpressions(
    +      rexProgram: RexProgram,
    +      rexBuilder: RexBuilder,
    +      catalog: FunctionCatalog): Array[Expression] = {
    --- End diff --

    We should also return those `RexNodes` that we cannot translate, i.e., return (Array[Expression], Array[RexNode])
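The suggested `(Array[Expression], Array[RexNode])` return type could be sketched roughly as follows. This is only an illustration, not the PR's code: `Rex`, `Expr`, `translate`, and `extract` are hypothetical stand-ins for Calcite's `RexNode`, Flink's `Expression`, and the wrapper-based translation, so that the sketch runs without Flink or Calcite on the classpath.

```scala
import scala.util.{Failure, Success, Try}

object ExtractSketch {
  // Hypothetical stand-ins for Calcite's RexNode and Flink's Expression.
  final case class Rex(text: String, translatable: Boolean)
  final case class Expr(text: String)

  // Stand-in for the RexNode-to-Expression translation: throws on unsupported nodes.
  def translate(rex: Rex): Expr =
    if (rex.translatable) Expr(rex.text)
    else throw new UnsupportedOperationException("cannot translate: " + rex.text)

  // Returns (translated expressions, conjuncts that could not be translated).
  def extract(conjuncts: Seq[Rex]): (Array[Expr], Array[Rex]) = {
    // Try catches every non-fatal exception; real code might catch more narrowly.
    val attempts = conjuncts.map(rex => (rex, Try(translate(rex))))
    val translated = attempts.collect { case (_, Success(expr)) => expr }
    val untranslated = attempts.collect { case (rex, Failure(_)) => rex }
    (translated.toArray, untranslated.toArray)
  }
}
```

With this shape the caller can offer only the first array to the table source and keep the second one for evaluation in the calc node.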
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878668#comment-15878668 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102491504 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala --- @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rex._ +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper} +import org.apache.flink.table.expressions._ +import org.apache.flink.table.validate.FunctionCatalog + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.immutable.IndexedSeq + +object RexProgramExpressionExtractor { + + /** +* converts a rexProgram condition into independent CNF expressions +* +* @param rexProgram The RexProgram to analyze +* @return converted expression +*/ + private[flink] def extractPredicateExpressions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): Array[Expression] = { + +val fieldNames = getInputsWithNames(rexProgram) + +val condition = rexProgram.getCondition +if (condition == null) { + return Array.empty +} +val call = rexProgram.expandLocalRef(condition) +val cnf = RexUtil.toCnf(rexBuilder, call) +val conjunctions = RelOptUtil.conjunctions(cnf) +val expressions = conjunctions.asScala.map( + RexNodeWrapper.wrap(_, catalog).toExpression(fieldNames) +) +expressions.toArray + } + + /** +* verify should we apply remained expressions on +* +* @param original initial expression +* @param remained remained part of original expression +* @return whether or not to decouple parts of the origin expression +*/ + private[flink] def verifyExpressions( + original: Array[Expression], + remained: Array[Expression]): Boolean = +remained forall (original contains) + + /** +* Generates a new RexProgram based on new expression. 
+* +* @param rexProgram original RexProgram +* @param scan input source +* @param predicate filter condition (fields must be resolved) +* @param relBuilder builder for converting expression to Rex +*/ + private[flink] def rewriteRexProgram( + rexProgram: RexProgram, + scan: TableScan, + predicate: Array[Expression])(implicit relBuilder: RelBuilder): RexProgram = { + +relBuilder.push(scan) + +val inType = rexProgram.getInputRowType +val resolvedExps = resolveFields(predicate, inType) +val projs = rexProgram.getProjectList.map(rexProgram.expandLocalRef) + +RexProgram.create( + inType, + projs, + conjunct(resolvedExps).get.toRexNode, + rexProgram.getOutputRowType, + relBuilder.getRexBuilder) + } + + private[flink] def getFilterExpressionAsRexNode( + inputTpe: RelDataType, + scan: TableScan, + exps: Array[Expression])(implicit relBuilder: RelBuilder): RexNode = { +relBuilder.push(scan) +val resolvedExps = resolveFields(exps, inputTpe) +val fullExp = conjunct(resolvedExps) +if (fullExp.isDefined) { + fullExp.get.toRexNode +} else { + null +} + } + +
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878662#comment-15878662 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102507787

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala ---
    @@ -126,21 +156,49 @@ class TableSourceTest extends TableTestBase {
       @Test
       def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
    -    val (csvTable, tableName) = tableSource
    +    val (tableSource, tableName) = csvTable
         val util = streamTestUtil()
         val tEnv = util.tEnv
    -    tEnv.registerTableSource(tableName, csvTable)
    +    tEnv.registerTableSource(tableName, tableSource)
         val result = tEnv
           .scan(tableName)
           .select('id, 'score, 'first)
    -    val expected = sourceStreamTableNode(tableName, noCalcFields)
    +    val expected = projectableSourceStreamTableNode(tableName, noCalcFields)
         util.verifyTable(result, expected)
       }
       @Test
    +  def testStreamFilterableSourceScanPlanTableApi(): Unit = {
    +    val (tableSource, tableName) = filterableTableSource
    +    val util = streamTestUtil()
    +    val tEnv = util.tEnv
    +
    +    tEnv.registerTableSource(tableName, tableSource)
    +
    +    val result = tEnv
    +      .scan(tableName)
    +      .select('price, 'id, 'amount)
    +      .where("amount > 2 && price * 2 < 32")
    --- End diff --

    An example of an unsupported predicate would be `'id.cast(BasicTypeInfo.STRING_TYPE_INFO) === "abc"`. This throws an exception when translating it to an `Expression`. As said before, unsupported expressions should be handled gracefully: instead of failing, we should not offer this `RexNode` to the `FilterableTableSource` and should evaluate it in the `DataSetCalc`. I would suggest using `CAST` as an example to implement the graceful handling, and adding support for it once the failure-free translation works.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878664#comment-15878664 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102476247

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.calcite
    +
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql._
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
    +import org.apache.flink.table.validate.FunctionCatalog
    +import org.apache.flink.table.calcite.RexNodeWrapper._
    +
    +abstract class RexNodeWrapper(rex: RexNode) {
    +  def get: RexNode = rex
    +  def toExpression(names: Map[RexInputRef, String]): Expression
    +}
    +
    +case class RexLiteralWrapper(literal: RexLiteral) extends RexNodeWrapper(literal) {
    +  override def toExpression(names: Map[RexInputRef, String]): Expression = {
    +    val typeInfo = FlinkTypeFactory.toTypeInfo(literal.getType)
    +    Literal(literal.getValue, typeInfo)
    +  }
    +}
    +
    +case class RexInputWrapper(input: RexInputRef) extends RexNodeWrapper(input) {
    +  override def toExpression(names: Map[RexInputRef, String]): Expression = {
    +    val typeInfo = FlinkTypeFactory.toTypeInfo(input.getType)
    +    ResolvedFieldReference(names(input), typeInfo)
    +  }
    +}
    +
    +case class RexCallWrapper(
    +    call: RexCall,
    +    operands: Seq[RexNodeWrapper]) extends RexNodeWrapper(call) {
    +
    +  override def toExpression(names: Map[RexInputRef, String]): Expression = {
    +    val ops = operands.map(_.toExpression(names))
    +    call.op match {
    +      case function: SqlFunction =>
    +        lookupFunction(replace(function.getName), ops)
    +      case postfix: SqlPostfixOperator =>
    +        lookupFunction(replace(postfix.getName), ops)
    +      case operator@_ =>
    +        val name = replace(s"${operator.kind}")
    +        lookupFunction(name, ops)
    +    }
    +  }
    +
    +  def replace(str: String): String = {
    +    str.replaceAll("\\s|_", "")
    +  }
    +}
    +
    +object RexNodeWrapper {
    +
    +  private var catalog: Option[FunctionCatalog] = None
    +
    +  def wrap(rex: RexNode, functionCatalog: FunctionCatalog): RexNodeWrapper = {
    +    catalog = Option(functionCatalog)
    +    rex.accept(new WrapperVisitor)
    +  }
    +
    +  private[table] def lookupFunction(name: String, operands: Seq[Expression]): Expression = {
    +    catalog.getOrElse(throw TableException("FunctionCatalog was not defined"))
    +      .lookupFunction(name, operands)
    +  }
    +}
    +
    +class WrapperVisitor extends RexVisitorImpl[RexNodeWrapper](true) {
    --- End diff --

    We have to make sure that we do not miss anything here. IMO, we should try to translate as much as possible, but if something is not possible, we should make sure that we recognize that and do not offer this term to the `FilterableTableSource`.
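One way to "recognize" an untranslatable term, as asked for above, is to make the translation return an `Option` so that an unsupported node anywhere in the tree marks the whole conjunct as not translatable. The sketch below assumes a deliberately tiny model: `Rex`, `Lit`, `Field`, `Call`, `Expr`, and the `supported` operator set are all hypothetical and are not Calcite or Flink classes.

```scala
object TranslateSketch {
  // Hypothetical, simplified stand-ins for Calcite's RexNode hierarchy.
  sealed trait Rex
  final case class Lit(value: Int) extends Rex
  final case class Field(index: Int) extends Rex
  final case class Call(op: String, operands: List[Rex]) extends Rex

  // Hypothetical stand-in for Flink's Expression, rendered as text.
  final case class Expr(text: String)

  // Binary operators this sketch can translate; anything else is unsupported.
  val supported: Set[String] = Set(">", "<", "=", "*", "AND")

  // None means "could not translate"; a call translates only if all operands do.
  def toExpression(rex: Rex, names: Map[Int, String]): Option[Expr] = rex match {
    case Lit(v) => Some(Expr(v.toString))
    case Field(i) => names.get(i).map(Expr(_))
    case Call(op, List(left, right)) if supported(op) =>
      for {
        l <- toExpression(left, names)
        r <- toExpression(right, names)
      } yield Expr(s"(${l.text} $op ${r.text})")
    case _ => None
  }
}
```

The catch-all `case _ => None` is the point of the design: a node kind the translator has never seen is reported as untranslatable rather than failing the whole optimization.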
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878673#comment-15878673 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102487221

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExpressionExtractor.scala ---
    @@ -0,0 +1,163 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.util
    +
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rex._
    +import org.apache.calcite.plan.RelOptUtil
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.calcite.{FlinkTypeFactory, RexNodeWrapper}
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.validate.FunctionCatalog
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +object RexProgramExpressionExtractor {
    +
    +  /**
    +    * converts a rexProgram condition into independent CNF expressions
    +    *
    +    * @param rexProgram The RexProgram to analyze
    +    * @return converted expression
    +    */
    +  private[flink] def extractPredicateExpressions(
    +      rexProgram: RexProgram,
    +      rexBuilder: RexBuilder,
    +      catalog: FunctionCatalog): Array[Expression] = {
    +
    +    val fieldNames = getInputsWithNames(rexProgram)
    +
    +    val condition = rexProgram.getCondition
    +    if (condition == null) {
    +      return Array.empty
    +    }
    +    val call = rexProgram.expandLocalRef(condition)
    +    val cnf = RexUtil.toCnf(rexBuilder, call)
    +    val conjunctions = RelOptUtil.conjunctions(cnf)
    +    val expressions = conjunctions.asScala.map(
    +      RexNodeWrapper.wrap(_, catalog).toExpression(fieldNames)
    +    )
    +    expressions.toArray
    +  }
    +
    +  /**
    +    * verify should we apply remained expressions on
    +    *
    +    * @param original initial expression
    +    * @param remained remained part of original expression
    +    * @return whether or not to decouple parts of the origin expression
    +    */
    +  private[flink] def verifyExpressions(
    +      original: Array[Expression],
    +      remained: Array[Expression]): Boolean =
    +    remained forall (original contains)
    +
    +  /**
    +    * Generates a new RexProgram based on new expression.
    +    *
    +    * @param rexProgram original RexProgram
    +    * @param scan input source
    +    * @param predicate filter condition (fields must be resolved)
    +    * @param relBuilder builder for converting expression to Rex
    +    */
    +  private[flink] def rewriteRexProgram(
    +      rexProgram: RexProgram,
    +      scan: TableScan,
    +      predicate: Array[Expression])(implicit relBuilder: RelBuilder): RexProgram = {
    --- End diff --

    We need to inject all conjunctive `RexNode` terms which could not be translated into `Expression` here as well.
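To make the request above concrete: the condition left in the rewritten program would be the conjunction of the terms the source handed back and the terms that were never offered because they could not be translated. The following is a minimal sketch under that reading; `Term`, `residualTerms`, and `conjunct` are hypothetical names, and terms are plain text rather than `RexNode` or `Expression` instances.

```scala
object ResidualSketch {
  // Hypothetical stand-in: one conjunctive term, rendered as text.
  final case class Term(text: String)

  // Terms that stay in the Calc: those the source handed back
  // plus those that were never offered because translation failed.
  def residualTerms(returnedBySource: Seq[Term], untranslated: Seq[Term]): Seq[Term] =
    returnedBySource ++ untranslated

  // AND the residual terms together; None means no filter is left in the Calc.
  def conjunct(terms: Seq[Term]): Option[Term] =
    terms.reduceOption((l, r) => Term(s"${l.text} AND ${r.text}"))
}
```

Returning an `Option` keeps the "nothing left to filter" case explicit, so the caller decides what to do when every term was pushed down.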
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878661#comment-15878661 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102470146

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RexNodeWrapper.scala ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.calcite
    +
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql._
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
    +import org.apache.flink.table.validate.FunctionCatalog
    +import org.apache.flink.table.calcite.RexNodeWrapper._
    +
    +abstract class RexNodeWrapper(rex: RexNode) {
    --- End diff --

    Please add some documentation about the purpose of this class.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878666#comment-15878666 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102472465

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
    @@ -53,13 +55,20 @@ class StreamTableSourceScan(
           cluster,
           traitSet,
           getTable,
    -      tableSource
    +      tableSource,
    +      filterCondition
         )
       }
       override def explainTerms(pw: RelWriter): RelWriter = {
    -    super.explainTerms(pw)
    +    val terms = super.explainTerms(pw)
           .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
    +    if (filterCondition != null) {
    +      import scala.collection.JavaConverters._
    --- End diff --

    Please move import up
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878665#comment-15878665 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102485722

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.dataSet
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
    +import org.apache.calcite.rex.RexProgram
    +import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
    +import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
    +  operand(classOf[DataSetCalc],
    +    operand(classOf[BatchTableSourceScan], none)),
    +  "PushFilterIntoBatchTableSourceScanRule") {
    +
    +  override def matches(call: RelOptRuleCall) = {
    +    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +    scan.tableSource match {
    +      case _: FilterableTableSource =>
    +        calc.calcProgram.getCondition != null
    +      case _ => false
    +    }
    +  }
    +
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
    +    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
    +
    +    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
    +
    +    val program: RexProgram = calc.calcProgram
    +    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
    +    val predicate = extractPredicateExpressions(
    +      program,
    +      call.builder().getRexBuilder,
    +      tst.tableEnv.getFunctionCatalog)
    +
    +    if (predicate.length != 0) {
    +      val remainingPredicate = filterableSource.setPredicate(predicate)
    +
    +      if (verifyExpressions(predicate, remainingPredicate)) {
    +
    +        val filterRexNode = getFilterExpressionAsRexNode(
    +          program.getInputRowType,
    +          scan,
    +          predicate.diff(remainingPredicate))(call.builder())
    +
    +        val newScan = new BatchTableSourceScan(
    +          scan.getCluster,
    +          scan.getTraitSet,
    +          scan.getTable,
    +          scan.tableSource,
    +          filterRexNode)
    +
    +        val newCalcProgram = rewriteRexProgram(
    --- End diff --

    We would need to add those conjunctive terms which could not be translated by `extractPredicateExpressions()`.
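For readers following the rule in this diff, the two array operations it relies on, `remained forall (original contains)` and `predicate.diff(remainingPredicate)`, can be tried in isolation. The sketch below uses plain strings in place of `Expression` values; the object name `PushDownSplitSketch` and the method `pushedDown` are assumptions made for illustration only.

```scala
object PushDownSplitSketch {
  // Mirrors `remained forall (original contains)` from the diff:
  // the source may only hand back terms it was actually offered.
  def verifyExpressions(original: Array[String], remained: Array[String]): Boolean =
    remained.forall(r => original.contains(r))

  // Mirrors `predicate.diff(remainingPredicate)`: the terms the source accepted.
  def pushedDown(original: Array[String], remained: Array[String]): Array[String] =
    original.diff(remained.toSeq)
}
```

For the test predicate `amount > 2 && price * 2 < 32`, a source that hands back only `price * 2 < 32` leaves `amount > 2` as the pushed-down part. Terms that were never translated appear in neither array, which is why the review asks for them to be added back separately.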
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15876012#comment-15876012 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user tonycox commented on the issue:

    https://github.com/apache/flink/pull/3166

    @fhueske could you look at this approach of converting `RexNode` to `Expression`? It's still in WIP state, but I need your feedback.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15868730#comment-15868730 ] ASF GitHub Bot commented on FLINK-3849: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r101310511 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramExpressionExtractor.scala --- @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.util + +import java.util + +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlKind, SqlOperator} +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.expressions._ +import org.apache.flink.table.sources.TableSource + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable + +object RexProgramExpressionExtractor { + + /** +* converts a rexProgram condition into expression +* +* @param rexProgram The RexProgram to analyze +* @return converted expression +*/ + def extractExpression(rexProgram: RexProgram): Expression = { + +val refInputToName = getInputsWithNames(rexProgram) +val visitor = new ExpressionVisitor(refInputToName) + +val condition = rexProgram.getCondition +if (condition == null) { + return null +} + +rexProgram.expandLocalRef(condition).accept(visitor) +val parsedExpression = ExpressionParser.parseExpression(visitor.getStringPredicate) + +parsedExpression + } + + /** +* verify can the original expression be divided into `new` expression +* and remainder part without loss of logical correctness +* +* @param original initial expression +* @param lump part of original expression +* @return whether or not to decouple parts of the origin expression +*/ + def verifyExpressions(original: Expression, lump: Expression): Boolean = { +if (original == null & lump == null) { + return false +} +if (original.children.isEmpty | !checkOperator(original)) { + return false +} +val head = original.children.head +val last = original.children.last +if (head.checkEquals(lump)) { + return checkOperator(original) +} +if (last.checkEquals(lump)) { + return checkOperator(original) +} +verifyExpressions(head, lump) match { + case true => true + case _ => verifyExpressions(last, lump) +} + } + + private def checkOperator(original: 
Expression): Boolean = { +original match { + case o: Or => false + case _ => true +} + } + + /** +* Generates a new RexProgram based on new expression. +* +* @param rexProgram original RexProgram +* @param scan input source +* @param expression filter condition (fields must be resolved) +* @param tableSource source to get names and type of table +* @param relBuilder builder for converting expression to Rex +*/ + def rewriteRexProgram( + rexProgram: RexProgram, + scan: TableScan, + expression: Expression, + tableSource: TableSource[_])(implicit relBuilder: RelBuilder): RexProgram = { + +if (expression != null) { + + val names = TableEnvironment.getFieldNames(tableSource) + + val nameToType = names +.zip(TableEnvironment.getFieldTypes(tableSource)).toMap + + relBuilder.push(scan) + + val rule: PartialFunction[Expression, Expression] = { +case u@UnresolvedFieldReference(name) =>
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15868728#comment-15868728 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3166#discussion_r101303533

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
+import org.apache.flink.table.plan.rules.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+    operand(classOf[BatchTableSourceScan], none)),
+  "PushFilterIntoBatchTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+    scan.tableSource match {
+      case _: FilterableTableSource => true
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+
+    val tableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
+
+    val expression = extractExpression(calc.calcProgram)
+    val unusedExpr = tableSource.setPredicate(expression)
+
+    if (verifyExpressions(expression, unusedExpr)) {
--- End diff --

Why do we need this check?

> Add FilterableTableSource interface and translation rule
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Anton Solovev
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak
> the cost model as well to push the optimizer in the right direction.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
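
The thread does not answer the question about the check. One plausible reading, stated here as an assumption, is that the check guards against splitting a predicate across an OR: only top-level conjuncts can be separated into a pushed-down part and a remainder without changing the result. The sketch below illustrates that idea with stand-in types; `Expr`, `Field`, `Lit`, `Gt`, `Lt`, `And`, `Or` and `ConjunctSplit` are hypothetical names for illustration and are not Flink classes.

```scala
// Stand-in expression tree; these are NOT Flink's Expression classes.
sealed trait Expr
case class Field(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Gt(left: Expr, right: Expr) extends Expr
case class Lt(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr

object ConjunctSplit {
  // Flatten nested ANDs into their conjuncts.
  // Anything else, including an OR, is kept as one atomic unit.
  def conjuncts(e: Expr): List[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => List(other)
  }

  // Partition the conjuncts into (pushed to the source, remainder kept in the calc).
  def split(e: Expr, supported: Expr => Boolean): (List[Expr], List[Expr]) =
    conjuncts(e).partition(supported)
}
```

For `a > 1 AND (b < 5 OR c > 0)` and a source that only understands `Gt`, `split` pushes `a > 1` and keeps the whole OR as the remainder; taking `c > 0` out of the OR on its own would change which rows pass.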
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15868733#comment-15868733 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3166#discussion_r101306528

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.expressions.Expression
+
+/**
+  * Adds support for filtering push-down to a [[TableSource]].
+  * A [[TableSource]] extending this interface is able to filter the fields of the return table.
+  *
+  */
+trait FilterableTableSource {
+
+  /** return an predicate expression that was set. */
+  def getPredicate: Option[Expression]
--- End diff --

Do not use `Option` here. This interface might be implemented in Java.

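
The review comment above does not say what should replace `Option`. A minimal sketch of one possibility, assuming a nullable return value is acceptable: the trait exposes plain `Expression` values so that a Java class can implement it, and Scala callers wrap the result locally. The `Expression` trait here is a stand-in for `org.apache.flink.table.expressions.Expression`, and `predicateOf` and `PassThroughSource` are hypothetical names that do not appear in the pull request.

```scala
// Stand-in for org.apache.flink.table.expressions.Expression (hypothetical).
trait Expression

// Sketch of an interface without scala.Option in its signatures,
// so a Java class can implement it directly.
trait FilterableTableSource {
  /** Returns the predicate that was set, or null if none was set (assumed convention). */
  def getPredicate: Expression

  /** Accepts a predicate and returns the part the source cannot evaluate. */
  def setPredicate(predicate: Expression): Expression
}

object FilterableTableSource {
  // Scala callers can still get an Option by wrapping the nullable result.
  def predicateOf(source: FilterableTableSource): Option[Expression] =
    Option(source.getPredicate)
}

// Hypothetical source that supports no filters: it hands every predicate back unchanged.
class PassThroughSource extends FilterableTableSource {
  override def getPredicate: Expression = null
  override def setPredicate(predicate: Expression): Expression = predicate
}
```

With this shape, `FilterableTableSource.predicateOf(new PassThroughSource)` yields `None`, so Scala code such as `explainTerms` can keep using `getOrElse` without the Java-facing interface depending on `scala.Option`.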
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15868720#comment-15868720 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3166#discussion_r101297805

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala ---
@@ -59,8 +59,16 @@ class BatchTableSourceScan(
   }

   override def explainTerms(pw: RelWriter): RelWriter = {
+    val s = tableSource match {
+      case source: FilterableTableSource =>
+        source.getPredicate.getOrElse("").toString.replaceAll("\\'|\\\"|\\s", "")
+      case _ => ""
+    }
     super.explainTerms(pw)
       .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
+      // TODO should we have this? If yes how it should look like, as in DataCalc?
--- End diff --

Yes, the filter should be in the explain string of the table source. I think it would be good if it was formatted as the filter in calc.

[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15868727#comment-15868727 ]

ASF GitHub Bot commented on FLINK-3849:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3166#discussion_r101316851

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramExpressionExtractor.scala ---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.util
+
+import java.util
+
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlKind, SqlOperator}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+object RexProgramExpressionExtractor {
+
+  /**
+    * converts a rexProgram condition into expression
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return converted expression
+    */
+  def extractExpression(rexProgram: RexProgram): Expression = {
+
+    val refInputToName = getInputsWithNames(rexProgram)
+    val visitor = new ExpressionVisitor(refInputToName)
+
+    val condition = rexProgram.getCondition
+    if (condition == null) {
+      return null
+    }
+
+    rexProgram.expandLocalRef(condition).accept(visitor)
+    val parsedExpression = ExpressionParser.parseExpression(visitor.getStringPredicate)
--- End diff --

Converting by generating and parsing strings is not very reliable. We should rather map `RexNode` directly to `Expression`. @twalthr suggested to add the translation logic to the corresponding `Expression` next to the `toRexNode()` method. We would need to find a "dictionary" to identify the relevant `Expression` for a `RexNode` though.

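
The direct mapping suggested in the comment above could be sketched roughly as follows. This is only an illustration under stated assumptions: `Rex`, `InputRef`, `Literal` and `Call` are simplified stand-ins for Calcite's `RexNode` hierarchy, `Expr` and its case classes are stand-ins for Flink's `Expression` classes, and the string keys of the "dictionary" are hypothetical operator names. None of these names come from the pull request.

```scala
// Simplified stand-ins for Calcite's RexNode hierarchy (hypothetical).
sealed trait Rex
case class InputRef(index: Int) extends Rex
case class Literal(value: Any) extends Rex
case class Call(kind: String, operands: List[Rex]) extends Rex

// Simplified stand-ins for Table API expressions (hypothetical).
sealed trait Expr
case class FieldRef(name: String) extends Expr
case class Const(value: Any) extends Expr
case class GreaterThan(left: Expr, right: Expr) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class And(left: Expr, right: Expr) extends Expr

object RexToExpr {
  // The "dictionary": operator kind -> constructor of the matching expression.
  private val binary: Map[String, (Expr, Expr) => Expr] = Map(
    "GREATER_THAN" -> ((l: Expr, r: Expr) => GreaterThan(l, r)),
    "EQUALS" -> ((l: Expr, r: Expr) => EqualTo(l, r)),
    "AND" -> ((l: Expr, r: Expr) => And(l, r))
  )

  // Walks the tree directly, with no intermediate string.
  // Returns None when a node has no known counterpart, so the caller
  // can leave that part of the condition in the calc.
  def convert(node: Rex, fieldNames: IndexedSeq[String]): Option[Expr] = node match {
    case InputRef(i) if fieldNames.isDefinedAt(i) => Some(FieldRef(fieldNames(i)))
    case Literal(v) => Some(Const(v))
    case Call(kind, List(l, r)) =>
      for {
        build <- binary.get(kind)
        le <- convert(l, fieldNames)
        re <- convert(r, fieldNames)
      } yield build(le, re)
    case _ => None
  }
}
```

Because unknown operators yield `None` rather than a malformed string, an unsupported call such as a hypothetical `LIKE` simply stays untranslated instead of failing in a parser.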