[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16756125#comment-16756125 ]

Kurt Young commented on FLINK-5859:
-----------------------------------

I'm closing this issue since it will be addressed after Blink gets merged.

> support partition pruning on Table API & SQL
> --------------------------------------------
>
>                 Key: FLINK-5859
>                 URL: https://issues.apache.org/jira/browse/FLINK-5859
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Many data sources are partitionable storage, e.g. HDFS, Druid. And many
> queries just need to read a small subset of the total data. We can use
> partition information to prune or skip over files irrelevant to the user’s
> queries. Both query optimization time and execution time can be reduced
> significantly, especially for a large partitioned table.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
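The pruning idea from the issue description can be sketched in plain Scala. This is illustrative only; the `Partition` model and the predicate shape below are assumptions for the sketch, not the Flink Table API:

```scala
// Illustrative model of partition pruning, independent of the Flink classes.
// A partition is identified by its spec, e.g. Map("ds" -> "2017-09-14").
case class Partition(spec: Map[String, String])

// Keep only the partitions whose spec satisfies the partition predicate,
// so files in pruned-away partitions are never read at execution time.
def prunePartitions(
    allPartitions: Seq[Partition],
    predicate: Map[String, String] => Boolean): Seq[Partition] =
  allPartitions.filter(p => predicate(p.spec))

val partitions = Seq(
  Partition(Map("ds" -> "2017-09-13")),
  Partition(Map("ds" -> "2017-09-14")))

// A query like `WHERE ds = '2017-09-14'` only needs the second partition.
val remaining = prunePartitions(partitions, _.get("ds").contains("2017-09-14"))
```

Only the surviving partitions are then handed to the source for scanning, which is where both the planning and execution savings come from.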
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169519#comment-16169519 ]

ASF GitHub Bot commented on FLINK-5859:
---------------------------------------

Github user KurtYoung commented on the issue:

    https://github.com/apache/flink/pull/4667

    LGTM, @fhueske @twalthr can you also take a look?
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169183#comment-16169183 ]

ASF GitHub Bot commented on FLINK-5859:
---------------------------------------

Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4667#discussion_r139301067

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
    @@ -0,0 +1,193 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.expressions.Expression
    +import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, PartitionPruner}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    + * A [[TableSource]] extending this class is a partition table,
    + * and will get the relevant partitions about the query.
    + *
    + * @tparam T The return type of the [[TableSource]].
    + */
    +abstract class PartitionableTableSource[T] extends FilterableTableSource[T] {
    +
    +  private var relBuilder: Option[RelBuilder] = None
    +
    +  /**
    +    * Get all partitions belong to this table
    +    *
    +    * @return All partitions belong to this table
    +    */
    +  def getAllPartitions: JList[Partition]
    +
    +  /**
    +    * Get partition field names.
    +    *
    +    * @return Partition field names.
    +    */
    +  def getPartitionFieldNames: Array[String]
    +
    +  /**
    +    * Get partition field types.
    +    *
    +    * @return Partition field types.
    +    */
    +  def getPartitionFieldTypes: Array[TypeInformation[_]]
    +
    +  /**
    +    * Whether drop partition predicates after apply partition pruning.
    +    *
    +    * @return true only if the result is correct without partition predicate
    +    */
    +  def supportDropPartitionPredicate: Boolean = false
    +
    +  /**
    +    * @return Pruned partitions
    +    */
    +  def getPrunedPartitions: JList[Partition]
    +
    +  /**
    +    * @return True if apply partition pruning
    +    */
    +  def isPartitionPruned: Boolean
    +
    +  /**
    +    * If a partitionable table source which can't apply non-partition filters should not pick any
    +    * predicates.
    +    * If a partitionable table source which can apply non-partition filters should check and pick
    +    * only predicates this table source can support.
    +    *
    +    * After trying to push pruned-partitions and predicates down, we should return a new
    +    * [[TableSource]] instance which holds all pruned-partitions and all pushed down predicates.
    +    * Even if we actually pushed nothing down, it is recommended that we still return a new
    +    * [[TableSource]] instance since we will mark the returned instance as filter push down has
    +    * been tried.
    +    *
    +    * We also should note to not changing the form of the predicates passed in. It has been
    +    * organized in CNF conjunctive form, and we should only take or leave each element from the
    +    * list. Don't try to reorganize the predicates if you are absolutely confident with that.
    +    *
    +    * @param partitionPruned Whether partition pruning is applied.
    --- End diff --

    `partitionPruned` will be false when the filter does not contain partition conditions; otherwise it will be true.
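The "take or leave each element" contract described in the scaladoc above (the predicate list is in conjunctive form, and a source may only pick whole elements it can support) can be sketched in plain Scala. This is a toy model under stated assumptions; `Expression` and `FieldEq` stand in for Flink's expression classes:

```scala
// Toy expression type standing in for Flink's Expression hierarchy.
sealed trait Expression
case class FieldEq(field: String, value: String) extends Expression

// Split the conjunctive predicate list into the elements that reference a
// partition field (picked by the source) and the rest (left for the
// framework to evaluate). Each element is taken or left whole; the list
// is never reorganized.
def pickPartitionPredicates(
    predicates: List[Expression],
    partitionFields: Set[String]): (List[Expression], List[Expression]) =
  predicates.partition {
    case FieldEq(field, _) => partitionFields.contains(field)
  }

// For `WHERE ds = '2017-09-14' AND user = 'bob'` with partition field `ds`,
// only the `ds` conjunct is usable for partition pruning.
val (picked, rest) = pickPartitionPredicates(
  List(FieldEq("ds", "2017-09-14"), FieldEq("user", "bob")),
  partitionFields = Set("ds"))
```

The picked conjuncts would drive partition pruning, while the remaining ones stay in the plan for the framework to evaluate, matching the division of labor the scaladoc describes.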
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169182#comment-16169182 ]

ASF GitHub Bot commented on FLINK-5859:
---------------------------------------

Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4667#discussion_r139301022

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
    +    * @param prunedPartitions Remaining partitions after partition pruning applied.
    --- End diff --

    prunedPartitions => remainingPartitions
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169181#comment-16169181 ]

ASF GitHub Bot commented on FLINK-5859:
---------------------------------------

Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4667#discussion_r139301007

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
    +    * @param partitionPruned Whether partition pruning is applied.
    +    * @param prunedPartitions Remaining partitions after partition pruning applied.
    +    *                         Notes: If partition pruning is not applied, prunedPartitions is empty.
    +    * @param predicates A list contains conjunctive predicates, you should pick and remove all
    +    *                   expressions that can be pushed down. The remaining elements of this
    +    *                   list will further evaluated by framework.
    +    * @return A new cloned instance of [[TableSource]].
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169178#comment-16169178 ]

ASF GitHub Bot commented on FLINK-5859:
---------------------------------------

Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4667#discussion_r139300836

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
    @@ -55,4 +55,9 @@ trait FilterableTableSource[T] {
        */
      def isFilterPushedDown: Boolean

    +  /**
    +    * @param relBuilder Builder for relational expressions.
    +    */
    +  def setRelBuilder(relBuilder: RelBuilder): Unit
    --- End diff --

    `setRelBuilder` is called in `PushFilterIntoTableSourceScanRule`. If we moved `setRelBuilder` to `PartitionableTableSource`, `PushFilterIntoTableSourceScanRule` would have to know about both `FilterableTableSource` and `PartitionableTableSource`.
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167224#comment-16167224 ]

ASF GitHub Bot commented on FLINK-5859:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4667#discussion_r139049572

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
    +    * @param partitionPruned Whether partition pruning is applied.
    --- End diff --

    We should make this flag clearer. If it represents whether partition pruning has been applied, I would say it should always be true, because by the time this method is called the framework has at least tried to apply partition pruning.
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167219#comment-16167219 ]

ASF GitHub Bot commented on FLINK-5859:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4667#discussion_r139049316

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
    +    * @param prunedPartitions Remaining partitions after partition pruning applied.
    --- End diff --

    The definition of "prunedPartitions" looks contradictory here. We should stick to a single definition: either "prunedPartitions" means all partitions that have been pruned away, or all remaining partitions that survive the pruning.
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167216#comment-16167216 ]

ASF GitHub Bot commented on FLINK-5859:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4667#discussion_r139048604

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
    +    * @param prunedPartitions Remaining partitions after partition pruning applied.
    +    *                         Notes: If partition pruning is not applied, prunedPartitions is empty.
    +    * @param predicates A list contains conjunctive predicates, you should pick and remove all
    +    *                   expressions that can be pushed down. The remaining elements of this
    +    *                   list will further evaluated by framework.
    +    * @return A new cloned instance of [[TableSource]].
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167213#comment-16167213 ]

ASF GitHub Bot commented on FLINK-5859:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4667#discussion_r139048337

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
    +  /**
    +    * @param relBuilder Builder for relational expressions.
    +    */
    +  def setRelBuilder(relBuilder: RelBuilder): Unit
    --- End diff --

    Can you move this method to PartitionableTableSource?
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16165823#comment-16165823 ] ASF GitHub Bot commented on FLINK-5859: --- GitHub user godfreyhe opened a pull request: https://github.com/apache/flink/pull/4667 [FLINK-5859] [table] Add PartitionableTableSource for partition pruning ## What is the purpose of the change This pull request adds PartitionableTableSource for partition pruning when optimizing the query plan. That way both query optimization time and execution time can be reduced obviously, especially for a large partitioned table. ## Brief change log - *Adds PartitionableTableSource which extends FilterableTableSource* - *Adds setRelBuilder method in FilterableTableSource class* - *Adds implementation for partition pruning and extracting partition predicates* ## Verifying this change This change added tests and can be verified as follows: - *Added integration tests for PartitionableTableSource on batch and stream sql* - *Added test that validates the correct of partition pruning* - *Added test that validates the correct of extracting partition predicates* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? 
(JavaDocs)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/godfreyhe/flink FLINK-5859

Alternatively, you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4667.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4667

commit 2fc5f9d6e6a1625a0d7784d320f8dde5df8a7f5e
Author: godfreyhe
Date: 2017-09-14T06:00:54Z
[FLINK-5859] [table] Add PartitionableTableSource for partition pruning
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15961472#comment-15961472 ] Fabian Hueske commented on FLINK-5859: Hi [~godfreyhe], FilterableTableSource has been merged to master. You could continue with the discussed approach.
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904724#comment-15904724 ] Fabian Hueske commented on FLINK-5859: Yes, the first changes should be attributed to [~tonycox]. Just put your changes on top. Opening a new PR sounds good to me. Thanks, Fabian
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904271#comment-15904271 ] Kurt Young commented on FLINK-5859: Hi [~fhueske], I can continue working on {{FilterableTableSource}}. Should I open another PR based on the former changes?
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902816#comment-15902816 ] Fabian Hueske commented on FLINK-5859: Hi [~godfreyhe], the contributor of the {{FilterableTableSource}} does not have time at the moment to continue working on the PR (https://github.com/apache/flink/pull/3166). Do you want to pick that issue up and complete the work?
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885360#comment-15885360 ] Fabian Hueske commented on FLINK-5859: Hi [~godfreyhe] and [~ykt836], sounds good to me. +1 to revisit this approach if we figure out that it restricts us in some way or leads to sub-optimal plans.
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885179#comment-15885179 ] Kurt Young commented on FLINK-5859: Hi [~fhueske], I think your API suggestion is a good way to start with; let's do it this way and see if something should be changed in the future.
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885098#comment-15885098 ] godfrey he commented on FLINK-5859:

Hi [~fhueske], thanks for your advice. IMO, rules such as `PushProjectIntoBatchTableSourceScanRule`, `PushFilterIntoBatchTableSourceScanRule`, and `PartitionPruningRule` (maybe we integrate it into PushFilterIntoBatchTableSourceScanRule) need to be applied only once and do not actually need a cost model. Rules such as `FilterCalcMergeRule`, `FilterJoinRule`, and `DataSetCalcRule` do not need real cost; dummy cost is enough. Rules such as `LoptOptimizeJoinRule` and `JoinToMultiJoinRule` are applied with real cost. So we want to break the cost-based optimization down into three phases later. The whole optimization would then include five steps:

1. decorrelate the query
2. normalize the logical plan with the HEP planner
3. optimize the logical plan with the Volcano planner and dummy cost (including `FilterCalcMergeRule`, `FilterJoinRule`, `DataSetCalcRule`, and so on)
4. optimize the physical plan with the HEP planner (including `PushProjectIntoBatchTableSourceScanRule`, `PushFilterIntoBatchTableSourceScanRule`, and so on)
5. optimize the physical plan with the Volcano planner and real cost (including `LoptOptimizeJoinRule`, `JoinToMultiJoinRule`, and so on)

That way, each optimization phase keeps its complexity as small as possible, and your concern is addressed as well. Looking forward to your advice, thanks.
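The five steps sketched in this comment can be pictured as a fixed pipeline in which each phase transforms the plan exactly once, in order. Below is a minimal, purely hypothetical Java sketch of that phasing; the `PhasedOptimizer` class and the phase labels are illustrative placeholders, not Flink's actual planner classes.

```java
import java.util.List;
import java.util.function.UnaryOperator;

public class PhasedOptimizer {

    // Apply each optimization phase exactly once, in the given order.
    public static String optimize(String plan, List<UnaryOperator<String>> phases) {
        for (UnaryOperator<String> phase : phases) {
            plan = phase.apply(plan);
        }
        return plan;
    }

    public static void main(String[] args) {
        // Placeholders for the five steps described in the comment above.
        List<UnaryOperator<String>> phases = List.of(
            p -> p + " -> decorrelate",
            p -> p + " -> normalize(HEP)",
            p -> p + " -> logical(Volcano, dummy cost)",
            p -> p + " -> physical(HEP, push-down rules)",
            p -> p + " -> physical(Volcano, real cost)");
        System.out.println(optimize("query", phases));
    }
}
```

The point of the sketch is only that the once-only, cost-free rules (step 4) run in their own HEP pass, separated from the cost-based Volcano passes.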
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882446#comment-15882446 ] Fabian Hueske commented on FLINK-5859:

Hi [~godfreyhe] and [~ykt836], I think we can also get a nice API if we handle both cases as regular filter push-down. We could implement {{PartitionableTableSource}} as follows:

{code}
abstract class PartitionableTableSource extends FilterableTableSource {

  // This needs to be implemented!
  def getAllPartitions: Array[String]

  // This needs to be implemented.
  // The interface could also be simpler and not use Expression.
  def applyPartitionPruning(partitionsToPrune: Array[Expression]): Unit

  // Default implementation. Must be overridden to apply filters in addition to
  // partition pruning. If overridden, it is called after partitions have been
  // pruned -> if it needs to scan metadata, it knows which partitions to skip.
  def applyPredicate(predicates: Array[Expression]): Array[Expression] = {
    // by default returns all predicates
    predicates
  }

  // Default implementation. Will be called by the filter push-down rule.
  override def setPredicate(predicates: Array[Expression]): Array[Expression] = {
    // identify which partitions exist
    val partitions = getAllPartitions
    // split the predicates into those usable for partition pruning and the rest
    val (partitionsToPrune, remaining): (Array[Expression], Array[Expression]) =
      predicates.partition(???)
    // set partitions to prune
    applyPartitionPruning(partitionsToPrune)
    // apply the remaining predicates
    val remainingAfterFilter = applyPredicate(remaining)
    remainingAfterFilter
  }
}
{code}

This approach is fully integrated with the {{FilterableTableSource}} and does not require any additional logic in the optimizer (no rules, etc.). If only partition pruning should be done, only {{getAllPartitions}} and {{applyPartitionPruning}} need to be implemented. If the table source should also apply filters, it needs to override {{applyPredicate()}}. It also reduces the metadata scans, because partitions are pruned before the metadata for filters needs to be checked. Is there another benefit of applying partition pruning earlier?
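To make the control flow of that sketch concrete, here is a rough, dependency-free Java analogue. Everything in it is hypothetical: the class names only mirror the Scala sketch above, predicates are plain `col=value` strings, and the partition column is hard-wired to `part` purely for illustration; none of this is Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;

// setPredicate() first uses predicates on the partition column to prune
// partitions, then offers the remaining predicates to applyPredicate().
abstract class PartitionablePruningSource {

    abstract List<String> getAllPartitions();               // e.g. ["2016", "2017"]
    abstract void applyPartitionPruning(List<String> keep); // partitions left to scan

    // Default: the source applies no row-level filters itself.
    List<String> applyPredicate(List<String> predicates) {
        return predicates;
    }

    // Returns the predicates the source could NOT handle.
    final List<String> setPredicate(List<String> predicates) {
        List<String> keep = new ArrayList<>(getAllPartitions());
        List<String> remaining = new ArrayList<>();
        for (String p : predicates) {
            String[] kv = p.split("=", 2);
            if (kv.length == 2 && kv[0].equals("part")) {
                keep.retainAll(List.of(kv[1])); // prune to the matching partition
            } else {
                remaining.add(p);               // not a partition predicate
            }
        }
        applyPartitionPruning(keep);
        return applyPredicate(remaining);
    }
}

public class PruningDemo {
    public static void main(String[] args) {
        PartitionablePruningSource source = new PartitionablePruningSource() {
            List<String> getAllPartitions() { return List.of("2016", "2017"); }
            void applyPartitionPruning(List<String> keep) {
                System.out.println("scan partitions: " + keep);
            }
        };
        // "part=2017" prunes partitions; "amount=10" is left for a runtime filter.
        System.out.println(source.setPredicate(List.of("part=2017", "amount=10")));
    }
}
```

A source that only wants pruning implements the two abstract methods and never touches predicates; a source that also filters overrides `applyPredicate()`, exactly as described in the comment above.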
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881999#comment-15881999 ] godfrey he commented on FLINK-5859:

Hi [~fhueske], thanks for your advice. Yes, partition pruning is a kind of coarse-grained filter push-down; both filter push-down and partition pruning share a common part, namely extracting predicates from the filter condition based on what a given data source is interested in. But, IMO, filter push-down and partition pruning are independent concepts in general. The following table shows that different data sources have different traits:

||Trait||Example||
|filter-pushdown only|MySQL, HBase|
|partition-pruning only|CSV, TEXT|
|both filter-pushdown and partition-pruning|Parquet, Druid|

IMO, we should provide developers with a clear concept, as [~ykt836] mentioned above, which includes both FilterableTableSource and PartitionableTableSource. Looking forward to your advice, thanks.
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881737#comment-15881737 ] Kurt Young commented on FLINK-5859:

Hi [~fhueske], how about this approach: we provide both {{FilterableTableSource}} and {{PartitionableTableSource}}, keep {{FilterableTableSource}} as it is, and add methods like {{getAllPartitions}} and {{applyPartitionPruning}} to {{PartitionableTableSource}}. From a developer's point of view, we can treat these two traits as completely independent. It will be easier for a developer to implement each functionality independently, compared with mixing all the logic into {{FilterableTableSource.setPredicate()}}. Also, in the future I think it is very likely that these two traits will be applied by the framework in different optimization stages: we apply partition pruning as early as possible in the logical optimization, and let filter push-down be applied a little later because it should do some heavyweight physical-level analysis first. BTW, this approach can still achieve what you suggested: you can implement {{FilterableTableSource}} only and do all the pruning and filtering there if you like.
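Sketched as code, the two-independent-traits idea in this comment might look like the following. These are illustrative Java interfaces only (not Flink's API, and with string predicates standing in for `Expression`); the point is that a source opts into either capability, or both, and the framework can invoke them at different optimization stages.

```java
import java.util.List;

interface FilterableSource {
    // Receives predicates; returns the ones it could not consume.
    List<String> applyPredicates(List<String> predicates);
}

interface PartitionableSource {
    List<String> getAllPartitions();
    void applyPartitionPruning(List<String> partitionsToKeep);
}

// A Parquet-like source can support both capabilities; a plain CSV source
// might implement only PartitionableSource.
public class ParquetLikeSource implements FilterableSource, PartitionableSource {
    private List<String> partitionsToScan;

    public List<String> getAllPartitions() { return List.of("2016", "2017"); }

    public void applyPartitionPruning(List<String> keep) { partitionsToScan = keep; }

    public List<String> applyPredicates(List<String> predicates) {
        // Pretend this source cannot evaluate any predicate itself.
        return predicates;
    }

    public static void main(String[] args) {
        ParquetLikeSource s = new ParquetLikeSource();
        // The framework applies pruning early (logical phase) ...
        s.applyPartitionPruning(List.of("2017"));
        // ... and filter push-down later (physical phase), independently.
        List<String> rest = s.applyPredicates(List.of("amount > 10"));
        System.out.println(s.partitionsToScan + " / " + rest);
    }
}
```

Composition rather than inheritance: neither interface knows about the other, which is exactly what makes staging them separately possible.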
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880416#comment-15880416 ] Fabian Hueske commented on FLINK-5859:

For such cases, we could either:
1. implement {{FilterableTableSource}} and manually figure out filters and partitions, or
2. give {{PartitionableTableSource}} another method {{setFilterPredicate()}} which has the same semantics as {{FilterableTableSource.setPredicate()}} but which is called from {{PartitionableTableSource.setPredicate()}} with the remaining predicates that could not be used to prune partitions.
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880359#comment-15880359 ] Kurt Young commented on FLINK-5859: Hi [~fhueske], with the approach you proposed, how can we handle the cases where some table sources support both partition pruning and filter push-down? I think it's better to relate them by composition rather than inheritance.
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880231#comment-15880231 ] Fabian Hueske commented on FLINK-5859:

Hi [~ykt836], my main motivation for treating partition pruning as filter push-down is to keep the complexity of the optimizer as small as possible. You are right, the effort to determine whether a filter can be applied or not depends on the format of the source. However, I don't think that this necessarily means partition pruning must be handled as a special case. In the end it is up to the TableSource how it determines which predicates apply and which don't. A partitionable table source would not need to scan all metadata. I see your point about the effort and complexity of implementing a partitionable TableSource. What do you think of the following approach? We implement {{PartitionableTableSource}} as an abstract class that implements the {{FilterableTableSource}} interface. {{PartitionableTableSource}} would have abstract methods to list the partitioned fields (and maybe some more). Based on that information, {{PartitionableTableSource}} implements {{FilterableTableSource.setPredicate()}} and {{FilterableTableSource.getPredicate()}}, i.e., the {{PartitionableTableSource}} automatically extracts the right filter expressions and returns everything it cannot deal with, based on the provided partitioned fields. TableSources that support filter push-down only via partition pruning implement {{PartitionableTableSource}}, only have to specify the partition columns, and do not have to deal with {{setPredicate()}}. This solution would keep all partition-pruning-related logic out of the optimizer and the table schemas. What do you think?
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15879991#comment-15879991 ] Kurt Young commented on FLINK-5859:

Hi [~fhueske], you raised a very good question, which is essentially: what is the difference between filter push-down and partition pruning? You are right that partition pruning is actually a coarse-grained filter push-down, but, more importantly, we can view it as a more "static" or more "predictable" filter. Here is an example to describe it more explicitly. Suppose Flink supports a Parquet table source. Since Parquet files contain some RowGroup-level statistics such as max/min values, we can use this information to reduce the data we need to read. But before we do anything, we need to make sure whether the source files contain such information or not, so we need to read all the metadata from these files to do some checking. If we are facing thousands of files, that will be really costly. However, a partition is something more static and predictable. For instance, if all your source files are organized in some time-based directory layout like /mm/dd/1.file and we have some partition fields describing the time information, it will be more efficient and easier to do the partition-level filtering first. But this doesn't mean we must have another trait like {{PartitionableTableSource}}; either extending the under-review {{FilterableTableSource}} or providing a separate, explicit {{PartitionableTableSource}} is fine with me. We should at least make "partition pruning" visible to users who may write their own {{TableSource}}, instead of letting all the magic happen under one method, which currently would be {code}def setPredicate(predicate: Array[Expression]): Array[Expression]{code} in the version of {{FilterableTableSource}} under review. Let me know if you have some thoughts about this.
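The "static and predictable" property described in this comment can be shown in a tiny sketch: with time-partitioned directories, a predicate on the partition field selects whole directories by path alone, without opening a single file. The paths and layout below are hypothetical, purely to illustrate the idea.

```java
import java.util.List;
import java.util.stream.Collectors;

public class DirectoryPruning {

    // Keep only the files whose path lies under a selected partition directory.
    static List<String> prune(List<String> files, String partitionPrefix) {
        return files.stream()
                .filter(f -> f.startsWith(partitionPrefix))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> files = List.of(
            "2017/01/01/1.file",
            "2017/01/02/1.file",
            "2017/02/01/1.file");
        // A predicate like month = '01' maps to the directory prefix "2017/01/".
        System.out.println(prune(files, "2017/01/"));
    }
}
```

Contrast this with RowGroup statistics: there, every candidate file's footer must be read before any data can be skipped, which is exactly the costly metadata scan the comment warns about.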
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875689#comment-15875689 ] Fabian Hueske commented on FLINK-5859: Partition pruning is a kind of coarse-grained filter push-down. I think this issue should be solved by FLINK-3849, which tries to push filters into {{TableSources}}. It is the responsibility of {{TableSources}} that implement {{FilterableTableSource}} to skip irrelevant input. What do you think, [~godfreyhe]?