[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-28 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339411501
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+import java.util.Collections
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  *
+  * The reasons why the limit is still retained:
+  * 1. If the source had to return exactly the limited number of records, the bar for the
+  * source implementation would be high: it would have to control the record count of each
+  * split precisely, and the parallelism setting would also need to be adjusted accordingly.
+  * 2. If the limit were removed, a filter might later be pushed down into the source after
+  * the limit push-down. The source would then need to know that the limit must be applied
+  * before the filter, which is hard to implement.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+    operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset is zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset
+// usually appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
+case _ => false
+  }
+case _ => false
+  }
+}
+supportPushDown
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+val limit = RexLiteral.intValue(sort.fetch)
+val relBuilder = call.builder()
+val newRelOptTable = applyLimit(limit, relOptTable, relBuilder)
+val newScan = scan.copy(scan.getTraitSet, newRelOptTable)
+
+val newTableSource = newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+val oldTableSource = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+
+if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown
+    && newTableSource.explainSource().equals(oldTableSource.explainSource)) {
+  throw new TableException("Failed to push limit into table source! "
 
 Review comment:
  I think your idea was already raised in https://github.com/apache/flink/pull/8389 , and it was decided there not to go this way.


[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339407704
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+import java.util.Collections
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  *
+  * The reasons why the limit is still retained:
+  * 1. If the source had to return exactly the limited number of records, the bar for the
+  * source implementation would be high: it would have to control the record count of each
+  * split precisely, and the parallelism setting would also need to be adjusted accordingly.
+  * 2. If the limit were removed, a filter might later be pushed down into the source after
+  * the limit push-down. The source would then need to know that the limit must be applied
+  * before the filter, which is hard to implement.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+    operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset is zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset
+// usually appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
+case _ => false
+  }
+case _ => false
+  }
+}
+supportPushDown
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+val limit = RexLiteral.intValue(sort.fetch)
+val relBuilder = call.builder()
+val newRelOptTable = applyLimit(limit, relOptTable, relBuilder)
+val newScan = scan.copy(scan.getTraitSet, newRelOptTable)
+
+val newTableSource = newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+val oldTableSource = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+
+if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown
+    && newTableSource.explainSource().equals(oldTableSource.explainSource)) {
+  throw new TableException("Failed to push limit into table source! "
 
 Review comment:
  Related PR: https://github.com/apache/flink/pull/8468
  Something could be done in `TableSource.explainSource`; it is a default method and can contain some logic.
  But I think this check should still exist.
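
  A hedged sketch of such an override, based on the `explainSource()` of `TestLimitableTableSource` quoted later in this thread (the final `else` branch is an assumption, since that part of the test source is truncated here):
  ```
  // The planner compares explainSource() before and after push-down, so the
  // description must change once applyLimit() has been applied.
  override def explainSource(): String = {
    if (limit > 0 && limit < Long.MaxValue) {
      "limit: " + limit          // a concrete limit was pushed down
    } else if (limitablePushedDown) {
      "limitablePushedDown"      // push-down happened without an effective limit
    } else {
      "TestLimitableTableSource" // assumed default description
    }
  }
  ```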


[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339408562
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+import java.util.Collections
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  *
+  * The reasons why the limit is still retained:
+  * 1. If the source had to return exactly the limited number of records, the bar for the
+  * source implementation would be high: it would have to control the record count of each
+  * split precisely, and the parallelism setting would also need to be adjusted accordingly.
+  * 2. If the limit were removed, a filter might later be pushed down into the source after
+  * the limit push-down. The source would then need to know that the limit must be applied
+  * before the filter, which is hard to implement.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+    operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset is zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset
+// usually appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
 
 Review comment:
  It can be; I think that is a third reason to keep the original limit RelNode.
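
  For instance, a non-zero offset is typically tied to an ORDER BY; in the style of the `LimitTest` cases quoted later in this thread (the exact test body is an assumption, not part of this PR):
  ```
  // Not matched by PushLimitIntoTableSourceScanRule: the collation is non-empty
  // and the offset is non-zero, so the limit stays entirely in the Sort node.
  util.verifyPlan("SELECT a, c FROM MyTable ORDER BY a LIMIT 10 OFFSET 5")
  ```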




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339407219
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala
 ##
 @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sources._
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * A table source that supports pushing a limit down into the source.
+  */
+class TestLimitableTableSource(
+data: Seq[Row],
+rowType: RowTypeInfo,
+var limit: Long = -1,
+var limitablePushedDown: Boolean = false)
+  extends StreamTableSource[Row]
+  with LimitableTableSource[Row] {
+
+  override def isBounded = true
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+val dataSet = if (limit > 0) {
+  data.take(limit.toInt).asJava
+} else {
+  data.asJava
+}
+execEnv.createInput(
+  new CollectionInputFormat(dataSet, rowType.createSerializer(new ExecutionConfig)),
+  rowType)
+  }
+
+  override def applyLimit(limit: Long): TableSource[Row] = {
+new TestLimitableTableSource(data, rowType, limit, limitablePushedDown)
+  }
+
+  override def isLimitPushedDown: Boolean = limitablePushedDown
+
+  override def getReturnType: TypeInformation[Row] = rowType
+
+  override def explainSource(): String = {
+if (limit > 0 && limit < Long.MaxValue) {
+  "limit: " + limit
+} else if (limitablePushedDown) {
+  "limitablePushedDown"
 
 Review comment:
  Yes, I will add:
  ```
  if (limit == 0) {
    throw new RuntimeException("limit 0 should be optimized to single values.")
  }
  ```




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339406809
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Adds support for limit push-down to a {@link TableSource}.
+ * A {@link TableSource} extending this interface is able to limit the number of records.
+ *
+ * <p>After push-down, the source only needs to make a best effort to limit the number of
+ * output records; it does not need to guarantee that the number is less than or equal to
+ * the limit.
 
 Review comment:
  Maybe; if the total number of records is less than the limit, the source can legitimately return fewer rows than the limit.
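
  That matches the best-effort semantics of the test source quoted elsewhere in this thread, which simply calls `data.take(limit.toInt)`; asking for more rows than exist returns whatever is there (a minimal standalone illustration):
  ```
  // Seq#take mirrors the contract: a limit of 10 on 5 rows yields all 5 rows.
  val data = Seq(1, 2, 3, 4, 5)
  assert(data.take(10) == data)
  ```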




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339406297
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+import java.util.Collections
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  *
+  * The reasons why the limit is still retained:
+  * 1. If the source had to return exactly the limited number of records, the bar for the
+  * source implementation would be high: it would have to control the record count of each
+  * split precisely, and the parallelism setting would also need to be adjusted accordingly.
+  * 2. If the limit were removed, a filter might later be pushed down into the source after
+  * the limit push-down. The source would then need to know that the limit must be applied
+  * before the filter, which is hard to implement.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+    operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset is zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset
+// usually appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
+case _ => false
+  }
+case _ => false
+  }
+}
+supportPushDown
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+val limit = RexLiteral.intValue(sort.fetch)
+val relBuilder = call.builder()
+val newRelOptTable = applyLimit(limit, relOptTable, relBuilder)
+val newScan = scan.copy(scan.getTraitSet, newRelOptTable)
+
+val newTableSource = newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+val oldTableSource = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+
+if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown
+    && newTableSource.explainSource().equals(oldTableSource.explainSource)) {
+  throw new TableException("Failed to push limit into table source! "
+  + "table source with pushdown capability must override and change "
+  + "explainSource() API to explain the pushdown applied!")
+}
+
+call.transformTo(sort.copy(sort.getTraitSet, 


[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-27 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r339405733
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Adds support for limit push-down to a {@link TableSource}.
+ * A {@link TableSource} extending this interface is able to limit the number of records.
+ *
+ * <p>After push-down, the source only needs to make a best effort to limit the number of
+ * output records; it does not need to guarantee that the number is less than or equal to
+ * the limit.
 
 Review comment:
  I think this wording just tells the user: there is no hard requirement, just do what you can.
  The comments in `PushLimitIntoTableSourceScanRule` are difficult to understand without the code; I think we can just add a link to `PushLimitIntoTableSourceScanRule`.




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-25 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338967232
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
 ##
 @@ -144,6 +186,27 @@ LogicalSort(fetch=[0])
 (quoted XML plan content stripped from the archived message)

[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-25 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338952532
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
 ##
 @@ -144,6 +186,27 @@ LogicalSort(fetch=[0])
 (quoted XML plan content stripped from the archived message)

[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-25 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338928023
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sources._
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * A table source that supports pushing a limit down into the source.
+  */
+class TestLimitableTableSource(
+data: Seq[Row],
+rowType: RowTypeInfo,
+var limit: Long = -1,
+var limitablePushedDown: Boolean = false)
+  extends StreamTableSource[Row]
+  with LimitableTableSource[Row] {
+
+  override def isBounded = true
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+if (limit == 0 && limit >= 0) {
 
 Review comment:
  I will remove this check; the default value of limit is -1, which means no limit.
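
  The replacement then matches the updated `TestLimitableTableSource` quoted earlier in this thread:
  ```
  // limit defaults to -1 (no limit); only truncate when a positive limit was pushed down
  val dataSet = if (limit > 0) {
    data.take(limit.toInt).asJava
  } else {
    data.asJava
  }
  ```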




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-25 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338922755
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+    operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset is zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset
+// usually appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
 
 Review comment:
  Since we retain the limit, it is not a problem.




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-24 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338878083
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+    operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset is zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset
+// usually appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
+case _ => false
+  }
+case _ => false
+  }
+}
+supportPushDown
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+val limit = RexLiteral.intValue(sort.fetch)
+val relBuilder = call.builder()
+val newRelOptTable = applyLimit(limit, relOptTable, relBuilder)
+val newScan = scan.copy(scan.getTraitSet, newRelOptTable)
+
+val newTableSource = newRelOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+val oldTableSource = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+
+if (newTableSource.asInstanceOf[LimitableTableSource[_]].isLimitPushedDown
+    && newTableSource.explainSource().equals(oldTableSource.explainSource)) {
+  throw new TableException("Failed to push limit into table source! "
+  + "table source with pushdown capability must override and change "
+  + "explainSource() API to explain the pushdown applied!")
+}
+call.transformTo(newScan)
 
 Review comment:
  Maybe we should really retain the limit. What do you think?
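
  Retaining the limit would mean replacing only the sort's input rather than the sort itself; a hedged sketch of the resulting transform (the corresponding line in the later revision of this file is truncated in this thread, so the exact form is an assumption):
  ```
  // Keep the original Sort (and thus the limit) and swap in the limited scan beneath it.
  call.transformTo(sort.copy(sort.getTraitSet, Collections.singletonList(newScan)))
  ```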




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-24 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338868435
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+    operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset is zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset
+// usually appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
 
 Review comment:
  If filters are pushed down into a source but kept in the plan, the plan won't go through `PushLimitIntoTableSourceScanRule`: limit push-down requires that no filter stands in front of the scan. Since the limit can't be pushed down when there are filters, I think the logic of `PushLimitIntoTableSourceScanRule` is independent.
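
  This is enforced by the rule's operand pattern quoted above: the scan must be the direct input of the sort, so any intervening Filter prevents a match.
  ```
  // Binds only Sort -> TableSourceScan directly; Sort -> Filter -> Scan does not match.
  operand(classOf[FlinkLogicalSort],
    operand(classOf[FlinkLogicalTableSourceScan], none))
  ```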




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-24 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338408218
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/LimitTest.scala
 ##
 @@ -90,4 +96,28 @@ class LimitTest extends TableTestBase {
 util.verifyPlan("SELECT a, c FROM MyTable OFFSET 10 ROWS")
   }
 
+  @Test
+  def testFetchWithLimitSource(): Unit = {
 
 Review comment:
   OK




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-24 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338399487
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
 ##
 @@ -19,30 +19,56 @@ limitations under the License.
 (quoted XML plan content stripped from the archived message)
 Review comment:
   OK




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-24 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338398862
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TestLimitableTableSource.scala
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sources._
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * A table source that supports pushing a limit down into the source.
+  */
+class TestLimitableTableSource(
+data: Seq[Row],
+rowType: RowTypeInfo,
+var limit: Long = -1,
+var limitablePushedDown: Boolean = false)
+  extends StreamTableSource[Row]
+  with LimitableTableSource[Row] {
+
+  override def isBounded = true
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+if (limit == 0 && limit >= 0) {
 
 Review comment:
   just `== 0`




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-24 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338398303
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+    operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset is zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset
+// usually appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
+case _ => false
+  }
+case _ => false
+  }
+}
+supportPushDown
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val scan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+val relOptTable = scan.getTable.asInstanceOf[FlinkRelOptTable]
+val limit = RexLiteral.intValue(sort.fetch)
+val relBuilder = call.builder()
+val newRelOptTable = applyLimit(limit, relOptTable, relBuilder)
+val newScan = scan.copy(scan.getTraitSet, newRelOptTable)
 
 Review comment:
  Yeah, I need to rebase onto the master code.




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-24 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338396708
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoTableSourceScanRule.scala
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.flink.table.plan.stats.TableStats
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalSort, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.LimitableTableSource
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Sort, TableScan}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.tools.RelBuilder
+
+/**
+  * Planner rule that tries to push a limit into a [[LimitableTableSource]].
+  * The original limit will still be retained.
+  */
+class PushLimitIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalSort],
+    operand(classOf[FlinkLogicalTableSourceScan], none)), "PushLimitIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val sort = call.rel(0).asInstanceOf[Sort]
+val fetch = sort.fetch
+val offset = sort.offset
+// Only push down a limit whose offset is zero, because it is difficult for a
+// source-based push-down to handle a non-zero offset, and a non-zero offset
+// usually appears together with a sort.
+val onlyLimit = sort.getCollation.getFieldCollations.isEmpty &&
+(offset == null || RexLiteral.intValue(offset) == 0) &&
+fetch != null
+
+var supportPushDown = false
+if (onlyLimit) {
+  supportPushDown = call.rel(1).asInstanceOf[TableScan]
+  .getTable.unwrap(classOf[TableSourceTable[_]]) match {
+case table: TableSourceTable[_] =>
+  table.tableSource match {
+case source: LimitableTableSource[_] => !source.isLimitPushedDown
 
 Review comment:
  Do you have a better idea? I think it's a simple check; it doesn't need to be cleaner.




[GitHub] [flink] JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] Introduce LimitableTableSource for optimizing limit

2019-10-24 Thread GitBox
JingsongLi commented on a change in pull request #9876: [FLINK-14134][table] 
Introduce LimitableTableSource for optimizing limit
URL: https://github.com/apache/flink/pull/9876#discussion_r338395645
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LimitableTableSource.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * Adds support for limit push-down to a {@link TableSource}.
+ * A {@link TableSource} extending this interface is able to limit the number of records.
+ */
+@Experimental
 
 Review comment:
  The new interface remains experimental, like `PartitionableTableSource`. At present, the source/sink interfaces are being reworked, so this may still change.

