[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2019-01-30 Thread Kurt Young (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16756125#comment-16756125
 ] 

Kurt Young commented on FLINK-5859:
---

I'm closing this issue since it will be addressed after blink gets merged.

> support partition pruning on Table API & SQL
> 
>
> Key: FLINK-5859
> URL: https://issues.apache.org/jira/browse/FLINK-5859
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Many data sources, e.g. HDFS and Druid, use partitioned storage, and many
> queries need to read only a small subset of the total data. Partition
> information can be used to prune, i.e. skip, files that are irrelevant to a
> query. This can noticeably reduce both query optimization time and execution
> time, especially for large partitioned tables.
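To make the idea concrete, here is a minimal Scala sketch of partition pruning. It assumes a hypothetical `SimplePartition` type that maps partition-column names to values and lists its data files. It is not Flink's API, only an illustration of skipping the files of partitions whose values cannot match a query.

```scala
// Hypothetical, simplified partition: partition-column values plus the data files it holds.
// This is NOT Flink's Partition type; it only illustrates the idea.
final case class SimplePartition(values: Map[String, String], files: Seq[String])

object PartitionPruningSketch {

  /** Keeps only the partitions whose partition values satisfy the predicate. */
  def prune(partitions: Seq[SimplePartition],
            predicate: Map[String, String] => Boolean): Seq[SimplePartition] =
    partitions.filter(p => predicate(p.values))

  /** Files that still have to be read; files of pruned partitions are never opened. */
  def filesToRead(partitions: Seq[SimplePartition],
                  predicate: Map[String, String] => Boolean): Seq[String] =
    prune(partitions, predicate).flatMap(_.files)
}
```

For example, with partitions `dt=2017-09-13` and `dt=2017-09-14`, a predicate on `dt = '2017-09-14'` leaves only the second partition's files to be read.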



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169519#comment-16169519
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/4667
  
LGTM, @fhueske @twalthr can you also take a look?




[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169183#comment-16169183
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139301067
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, PartitionPruner}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A [[TableSource]] extending this class is a partitioned table
+  * and receives the partitions relevant to the query.
+  *
+  * @tparam T The return type of the [[TableSource]].
+  */
+abstract class PartitionableTableSource[T] extends FilterableTableSource[T] {
+
+  private var relBuilder: Option[RelBuilder] = None
+
+  /**
+    * Gets all partitions belonging to this table.
+    *
+    * @return All partitions belonging to this table.
+    */
+  def getAllPartitions: JList[Partition]
+
+  /**
+    * Gets the partition field names.
+    *
+    * @return Partition field names.
+    */
+  def getPartitionFieldNames: Array[String]
+
+  /**
+    * Gets the partition field types.
+    *
+    * @return Partition field types.
+    */
+  def getPartitionFieldTypes: Array[TypeInformation[_]]
+
+  /**
+    * Whether partition predicates can be dropped after partition pruning has been applied.
+    *
+    * @return true only if the result is still correct without the partition predicates
+    */
+  def supportDropPartitionPredicate: Boolean = false
+
+  /**
+    * @return Pruned partitions
+    */
+  def getPrunedPartitions: JList[Partition]
+
+  /**
+    * @return True if partition pruning has been applied
+    */
+  def isPartitionPruned: Boolean
+
+  /**
+    * A partitionable table source that can't apply non-partition filters should not pick any
+    * predicates.
+    * A partitionable table source that can apply non-partition filters should check and pick
+    * only the predicates it supports.
+    *
+    * After trying to push pruned partitions and predicates down, we should return a new
+    * [[TableSource]] instance that holds all pruned partitions and all pushed-down predicates.
+    * Even if we actually pushed nothing down, it is recommended that we still return a new
+    * [[TableSource]] instance, since the returned instance will be marked as one for which
+    * filter push-down has been tried.
+    *
+    * Note that we should not change the form of the predicates passed in. They have been
+    * organized in conjunctive normal form (CNF), and we should only take or leave each element
+    * of the list. Don't try to reorganize the predicates unless you are absolutely confident
+    * about doing so.
+    *
+    * @param partitionPruned  Whether partition pruning is applied.
--- End diff --

`partitionPruned` is false when the filter does not contain any partition
conditions; otherwise it is true.
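The javadoc quoted above asks a source to take or leave each CNF conjunct whole and to remove from the list every predicate it pushes down, so that the framework evaluates only what is left. A minimal Scala sketch of that pick-and-remove contract follows; `PickAndRemoveSketch` and its `canPushDown` callback are hypothetical names, and any element type (e.g. plain strings) can stand in for Flink's `Expression`.

```scala
import java.util.{ArrayList => JArrayList, List => JList}

object PickAndRemoveSketch {

  /** Removes from `predicates` every conjunct that `canPushDown` accepts and returns
    * them. Each element is taken or left as a whole and never rewritten; whatever
    * stays in `predicates` is left for the framework to evaluate. */
  def pickSupported[P](predicates: JList[P], canPushDown: P => Boolean): JList[P] = {
    val pushed = new JArrayList[P]()
    val it = predicates.iterator()
    while (it.hasNext) {
      val p = it.next()
      if (canPushDown(p)) {
        pushed.add(p)
        it.remove() // drop the picked conjunct from the caller's list in place
      }
    }
    pushed
  }
}
```

Iterating with `java.util.Iterator.remove()` mutates the caller's list in place, which matches the instruction to remove picked expressions, while every remaining element keeps its original form and order.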



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169182#comment-16169182
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139301022
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
@@ -0,0 +1,193 @@
+    * @param partitionPruned  Whether partition pruning is applied.
+    * @param prunedPartitions Remaining partitions after partition pruning has been applied.
--- End diff --

prunedPartitions => remainingPartitions



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169181#comment-16169181
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139301007
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
@@ -0,0 +1,193 @@
+    * @param partitionPruned  Whether partition pruning is applied.
+    * @param prunedPartitions Remaining partitions after partition pruning has been applied.
+    *                         Note: if partition pruning is not applied, prunedPartitions is empty.
+    * @param predicates       A list of conjunctive predicates. You should pick and remove all
+    *                         expressions that can be pushed down. The remaining elements of this
+    *                         list will be further evaluated by the framework.
+    * @return A new cloned instance of [[TableSource]].
+  

[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169178#comment-16169178
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139300836
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
@@ -55,4 +55,9 @@ trait FilterableTableSource[T] {
 */
   def isFilterPushedDown: Boolean
 
+  /**
+    * @param relBuilder Builder for relational expressions.
+    */
+  def setRelBuilder(relBuilder: RelBuilder): Unit
--- End diff --

The `setRelBuilder` method is called in `PushFilterIntoTableSourceScanRule`. If
we move `setRelBuilder` to `PartitionableTableSource`,
`PushFilterIntoTableSourceScanRule` would have to know about both
`FilterableTableSource` and `PartitionableTableSource`.
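To illustrate the concern, here is a heavily simplified Scala sketch with hypothetical stand-in traits (`RelBuilderStandIn`, `FilterableSource`, `PartitionableSource`) rather than the real Flink and Calcite types. If the setter lives only on the partitionable subtype, the rule needs an extra type test before it can hand over the builder.

```scala
// Simplified stand-ins for the real Flink/Calcite types (hypothetical, for illustration only).
trait RelBuilderStandIn

trait FilterableSource {
  def applyPredicates(predicates: List[String]): FilterableSource
}

// Variant discussed in the review: the setter exists only on the partitionable subtype.
trait PartitionableSource extends FilterableSource {
  def setRelBuilder(builder: RelBuilderStandIn): Unit
}

object PushFilterRuleSketch {

  /** The rule now has to test for PartitionableSource in addition to
    * FilterableSource before it can pass the builder on. */
  def onMatch(source: FilterableSource,
              builder: RelBuilderStandIn,
              predicates: List[String]): FilterableSource = {
    source match {
      case p: PartitionableSource => p.setRelBuilder(builder)
      case _                      => () // a plain filterable source gets no builder
    }
    source.applyPredicates(predicates)
  }
}
```

Keeping the setter on the filterable trait instead removes the type test, at the cost of every filterable source having to accept a builder it may not need.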




[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167224#comment-16167224
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139049572
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
@@ -0,0 +1,193 @@
+    * @param partitionPruned  Whether partition pruning is applied.
--- End diff --

We should make the meaning of this flag clearer. If it represents whether
partition pruning has been applied, I would say it should always be true,
because by the time this method is called, the framework has at least tried to
apply partition pruning.
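Under the "pruning was attempted" reading, the flag is constantly true whenever the method is called, so it carries no information. The other reading, given in godfreyhe's reply above (false when the filter has no partition conditions, true otherwise), can be sketched as follows. The `Conjunct` model and the rule that a partition condition references only partition fields are assumptions for illustration, not the PR's code.

```scala
// Hypothetical model: each CNF conjunct records the fields it references.
final case class Conjunct(text: String, fields: Set[String])

object PartitionPrunedFlagSketch {

  /** Assumption: a conjunct can drive pruning only if it references partition fields exclusively. */
  def isPartitionCondition(c: Conjunct, partitionFields: Set[String]): Boolean =
    c.fields.nonEmpty && c.fields.subsetOf(partitionFields)

  /** False when the filter contains no partition conditions, true otherwise. */
  def hasPartitionConditions(conjuncts: Seq[Conjunct], partitionFields: Set[String]): Boolean =
    conjuncts.exists(isPartitionCondition(_, partitionFields))
}
```

Naming the flag after what it actually tests, as `hasPartitionConditions` does here, avoids the ambiguity raised in this comment.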



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167219#comment-16167219
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139049316
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
@@ -0,0 +1,193 @@
+    * @param partitionPruned  Whether partition pruning is applied.
+    * @param prunedPartitions Remaining partitions after partition pruning has been applied.
--- End diff --

The definition of `prunedPartitions` looks contradictory here. We should stick
to a single definition: either `prunedPartitions` represents all partitions
that have been pruned away, or all remaining partitions that survive pruning.
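The two candidate definitions are complements of each other, which is why mixing them is confusing. A small Scala sketch with distinct, hypothetical field names (`remainingPartitions`, `prunedAwayPartitions`) keeps them apart; it is an illustration only, not the API that was eventually chosen.

```scala
// Hypothetical result type with unambiguous names for both partition sets.
final case class PruningResult[P](remainingPartitions: Seq[P], prunedAwayPartitions: Seq[P])

object PruningResultSketch {

  /** Splits all partitions by `keep`: survivors go to remainingPartitions, the rest
    * to prunedAwayPartitions. The two sets are disjoint and together cover every
    * partition, so one can always be derived from the other plus the full list. */
  def prune[P](all: Seq[P], keep: P => Boolean): PruningResult[P] = {
    val (remaining, prunedAway) = all.partition(keep)
    PruningResult(remaining, prunedAway)
  }
}
```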



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167216#comment-16167216
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139048604
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala ---
@@ -0,0 +1,193 @@
+    * @param partitionPruned  Whether partition pruning is applied.
+    * @param prunedPartitions Remaining partitions after partition pruning has been applied.
+    *                         Note: if partition pruning is not applied, prunedPartitions is empty.
+    * @param predicates       A list of conjunctive predicates. You should pick and remove all
+    *                         expressions that can be pushed down. The remaining elements of this
+    *                         list will be further evaluated by the framework.
+    * @return A new cloned instance of [[TableSource]].
+  

[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167213#comment-16167213
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/4667#discussion_r139048337
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FilterableTableSource.scala ---
@@ -55,4 +55,9 @@ trait FilterableTableSource[T] {
 */
   def isFilterPushedDown: Boolean
 
+  /**
+    * @param relBuilder Builder for relational expressions.
+    */
+  def setRelBuilder(relBuilder: RelBuilder): Unit
--- End diff --

Can you move this method to PartitionableTableSource?




[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16165823#comment-16165823
 ] 

ASF GitHub Bot commented on FLINK-5859:
---

GitHub user godfreyhe opened a pull request:

https://github.com/apache/flink/pull/4667

[FLINK-5859] [table] Add PartitionableTableSource for partition pruning

## What is the purpose of the change

This pull request adds PartitionableTableSource for partition pruning when
optimizing the query plan. This way, both query optimization time and execution
time can be significantly reduced, especially for a large partitioned table.

## Brief change log

  - *Adds PartitionableTableSource, which extends FilterableTableSource*
  - *Adds a setRelBuilder method to the FilterableTableSource class*
  - *Adds the implementation for partition pruning and for extracting partition predicates*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added integration tests for PartitionableTableSource on batch and stream SQL*
  - *Added a test that validates the correctness of partition pruning*
  - *Added a test that validates the correctness of extracting partition predicates*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/godfreyhe/flink FLINK-5859

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4667.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4667


commit 2fc5f9d6e6a1625a0d7784d320f8dde5df8a7f5e
Author: godfreyhe 
Date:   2017-09-14T06:00:54Z

[FLINK-5859] [table] Add PartitionableTableSource for partition pruning






[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-04-07 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15961472#comment-15961472
 ] 

Fabian Hueske commented on FLINK-5859:
--

Hi [~godfreyhe], FilterableTableSource has been merged to master. 
You could continue with the discussed approach.



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-03-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904724#comment-15904724
 ] 

Fabian Hueske commented on FLINK-5859:
--

Yes, the first changes should be attributed to [~tonycox].
Just put your changes on top.
Opening a new PR sounds good to me. Thanks, Fabian



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-03-09 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15904271#comment-15904271
 ] 

Kurt Young commented on FLINK-5859:
---

Hi [~fhueske], I can continue working on {{FilterableTableSource}}. Should I
open another PR based on the earlier changes?



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-03-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902816#comment-15902816
 ] 

Fabian Hueske commented on FLINK-5859:
--

Hi [~godfreyhe], the contributor of the {{FilterableTableSource}} does not have 
time at the moment to continue working on the PR 
(https://github.com/apache/flink/pull/3166).
Do you want to pick that issue up and complete the work?



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-27 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885360#comment-15885360
 ] 

Fabian Hueske commented on FLINK-5859:
--

Hi [~godfreyhe] and [~ykt836], sounds good to me.
+1 to revisit this approach if we figure out that it restricts us in some way 
or leads to sub-optimal plans.




[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-26 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885179#comment-15885179
 ] 

Kurt Young commented on FLINK-5859:
---

Hi [~fhueske], I think your API suggestion is a good way to start. Let's do
it this way and see whether something needs to change in the future.



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-26 Thread godfrey he (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885098#comment-15885098
 ] 

godfrey he commented on FLINK-5859:
---

Hi [~fhueske], thanks for your advice.

IMO, rules such as `PushProjectIntoBatchTableSourceScanRule`,
`PushFilterIntoBatchTableSourceScanRule` and `PartitionPruningRule` (which we may
integrate into `PushFilterIntoBatchTableSourceScanRule`) need to be applied only once
and do not actually need a cost model. Rules such as `FilterCalcMergeRule`,
`FilterJoinRule` and `DataSetCalcRule` do not need real costs; a dummy cost is enough.
Rules such as `LoptOptimizeJoinRule` and `JoinToMultiJoinRule` need to be applied with
real costs. So we want to break the optimization phase down into 3 phases later. The
whole optimization then includes 5 steps:
1. decorrelate the query
2. normalize the logical plan with the HEP planner
3. optimize the logical plan with the Volcano planner and a dummy cost (including
`FilterCalcMergeRule`, `FilterJoinRule`, `DataSetCalcRule` and so on)
4. optimize the physical plan with the HEP planner (including
`PushProjectIntoBatchTableSourceScanRule`,
`PushFilterIntoBatchTableSourceScanRule` and so on)
5. optimize the physical plan with the Volcano planner and real costs (including
`LoptOptimizeJoinRule`, `JoinToMultiJoinRule` and so on)

This way, each optimization phase keeps its complexity as small as
possible, and your concern is addressed as well.

Looking forward to your advice, thanks.
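The five proposed steps can be written down as data, which makes the intended split between rule-only (HEP) phases and cost-based (Volcano) phases explicit. This is a sketch under assumptions: `Phase`, `PlannerKind`, `CostMode` and `OptimizerPipeline` are hypothetical names, no Calcite planner is invoked, and step 1 has no planner attached because the comment does not name one.

```scala
// Hypothetical model of the proposed optimization pipeline; no Calcite classes are used.
sealed trait PlannerKind
case object Hep extends PlannerKind     // applies rules once, no cost model
case object Volcano extends PlannerKind // cost-based search

sealed trait CostMode
case object NoCost extends CostMode
case object DummyCost extends CostMode
case object RealCost extends CostMode

// planner = None for step 1, since the proposal does not say which planner decorrelation uses.
final case class Phase(name: String, planner: Option[PlannerKind], cost: CostMode, rules: Seq[String])

object OptimizerPipeline {
  val phases: Seq[Phase] = Seq(
    Phase("decorrelate", None, NoCost, Seq.empty),
    Phase("normalize logical plan", Some(Hep), NoCost, Seq.empty),
    Phase("optimize logical plan", Some(Volcano), DummyCost,
      Seq("FilterCalcMergeRule", "FilterJoinRule", "DataSetCalcRule")),
    Phase("rewrite physical plan", Some(Hep), NoCost,
      Seq("PushProjectIntoBatchTableSourceScanRule", "PushFilterIntoBatchTableSourceScanRule")),
    Phase("optimize physical plan", Some(Volcano), RealCost,
      Seq("LoptOptimizeJoinRule", "JoinToMultiJoinRule"))
  )

  // Phases that need real cost estimates; in this proposal only the last one does.
  def costBasedPhases: Seq[Phase] = phases.filter(_.cost == RealCost)

  // Stand-in for running the pipeline: the order in which the phases would run.
  def runOrder: Seq[String] = phases.map(_.name)
}
```

Keeping expensive real-cost search confined to the final phase is the point of the split: the table source push-down rules run once in a cheap HEP pass.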



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-24 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882446#comment-15882446
 ] 

Fabian Hueske commented on FLINK-5859:
--

Hi [~godfreyhe] and [~ykt836],

I think we can also get a nice API if we handle both cases as regular filter 
push down.
If we implement {{PartitionableTableSource}} as follows:

{code}
abstract class PartitionableTableSource extends FilterableTableSource {

  // This needs to be implemented!
  def getAllPartitions: Array[String]

  // This needs to be implemented.
  // The interface could also be simpler and not use Expression.
  def applyPartitionPruning(partitionsToPrune: Array[Expression]): Unit

  // Hypothetical helper (not part of any existing API): decides whether a
  // predicate can be evaluated on the partition values alone.
  protected def isPartitionPredicate(predicate: Expression, partitions: Array[String]): Boolean

  // Default implementation. Must be overridden to apply filters in addition to
  // partition pruning.
  //   If overridden, it will be called after partitions have already been pruned.
  //   -> If it needs to scan metadata, it knows which partitions to skip.
  def applyPredicate(predicate: Array[Expression]): Array[Expression] = {
    // by default returns all predicates
    predicate
  }

  // Default implementation. Will be called by PushDownFilterRule
  override def setPredicate(predicates: Array[Expression]): Array[Expression] = {

    // identify which partitions exist
    val partitions = getAllPartitions
    // go over the predicate expressions and identify which can be used for partition pruning
    val (partitionsToPrune, remaining) =
      predicates.partition(p => isPartitionPredicate(p, partitions))
    // set partitions to prune
    applyPartitionPruning(partitionsToPrune)

    // apply remaining predicates
    val remainingAfterFilter = applyPredicate(remaining)

    remainingAfterFilter
  }

}
{code}

This approach is fully integrated with the {{FilterableTableSource}} and does 
not require any additional logic in the optimizer (no rules, etc.).
If only partition pruning should be done, only {{getAllPartitions}} and 
{{applyPartitionPruning}} need to be implemented. If the table source should 
also apply filters, it needs to override {{applyPredicate()}}.
It also reduces metadata scanning because partitions are pruned before the
metadata for filters needs to be checked.

Is there another benefit of applying PartitionPruning earlier?
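The control flow of this proposal (prune partitions first, then offer only the leftover predicates to `applyPredicate`) can be checked with a small, self-contained Scala sketch. It assumes a toy predicate type `Pred` instead of Flink's `Expression`; `ToyPartitionableSource` and `DailyLogSource` are hypothetical. The split that the proposal leaves open is filled in with a simple assumed rule: a predicate is used for pruning only if it references a partition field with an operator the toy source can evaluate.

```scala
// Toy predicate `field op value`; a hypothetical stand-in for Flink's Expression.
final case class Pred(field: String, op: String, value: String)

abstract class ToyPartitionableSource {
  // Operators this toy source can evaluate on partition values.
  private val pruningOps = Set("=", ">=", "<=")

  def partitionFields: Set[String]
  def getAllPartitions: Seq[Map[String, String]]

  // Partitions that survive pruning; a real source would scan only these.
  var selectedPartitions: Seq[Map[String, String]] = Seq.empty

  def applyPartitionPruning(partitionPreds: Seq[Pred]): Unit =
    selectedPartitions = getAllPartitions.filter(p => partitionPreds.forall(holds(p, _)))

  // Default: no extra filtering, every remaining predicate is handed back to the framework.
  def applyPredicate(predicates: Seq[Pred]): Seq[Pred] = predicates

  // Prunes partitions first, then offers only the leftover predicates to applyPredicate.
  def setPredicate(predicates: Seq[Pred]): Seq[Pred] = {
    val (partitionPreds, remaining) =
      predicates.partition(p => partitionFields.contains(p.field) && pruningOps(p.op))
    applyPartitionPruning(partitionPreds)
    applyPredicate(remaining)
  }

  private def holds(partition: Map[String, String], pred: Pred): Boolean =
    partition.get(pred.field).exists { v =>
      val c = v.compareTo(pred.value)
      pred.op match {
        case "="  => c == 0
        case ">=" => c >= 0
        case _    => c <= 0 // "<=" is the only other operator classified for pruning
      }
    }
}

// Example source partitioned by day; ISO dates compare correctly as strings.
class DailyLogSource extends ToyPartitionableSource {
  val partitionFields: Set[String] = Set("dt")
  val getAllPartitions: Seq[Map[String, String]] =
    Seq("2017-02-21", "2017-02-22", "2017-02-23").map(d => Map("dt" -> d))
}
```

With `setPredicate(Seq(Pred("dt", ">=", "2017-02-22"), Pred("level", "=", "ERROR")))`, the `dt` predicate prunes the `2017-02-21` partition and only the `level` predicate is returned to the framework. Predicates with operators the source cannot evaluate are never consumed by pruning, so no filter is silently dropped.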



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-23 Thread godfrey he (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881999#comment-15881999
 ] 

godfrey he commented on FLINK-5859:
---

Hi [~fhueske], thanks for your advice.
Yes, partition pruning is a kind of coarse-grained filter push-down. Filter push-down
and partition pruning have a common part, which is extracting predicates from the
filter condition based on what each data source is interested in.
But IMO, filter push-down and partition pruning are independent concepts in
general.
The following table shows that different data sources have different traits:

||Trait||Example||
|filter push-down only|MySQL, HBase|
|partition pruning only|CSV, TEXT|
|both filter push-down and partition pruning|Parquet, Druid|

IMO, as [~ykt836] mentioned above, we should provide developers with clear concepts,
including both FilterableTableSource and PartitionableTableSource.

Looking forward to your advice, thanks.




[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-23 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881737#comment-15881737
 ] 

Kurt Young commented on FLINK-5859:
---

Hi [~fhueske],

How about this approach:
We provide both {{FilterableTableSource}} and {{PartitionableTableSource}}:
keep {{FilterableTableSource}} as it is, and add methods like
{{getAllPartitions}} and {{applyPartitionPruning}} to
{{PartitionableTableSource}}. From a developer's point of view, we can treat
these two traits as completely independent. It will be easier for a developer to
implement each functionality independently, compared with mixing all the
logic into {{FilterableTableSource.setPredicate()}}. Also, in the future, I
think it is very likely that the framework will apply these two traits
in different optimization stages. We would apply partition pruning as early as
possible in the logical optimization and let filter push-down be applied a
little later, because it needs to do some heavyweight physical-level
analysis first.
BTW, this approach still supports the approach you suggested: you can
implement only {{FilterableTableSource}} and do all the pruning and filtering
if you like. 
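The composition approach can be sketched as two independent traits plus a framework-side helper that applies partition pruning before filter push-down. All names here (`Cond`, `FilterableSource`, `PartitionableSource`, `PushDown`, `ParquetLikeSource`) are hypothetical stand-ins, not the interfaces that were eventually merged.

```scala
// Hypothetical equality condition; a stand-in for Flink's Expression.
final case class Cond(field: String, value: String)

// Two independent capabilities (trait names and signatures are hypothetical).
trait FilterableSource {
  // Applies what it can and returns the conditions it could NOT apply.
  def applyPredicates(conds: Seq[Cond]): Seq[Cond]
}

trait PartitionableSource {
  def partitionFields: Set[String]
  def getAllPartitions: Seq[Map[String, String]]
  // Receives only conditions on partition fields.
  def applyPartitionPruning(conds: Seq[Cond]): Unit
}

// Framework side: partition pruning first (cheap, static), filter push-down afterwards.
object PushDown {
  def apply(source: AnyRef, conds: Seq[Cond]): Seq[Cond] = {
    val afterPruning = source match {
      case p: PartitionableSource =>
        val (partConds, rest) = conds.partition(c => p.partitionFields.contains(c.field))
        p.applyPartitionPruning(partConds)
        rest
      case _ => conds
    }
    source match {
      case f: FilterableSource => f.applyPredicates(afterPruning)
      case _                   => afterPruning
    }
  }
}

// A Parquet-like source that mixes in both traits.
class ParquetLikeSource extends FilterableSource with PartitionableSource {
  val partitionFields: Set[String] = Set("dt")
  val getAllPartitions: Seq[Map[String, String]] =
    Seq("2017-02-22", "2017-02-23").map(d => Map("dt" -> d))
  var selected: Seq[Map[String, String]] = getAllPartitions

  def applyPartitionPruning(conds: Seq[Cond]): Unit =
    selected = getAllPartitions.filter(p => conds.forall(c => p.get(c.field).contains(c.value)))

  // Pretend row-group statistics let the source evaluate conditions on "id" itself.
  def applyPredicates(conds: Seq[Cond]): Seq[Cond] = conds.filterNot(_.field == "id")
}
```

Following the trait table above, a CSV-like source would mix in only `PartitionableSource` and a MySQL-like source only `FilterableSource`; the helper handles each combination without either trait knowing about the other.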



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880416#comment-15880416
 ] 

Fabian Hueske commented on FLINK-5859:
--

For such cases, we could either 

1. implement {{FilterableTableSource}} and manually figure out filters and 
partitions or 
2. {{PartitionableTableSource}} could have another method 
{{setFilterPredicate()}} which has the same semantics as 
{{FilterableTableSource.setPredicate()}} but which is called from 
{{PartitionableTableSource.setPredicate()}} with the remaining predicates which 
could not be used to prune partitions.




[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-23 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880359#comment-15880359
 ] 

Kurt Young commented on FLINK-5859:
---

Hi [~fhueske],

With the approach you proposed, how can we handle table sources that support
both partition pruning and filter push-down? I think it is better to model them
as a composition rather than an inheritance relationship.



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880231#comment-15880231
 ] 

Fabian Hueske commented on FLINK-5859:
--

Hi [~ykt836],

My main motivation to treat partition pruning as filter push-down is to keep 
the complexity of the optimizer as small as possible.

You are right, the effort to determine whether a filter can be applied or not 
depends on the format of the source. However, I don't think that this 
necessarily means that partition pruning must be handled as a special case. In
the end, it is up to the TableSource to determine which predicates apply
and which don't. A partitionable table source would not need to scan all
metadata.

I see your point about the effort and complexity to implement a partitionable 
TableSource. 
What do you think of the following approach?
We implement a {{PartitionableTableSource}} as an abstract class that 
implements the {{FilterableTableSource}} interface. 
{{PartitionableTableSource}} would have abstract methods to list the 
partitioned fields (and maybe some more). Based on that information 
{{PartitionableTableSource}} implements 
{{FilterableTableSource.setPredicate()}} and 
{{FilterableTableSource.getPredicate()}}, i.e., the 
{{PartitionableTableSource}} automatically extracts the right filter 
expressions and returns everything it cannot deal with based on the provided 
partitioned fields.
TableSources that only support filter push-down via partition pruning implement
{{PartitionableTableSource}}; they only have to specify the partition columns and
do not have to deal with {{setPredicate()}}.

This solution would keep all partition pruning related logic out of the 
optimizer and table schemas. 
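The extraction step in this proposal (find the filter expressions the partitionable source can handle and return everything else) can be sketched on a tiny expression tree. This is a minimal sketch under assumptions: the `Expr` case classes and `PartitionPredicateExtractor` are hypothetical, and only top-level conjuncts that reference nothing but partition fields are used for pruning.

```scala
// Hypothetical, minimal expression tree; not Flink's Expression hierarchy.
sealed trait Expr
final case class Field(name: String) extends Expr
final case class Lit(value: String) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr
final case class Or(left: Expr, right: Expr) extends Expr
final case class And(left: Expr, right: Expr) extends Expr

object PartitionPredicateExtractor {
  // All field names referenced by an expression.
  def fields(e: Expr): Set[String] = e match {
    case Field(n)      => Set(n)
    case Lit(_)        => Set.empty
    case EqualTo(l, r) => fields(l) ++ fields(r)
    case Or(l, r)      => fields(l) ++ fields(r)
    case And(l, r)     => fields(l) ++ fields(r)
  }

  // Flattens nested ANDs into top-level conjuncts: a AND (b AND c) => List(a, b, c).
  def conjuncts(e: Expr): List[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => List(other)
  }

  // Splits conjuncts into (usable for pruning, remaining). A conjunct is usable only if
  // every field it references is a partition field, so `dt = a OR id = 1` stays remaining.
  def split(predicate: Expr, partitionFields: Set[String]): (List[Expr], List[Expr]) =
    conjuncts(predicate).partition { c =>
      val fs = fields(c)
      fs.nonEmpty && fs.subsetOf(partitionFields)
    }
}
```

Splitting at the level of conjuncts is what makes the split safe: each conjunct must hold on its own, so pruning with one of them never removes rows the full predicate would keep.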

What do you think?



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-22 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15879991#comment-15879991
 ] 

Kurt Young commented on FLINK-5859:
---

Hi [~fhueske], 

You raised a very good question, which is essentially "what's the difference
between filter push-down and partition pruning?".

You are right that partition pruning is actually a coarse-grained filter
push-down, but more importantly, we can view it as a more "static" or more
"predictable" filter. Here is an example to make this more explicit. Suppose
Flink supports a Parquet table source. Since Parquet files contain some
RowGroup-level statistics such as max/min values, we can use this information
to reduce the data we need to read. But before we do anything, we need to make
sure whether the source files contain such information or not, so we need to
read the metadata of all these files to do some checks. If we are facing
thousands of files, this will be really costly.

However, partitions are more static and predictable. For example, if all
your source files are organized in a time-based directory layout like
/mm/dd/1.file and we have some partition fields that describe the
time information, it is more efficient and easier to do the partition-level
filter first. 
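To illustrate why this kind of partition-level filter is cheap, the sketch below prunes files purely by parsing their paths, without opening any file or reading footers. It assumes a hypothetical `yyyy/MM/dd/<file>` layout and a `DirectoryPruning` helper that are not part of Flink; paths that do not match the layout are kept, because they cannot be proven irrelevant.

```scala
import java.time.LocalDate
import scala.util.Try

object DirectoryPruning {
  // Parses the partition date from a path in the hypothetical layout "yyyy/MM/dd/<file>".
  def partitionDate(path: String): Option[LocalDate] =
    path.split('/') match {
      case Array(y, m, d, _) => Try(LocalDate.of(y.toInt, m.toInt, d.toInt)).toOption
      case _                 => None
    }

  // Keeps files whose partition date lies in [from, to], using only the path.
  // Paths without a parsable date are kept, since they cannot be proven irrelevant.
  def prune(paths: Seq[String], from: LocalDate, to: LocalDate): Seq[String] =
    paths.filter(p => partitionDate(p).forall(d => !d.isBefore(from) && !d.isAfter(to)))
}
```

Only after this step would a Parquet-like source need to read row-group statistics, and then only for the surviving files.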

But this doesn't mean we must have another trait like
{{PartitionableTableSource}}; either extending the {{FilterableTableSource}} under review
or providing an explicit {{PartitionableTableSource}} is fine with me. But we should at
least make "partition pruning" visible to users who may write their own
{{TableSource}}, instead of letting all the magic happen in one method, which right
now would be
{code}def setPredicate(predicate: Array[Expression]): Array[Expression]{code}
in the version of {{FilterableTableSource}} currently under review.

Let me know if you have some thoughts about this. 



[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL

2017-02-21 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875689#comment-15875689
 ] 

Fabian Hueske commented on FLINK-5859:
--

Partition pruning is a kind of coarse-grained filter push-down.

I think this issue should be solved by FLINK-3849 which tries to push filters 
into {{TableSources}}. 
It is the responsibility of {{TableSources}} that implement
{{FilterableTableSource}} to skip irrelevant input.

What do you think [~godfreyhe]?
