[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15745173#comment-15745173
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2923


> Flink SQL projection pushdown
> -
>
> Key: FLINK-5220
> URL: https://issues.apache.org/jira/browse/FLINK-5220
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: zhangjing
>Assignee: zhangjing
>
> This issue implements projection pushdown optimization. Please refer to the 
> design document for more details.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15745040#comment-15745040
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2923
  
Merging




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15742135#comment-15742135
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2923
  
Thanks for the update @beyond1920.
The PR is good to merge :-)




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15739102#comment-15739102
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91847990
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch
+
+import org.apache.flink.api.common.io.GenericInputFormat
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ProjectableTableSourceITCase(mode: TestExecutionMode,
+  configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  private val tableName = "MyTable"
+  private var tableEnv: BatchTableEnvironment = null
+
+  @Before
+  def initTableEnv(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+tableEnv = TableEnvironment.getTableEnvironment(env, config)
+tableEnv.registerTableSource(tableName, new TestProjectableTableSource)
+  }
+
+  @Test
+  def testTableAPI(): Unit = {
+val results = tableEnv
+  .scan(tableName)
+  .where("amount < 4")
+  .select("id, name")
+  .collect()
+
+val expected = Seq(
+  "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
+  "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+
+  @Test
+  def testSQL(): Unit = {
+val results = tableEnv
+  .sql(s"select id, name from $tableName where amount < 4 ")
+  .collect()
+
+val expected = Seq(
+  "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
+  "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+}
+
+class TestProjectableTableSource(
+  fieldTypes: Array[TypeInformation[_]],
+  fieldNames: Array[String])
+  extends BatchTableSource[Row] with ProjectableTableSource[Row] {
+
+  def this() = this(
+fieldTypes = Array(
+  BasicTypeInfo.STRING_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.DOUBLE_TYPE_INFO),
+fieldNames = Array[String]("name", "id", "amount", "price")
+  )
+
+  /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
+  override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
+execEnv.createInput(new ProjectableInputFormat(33, fieldNames), getReturnType).setParallelism(1)
--- End diff --

yes, thanks



[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15739101#comment-15739101
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91847985
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala
 ---
@@ -0,0 +1,121 @@
+
+package org.apache.flink.api.table.plan.rules.util
+
+import java.math.BigDecimal
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR}
+import org.apache.calcite.rex.{RexBuilder, RexInputRef, RexProgram, RexProgramBuilder}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._
+import org.junit.{Assert, Before, Test}
+
+/**
+  * This class is responsible for testing RexProgramProjectExtractor
+  */
+class RexProgramProjectExtractorTest {
+  private var typeFactory: JavaTypeFactory = null
+  private var rexBuilder: RexBuilder = null
+  private var allFieldTypes: Seq[RelDataType] = null
+  private val allFieldNames = List("name", "id", "amount", "price")
+
+  @Before
+  def setUp: Unit = {
+typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
+rexBuilder = new RexBuilder(typeFactory)
+allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_))
+  }
+
+  @Test
+  def testExtractRefInputFields: Unit = {
+val usedFields = extractRefInputFields(buildRexProgram)
+Assert.assertArrayEquals(usedFields, Array(2, 1, 3))
+  }
+
+  @Test
+  def testRewriteRexProgram: Unit = {
+val originRexProgram = buildRexProgram
+Assert.assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
+  "$0",
+  "$1",
+  "$2",
+  "$3",
+  "*($t2, $t3)",
+  "100",
+  "<($t4, $t5)",
+  "6",
+  ">($t2, $t7)",
+  "AND($t6, $t8)")))
+// use amount, id, price fields to create a new RexProgram
+val usedFields = Array(2, 1, 3)
+val types = usedFields.map(allFieldTypes(_)).toList.asJava
+val names = usedFields.map(allFieldNames(_)).toList.asJava
+val inputRowType = typeFactory.createStructType(types, names)
+val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder)
+Assert.assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
+  "$0",
+  "$1",
+  "$2",
+  "*($t0, $t2)",
+  "100",
+  "<($t3, $t4)",
+  "6",
+  ">($t0, $t6)",
+  "AND($t5, $t7)")))
+  }
+
+  private def buildRexProgram: RexProgram = {
+val types = allFieldTypes.asJava
+val names = allFieldNames.asJava
+val inputRowType = typeFactory.createStructType(types, names)
+val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+val t0 = rexBuilder.makeInputRef(types.get(2), 2)
+val t1 = rexBuilder.makeInputRef(types.get(1), 1)
+val t2 = rexBuilder.makeInputRef(types.get(3), 3)
+val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
+val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
+// project: amount, id, amount * price
+builder.addProject(t0, "amount")
+builder.addProject(t1, "id")
+builder.addProject(t3, "total")
+// 

[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735287#comment-15735287
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91706582
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
 ---
@@ -0,0 +1,38 @@
+
+package org.apache.flink.api.table.sources
+
+/**
+  * Defines TableSource which supports project pushdown.
+  * E.g A definition of TestBatchTableSource which supports project
--- End diff --

I think test classes should not be referenced from the documentation of an 
API class.
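
A self-contained illustration could stand in for the test-class reference in the Scaladoc. The sketch below is only an assumption of what such an example might look like: `InMemorySource` and its fields are hypothetical names, not part of Flink. Only the method names `projectFields` and `getNumberOfFields` mirror calls that appear elsewhere in this PR.

```scala
object ProjectableSourceSketch {
  // Hypothetical, simplified stand-in for a projectable table source (not a Flink class).
  final case class InMemorySource(fieldNames: Array[String], rows: Seq[Array[Any]]) {

    def getNumberOfFields: Int = fieldNames.length

    // Returns a copy that exposes only the requested fields, in the requested order.
    def projectFields(fields: Array[Int]): InMemorySource =
      InMemorySource(fields.map(i => fieldNames(i)), rows.map(row => fields.map(i => row(i))))
  }

  def main(args: Array[String]): Unit = {
    val source = InMemorySource(
      Array("name", "id", "amount", "price"),
      Seq(Array[Any]("Record_0", 0L, 0, 0.0)))
    // Keep only id and name, in that order.
    val projected = source.projectFields(Array(1, 0))
    println(projected.fieldNames.mkString(","))  // id,name
    println(projected.rows.head.mkString(","))   // 0,Record_0
  }
}
```

The point of the design is that the source itself drops unused columns, so downstream operators never see them.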




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735290#comment-15735290
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91709480
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
 ---
@@ -0,0 +1,154 @@
+
+package org.apache.flink.api.scala.batch
+
+import org.apache.flink.api.common.io.GenericInputFormat
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ProjectableTableSourceITCase(mode: TestExecutionMode,
+  configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  private val tableName = "MyTable"
+  private var tableEnv: BatchTableEnvironment = null
+
+  @Before
+  def initTableEnv(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+tableEnv = TableEnvironment.getTableEnvironment(env, config)
+tableEnv.registerTableSource(tableName, new TestProjectableTableSource)
+  }
+
+  @Test
+  def testTableAPI(): Unit = {
+val results = tableEnv
+  .scan(tableName)
+  .where("amount < 4")
+  .select("id, name")
+  .collect()
+
+val expected = Seq(
+  "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
+  "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+
+  @Test
+  def testSQL(): Unit = {
+val results = tableEnv
+  .sql(s"select id, name from $tableName where amount < 4 ")
+  .collect()
+
+val expected = Seq(
+  "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
+  "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+}
+
+class TestProjectableTableSource(
+  fieldTypes: Array[TypeInformation[_]],
+  fieldNames: Array[String])
+  extends BatchTableSource[Row] with ProjectableTableSource[Row] {
+
+  def this() = this(
+fieldTypes = Array(
+  BasicTypeInfo.STRING_TYPE_INFO,
+  BasicTypeInfo.LONG_TYPE_INFO,
+  BasicTypeInfo.INT_TYPE_INFO,
+  BasicTypeInfo.DOUBLE_TYPE_INFO),
+fieldNames = Array[String]("name", "id", "amount", "price")
+  )
+
+  /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
+  override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
+execEnv.createInput(new ProjectableInputFormat(33, fieldNames), getReturnType).setParallelism(1)
--- End diff --

I think the test can be simplified if we use `execEnv.fromCollection` and 
build the records in the collection depending on the 

[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735289#comment-15735289
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91713315
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
+import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+
+/**
+  * This rule is responsible for push project into BatchTableSourceScan node
+  */
+class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+operand(classOf[BatchTableSourceScan], none)),
+  "PushProjectIntoBatchTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+scan.tableSource match {
+  case _: ProjectableTableSource[_] => true
+  case _ => false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+
+val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
+
+// if no fields can be projected, there is no need to transform subtree
+scan.tableSource.getNumberOfFields match {
--- End diff --

Can you convert this into a simple `if` condition?
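
One way to read this suggestion: the `match` has an empty first case, so it can be collapsed into a single comparison. The sketch below is self-contained and only illustrates the shape of the condition. `needsProjection` is a hypothetical helper, and the real rule would build the projected scan inside the `if` body.

```scala
object IfConditionSketch {
  // Hypothetical helper: true when the number of used fields differs from the
  // number of fields the source provides, i.e. the only case where the original
  // match expression did any work.
  def needsProjection(numberOfFields: Int, usedFields: Array[Int]): Boolean =
    numberOfFields != usedFields.length

  def main(args: Array[String]): Unit = {
    val usedFields = Array(2, 1, 3)
    if (needsProjection(4, usedFields)) {
      // In the rule, the projected scan and the rewritten Calc would be created here.
      println("push projection into the scan")
    }
  }
}
```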




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735288#comment-15735288
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91704173
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
+import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+
+/**
+  * This rule is responsible for push project into BatchTableSourceScan node
+  */
+class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+operand(classOf[BatchTableSourceScan], none)),
+  "PushProjectIntoBatchTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+scan.tableSource match {
+  case _: ProjectableTableSource[_] => true
+  case _ => false
+}
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+
+val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
+
+// if no fields can be projected, there is no need to transform subtree
+scan.tableSource.getNumberOfFields match {
+  case fieldNums if fieldNums == usedFields.length  =>
+  case _ =>
+val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
+val newTableSource = originTableSource.projectFields(usedFields)
+val newScan = new BatchTableSourceScan(
+  scan.getCluster,
+  scan.getTraitSet,
+  scan.getTable,
+  newTableSource.asInstanceOf[BatchTableSource[_]])
+
+val newCalcProgram = rewriteRexProgram(
+  calc.calcProgram,
+  newScan.getRowType,
+  usedFields,
+  calc.getCluster.getRexBuilder)
+
+// if project merely returns its input and doesn't exist filter, remove datasetCalc nodes
+if (newCalcProgram.isTrivial) {
+  call.transformTo(newScan)
+} else {
+  val newCal = new DataSetCalc(calc.getCluster,
--- End diff --

`newCal` -> `newCalc`




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15735286#comment-15735286
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91711313
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/util/RexProgramProjectExtractorTest.scala
 ---
@@ -0,0 +1,121 @@
+
+package org.apache.flink.api.table.plan.rules.util
+
+import java.math.BigDecimal
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, DOUBLE, INTEGER, VARCHAR}
+import org.apache.calcite.rex.{RexBuilder, RexInputRef, RexProgram, RexProgramBuilder}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.table.plan.rules.util.RexProgramProjectExtractor._
+import org.junit.{Assert, Before, Test}
+
+/**
+  * This class is responsible for testing RexProgramProjectExtractor
+  */
+class RexProgramProjectExtractorTest {
+  private var typeFactory: JavaTypeFactory = null
+  private var rexBuilder: RexBuilder = null
+  private var allFieldTypes: Seq[RelDataType] = null
+  private val allFieldNames = List("name", "id", "amount", "price")
+
+  @Before
+  def setUp: Unit = {
+typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
+rexBuilder = new RexBuilder(typeFactory)
+allFieldTypes = List(VARCHAR, BIGINT, INTEGER, DOUBLE).map(typeFactory.createSqlType(_))
+  }
+
+  @Test
+  def testExtractRefInputFields: Unit = {
+val usedFields = extractRefInputFields(buildRexProgram)
+Assert.assertArrayEquals(usedFields, Array(2, 1, 3))
+  }
+
+  @Test
+  def testRewriteRexProgram: Unit = {
+val originRexProgram = buildRexProgram
+Assert.assertTrue(extractExprStrList(originRexProgram).sameElements(Array(
+  "$0",
+  "$1",
+  "$2",
+  "$3",
+  "*($t2, $t3)",
+  "100",
+  "<($t4, $t5)",
+  "6",
+  ">($t2, $t7)",
+  "AND($t6, $t8)")))
+// use amount, id, price fields to create a new RexProgram
+val usedFields = Array(2, 1, 3)
+val types = usedFields.map(allFieldTypes(_)).toList.asJava
+val names = usedFields.map(allFieldNames(_)).toList.asJava
+val inputRowType = typeFactory.createStructType(types, names)
+val newRexProgram = rewriteRexProgram(originRexProgram, inputRowType, usedFields, rexBuilder)
+Assert.assertTrue(extractExprStrList(newRexProgram).sameElements(Array(
+  "$0",
+  "$1",
+  "$2",
+  "*($t0, $t2)",
+  "100",
+  "<($t3, $t4)",
+  "6",
+  ">($t0, $t6)",
+  "AND($t5, $t7)")))
+  }
+
+  private def buildRexProgram: RexProgram = {
+val types = allFieldTypes.asJava
+val names = allFieldNames.asJava
+val inputRowType = typeFactory.createStructType(types, names)
+val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+val t0 = rexBuilder.makeInputRef(types.get(2), 2)
+val t1 = rexBuilder.makeInputRef(types.get(1), 1)
+val t2 = rexBuilder.makeInputRef(types.get(3), 3)
+val t3 = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, t0, t2))
+val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+val t5 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(6L))
+// project: amount, id, amount * price
+builder.addProject(t0, "amount")
+builder.addProject(t1, "id")
+builder.addProject(t3, "total")
+// 

[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15734591#comment-15734591
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user tonycox commented on the issue:

https://github.com/apache/flink/pull/2923
  
Hi @fhueske, this PR looks good to me




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15730815#comment-15730815
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91429660
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
 ---
@@ -45,7 +45,8 @@ abstract class BatchScan(
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
 val rowCnt = metadata.getRowCount(this)
-planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
+val columnCnt = getRowType.getFieldCount
--- End diff --

Yes, that's right. Thanks.
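
The hunk is cut off after the `columnCnt` line, so the resulting cost expression is not visible in this message. As a hedged illustration of why the column count matters to the optimizer, the sketch below assumes the IO component scales with rows times columns. `Cost` and `scanCost` are hypothetical names, and the formula is an assumption rather than the code that was merged.

```scala
object ScanCostSketch {
  // Hypothetical stand-in for a planner cost with row, CPU and IO components.
  final case class Cost(rows: Double, cpu: Double, io: Double)

  // Assumption: IO grows with rowCnt * columnCnt, so a scan that reads fewer
  // columns is cheaper and the optimizer prefers the projected plan.
  def scanCost(rowCnt: Double, columnCnt: Int): Cost =
    Cost(rowCnt, rowCnt, rowCnt * columnCnt)

  def main(args: Array[String]): Unit = {
    val fullScan = scanCost(1000.0, 4)
    val projectedScan = scanCost(1000.0, 2)
    println(projectedScan.io < fullScan.io)  // true
  }
}
```

With a constant IO cost of 0, as in the removed line, a projected scan and a full scan would look equally expensive to the planner.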




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15730809#comment-15730809
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2923
  
@fhueske, thanks for your review. I have updated the PR based on your 
advice. The main changes are:
1. Adapt the cost model for the BatchTableSourceScan
2. Rework DataSetCalcConverter into RexProgramProjectExtractor, and add a test case
Thanks.




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15725829#comment-15725829
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2923
  
Hi @beyond1920 and @KurtYoung,

please see my 
[comment](https://github.com/apache/flink/pull/2810#issuecomment-265180294) on 
PR #2810 for how to continue with this PR.
Please let me know what you think.

Best, Fabian




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15725760#comment-15725760
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91083093
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.rex.{RexProgram, RexUtil}
+import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
+import org.apache.flink.api.table.plan.rules.util.DataSetCalcConverter._
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+import scala.collection.JavaConverters._
+
+/**
+  * This rule is responsible for push project into BatchTableSourceScan node
+  */
+class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+    operand(classOf[BatchTableSourceScan], none)),
+  "PushProjectIntoBatchTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+    scan.tableSource match {
+      case _: ProjectableTableSource[_] => true
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+
+    val usedFields: Array[Int] = extractRefInputFields(calc)
+
+    // if no fields can be projected, there is no need to transform subtree
+    if (scan.tableSource.getNumberOfFields == usedFields.length) {
+      return
+    }
+
+    val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+    val newTableSource = originTableSource.projectFields(usedFields)
+
+    val newScan = new BatchTableSourceScan(
+      scan.getCluster,
+      scan.getTraitSet,
+      scan.getTable,
+      newTableSource.asInstanceOf[BatchTableSource[_]])
+
+    val (newProjectExprs, newConditionExpr) = rewriteCalcExprs(calc, usedFields)
+
+    // if project merely returns its input and doesn't exist filter, remove datasetCalc nodes
+    val newProjectExprsList = newProjectExprs.asJava
+    if (RexUtil.isIdentity(newProjectExprsList, newScan.getRowType)
--- End diff --

Can be checked as `RexProgram.isTrivial()`
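
To illustrate what such a check amounts to, here is a minimal sketch in plain Scala. It assumes that "trivial" means an identity projection with no filter condition. `ToyProgram` and its fields are hypothetical stand-ins for illustration only, not Calcite's `RexProgram`.

```scala
// Toy model: each projection is the index of the input field it forwards,
// or None when it computes something else. Hypothetical, not Calcite's RexProgram.
final case class ToyProgram(
    inputFieldCount: Int,
    projects: List[Option[Int]],
    hasCondition: Boolean) {

  // Identity: output field i is exactly input field i, for every input field.
  def isIdentity: Boolean =
    projects.length == inputFieldCount &&
      projects.zipWithIndex.forall { case (p, i) => p.contains(i) }

  // Trivial: identity projection and no filter, so the calc node adds nothing.
  def isTrivial: Boolean = !hasCondition && isIdentity
}
```

With a single check of this kind, the rule would not need to inspect the projection list and the condition separately before deciding to drop the calc node.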




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15725756#comment-15725756
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91082726
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.rex.{RexProgram, RexUtil}
+import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
+import org.apache.flink.api.table.plan.rules.util.DataSetCalcConverter._
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+import scala.collection.JavaConverters._
+
+/**
+  * This rule is responsible for push project into BatchTableSourceScan node
+  */
+class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+    operand(classOf[BatchTableSourceScan], none)),
+  "PushProjectIntoBatchTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+    scan.tableSource match {
+      case _: ProjectableTableSource[_] => true
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+
+    val usedFields: Array[Int] = extractRefInputFields(calc)
+
+    // if no fields can be projected, there is no need to transform subtree
+    if (scan.tableSource.getNumberOfFields == usedFields.length) {
--- End diff --

Move this check to `matches`
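
A minimal sketch of the idea, in plain Scala: the planner consults `matches` before it calls `onMatch`, so putting every side condition in `matches` lets `onMatch` assume they hold. `Source`, `Projectable`, `ToySource`, `Match`, and `PushProjectRuleSketch` are hypothetical names used only for this illustration; they are not the Flink or Calcite classes from the diff.

```scala
// Hypothetical, minimal stand-ins for a table source and a calc-over-scan match.
trait Source { def numberOfFields: Int }
trait Projectable extends Source { def projectFields(fields: Array[Int]): Source }

// Toy projectable source that only tracks how many fields it emits.
final case class ToySource(numberOfFields: Int) extends Projectable {
  def projectFields(fields: Array[Int]): Source = ToySource(fields.length)
}

final case class Match(source: Source, usedFields: Array[Int])

object PushProjectRuleSketch {
  // All side conditions live here: the source must be projectable and
  // at least one field must be unused, otherwise there is nothing to push down.
  def matches(m: Match): Boolean = m.source match {
    case _: Projectable => m.usedFields.length != m.source.numberOfFields
    case _              => false
  }

  // Only invoked for matches that passed the checks above, so no early return.
  def onMatch(m: Match): Source =
    m.source.asInstanceOf[Projectable].projectFields(m.usedFields)
}
```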




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15725762#comment-15725762
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91083315
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/DataSetCalcConverter.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.util
+
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexLocalRef, RexNode, RexShuttle, RexSlot, RexVisitorImpl}
+import org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+object DataSetCalcConverter {
+
+  /**
+    * extract used input fields index of DataSetCalc RelNode
+    *
+    * @param calc the DataSetCalc which to analyze
+    * @return used input fields indices
+    */
+  def extractRefInputFields(calc: DataSetCalc): Array[Int] = {
+    val visitor = new RefFieldsVisitor
+    val calcProgram = calc.calcProgram
+    // extract input fields from project expressions
+    calcProgram.getProjectList.foreach(exp => calcProgram.expandLocalRef(exp).accept(visitor))
+    val condition = calcProgram.getCondition
+    // extract input fields from condition expression
+    if (condition != null) {
+      calcProgram.expandLocalRef(condition).accept(visitor)
+    }
+    visitor.getFields
+  }
+
+  /**
+    * rewrite DataSetCal project expressions and condition expression based on new input fields
+    *
+    * @param calc            the DataSetCalc which to rewrite
+    * @param usedInputFields input fields index of DataSetCalc RelNode
+    * @return a tuple which contain 2 elements, the first one is rewritten project expressions;
+    *         the second one is rewritten condition expression,
+    *         Note: if origin condition expression is null, the second value is None
+    */
+  def rewriteCalcExprs(
--- End diff --

Produce a `RexProgram` instead of `(List[RexNode], Option[RexNode])`




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15725758#comment-15725758
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91088241
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/DataSetCalcConverter.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.util
+
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexLocalRef, RexNode, RexShuttle, RexSlot, RexVisitorImpl}
+import org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+object DataSetCalcConverter {
--- End diff --

Can we make this class independent of `DataSetCalc` so that it only works on 
`RexProgram`? Then it can be reused for streaming as well. It could be called 
`RexProgramProjectExtractor`.
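
As a rough sketch of what such a plan-node-independent extractor does, the following plain-Scala example works on a toy expression tree instead of Calcite's `RexNode` and `RexProgram`. `Expr`, `InputRef`, `Literal`, `Call`, `Program`, and `ProjectExtractorSketch` are hypothetical names for illustration; only the method name `extractRefInputFields` is borrowed from the diff above.

```scala
// Toy expression tree; hypothetical stand-in for Calcite's RexNode hierarchy.
sealed trait Expr
final case class InputRef(index: Int) extends Expr
final case class Literal(value: Any) extends Expr
final case class Call(op: String, operands: List[Expr]) extends Expr

// Toy program: projection expressions plus an optional filter condition.
final case class Program(projects: List[Expr], condition: Option[Expr])

object ProjectExtractorSketch {
  // Collect the distinct input field indices an expression refers to.
  private def refs(e: Expr): Set[Int] = e match {
    case InputRef(i)       => Set(i)
    case Literal(_)        => Set.empty
    case Call(_, operands) => operands.flatMap(refs).toSet
  }

  // Sorted indices of all input fields used by the projections and the condition.
  def extractRefInputFields(p: Program): Array[Int] =
    (p.projects ++ p.condition.toList).flatMap(refs).distinct.sorted.toArray

  // Re-point every InputRef at its position within usedFields.
  def rewrite(p: Program, usedFields: Array[Int]): Program = {
    val newIndex = usedFields.zipWithIndex.toMap
    def go(e: Expr): Expr = e match {
      case InputRef(i)        => InputRef(newIndex(i))
      case l: Literal         => l
      case Call(op, operands) => Call(op, operands.map(go))
    }
    Program(p.projects.map(go), p.condition.map(go))
  }
}
```

Because nothing here depends on a batch or streaming plan node, the same two functions could serve both code paths, which is the reuse the comment asks for.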




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15725759#comment-15725759
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91084549
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanITCase.scala
 ---
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import collection.JavaConversions._
+import org.apache.calcite.rel.{RelNode, RelVisitor}
+import org.apache.calcite.rel.core.TableScan
+import org.apache.flink.api.common.io.GenericInputFormat
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.table.BatchTableEnvironment
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.{Assert, Before, Ignore, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.mutable
+
+/**
+  * Test push project down to batchTableSourceScan optimization
+  *
+  * @param mode
+  * @param configMode
+  */
+@RunWith(classOf[Parameterized])
+class PushProjectIntoBatchTableSourceScanITCase(mode: TestExecutionMode,
--- End diff --

These ITCases are very expensive and add significantly to the build time of 
Flink. We try to have as few ITCases as possible. 

Instead we have tests to check the optimized plan (see `TableTestBase`). 
With these tests we check that the projection is properly pushed into the table 
source for SQL and Table API. 

In addition, we need two ITCases, one for DataSet and one for DataStream, 
which check that the translation from `DataSetRel` (`DataStreamRel`) to 
`DataSet` and `DataStream` is working correctly.




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15725757#comment-15725757
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91083035
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.rex.{RexProgram, RexUtil}
+import org.apache.flink.api.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
+import org.apache.flink.api.table.plan.rules.util.DataSetCalcConverter._
+import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
+import scala.collection.JavaConverters._
+
+/**
+  * This rule is responsible for push project into BatchTableSourceScan node
+  */
+class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataSetCalc],
+    operand(classOf[BatchTableSourceScan], none)),
+  "PushProjectIntoBatchTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+    scan.tableSource match {
+      case _: ProjectableTableSource[_] => true
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
+    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
+
+    val usedFields: Array[Int] = extractRefInputFields(calc)
+
+    // if no fields can be projected, there is no need to transform subtree
+    if (scan.tableSource.getNumberOfFields == usedFields.length) {
+      return
+    }
+
+    val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
+
+    val newTableSource = originTableSource.projectFields(usedFields)
+
+    val newScan = new BatchTableSourceScan(
+      scan.getCluster,
+      scan.getTraitSet,
+      scan.getTable,
+      newTableSource.asInstanceOf[BatchTableSource[_]])
+
+    val (newProjectExprs, newConditionExpr) = rewriteCalcExprs(calc, usedFields)
--- End diff --

I think it would be nicer to directly generate a new `RexProgram`




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15725761#comment-15725761
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91083411
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/util/DataSetCalcConverter.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.util
+
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexLocalRef, RexNode, RexShuttle, RexSlot, RexVisitorImpl}
+import org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+object DataSetCalcConverter {
--- End diff --

This utility class should be thoroughly tested by unit tests.




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15725755#comment-15725755
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2923#discussion_r91081832
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
 ---
@@ -45,7 +45,8 @@ abstract class BatchScan(
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
     val rowCnt = metadata.getRowCount(this)
-    planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
+    val columnCnt = getRowType.getFieldCount
--- End diff --

The first cost parameter is the number of processed rows, the second 
indicates CPU cost, and the third IO cost. I would not change the number of 
processed rows but add IO costs of `rowCnt * rowSize`, where `rowSize` can be 
computed by `DataSetRel.estimateRowSize()`. 

Have a look at `DataSetCost` to check how costs are compared. The cost 
model aims at minimizing IO costs, so making a difference here results in the 
plan with the most projection being chosen.
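
The effect of this suggestion can be sketched in plain Scala. `ScanCost`, `ScanCostSketch`, and the flat 8 bytes per field are hypothetical stand-ins, not Flink's `DataSetCost` or `DataSetRel.estimateRowSize()`. The comparison order (IO first) is an assumption based on the remark that the cost model aims at minimizing IO.

```scala
// Hypothetical, simplified stand-in for a planner cost with three components.
final case class ScanCost(rowCount: Double, cpu: Double, io: Double) {
  // Assumption: IO is compared first, then CPU, then row count.
  def isCheaperThan(other: ScanCost): Boolean =
    if (io != other.io) io < other.io
    else if (cpu != other.cpu) cpu < other.cpu
    else rowCount < other.rowCount
}

object ScanCostSketch {
  // Assumed flat width per field; a real estimate would depend on field types.
  val AssumedBytesPerField = 8.0

  def estimateRowSize(fieldCount: Int): Double = fieldCount * AssumedBytesPerField

  // Row count stays untouched; only the IO component reflects the row width.
  def scanCost(rowCnt: Double, fieldCount: Int): ScanCost =
    ScanCost(rowCnt, rowCnt, rowCnt * estimateRowSize(fieldCount))
}
```

Under these assumptions, a scan that reads 3 of 10 fields has the same row count as the full scan but a lower IO cost, so the planner would prefer the plan with the projection pushed into the scan.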




[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-02 Thread zhangjing (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15717000#comment-15717000
 ] 

zhangjing commented on FLINK-5220:
--

Hi Fabian, I understand. I will close this JIRA. Thanks.



[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-02 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15714939#comment-15714939
 ] 

Fabian Hueske commented on FLINK-5220:
--

I see. You might have noticed that there is already a PR for FLINK-3848 and 
some discussion in that issue.
Therefore, I would rather close this one, as it is a duplicate and contains less 
information than FLINK-3848.

Having two PRs for the same issue is of course not a good situation, and we try 
to avoid that by closing duplicate issues and assigning issues as soon as people 
start working on them.
Now we need to see how to continue with both PRs...



[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-02 Thread zhangjing (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15714923#comment-15714923
 ] 

zhangjing commented on FLINK-5220:
--

Hi Fabian, this JIRA and FLINK-3848 are both about projection pushdown. I will 
close this issue later. 
Besides, I have already opened a PR for the projection pushdown optimization. Your 
advice is welcome, thanks.



[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15714885#comment-15714885
 ] 

ASF GitHub Bot commented on FLINK-5220:
---

GitHub user beyond1920 opened a pull request:

https://github.com/apache/flink/pull/2923

[FLINK-5220] [Table API & SQL] Flink SQL projection pushdown

This PR implements the projection pushdown optimization.
There are two commits here. The first one is linked to 
[FLINK-5185](https://issues.apache.org/jira/browse/FLINK-5185) and is the preparatory work; 
the second commit contains only the projection pushdown work, so it may be better to 
start with the second commit.
The main changes include:
1. Add PushProjectIntoBatchTableSourceScanRule to match DataSetCalc->BatchTableSourceScan
2. Add ProjectableTableSource to represent a TableSource which supports projection pushdown
3. Change the BatchScan cost computation logic
4. Add a test case


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink jira-5220

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2923.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2923


commit 6f4ecf2efaf424505e05c2d9142c90da24e12ed1
Author: beyond1920 
Date:   2016-11-29T04:26:02Z

Decouple BatchTableSourceScan with TableSourceTable
modify constructor of BatchScan, BatchTableSourceScan, DataSetScan

Test Plan: junit

Reviewers: kete.yangkt

Differential Revision: http://phabricator.taobao.net/D6601

modify code style and extract common method

let rule decide which tableSource to create a BatchTableScan

Decouple BatchTableSourceScan with TableSourceTable

make long length shorter to pass the flink code style check

commit 181f7f7d4362799549f9ad3e7da2e69838c0f834
Author: beyond1920 
Date:   2016-12-02T03:33:12Z

push project down into BatchTableSourceScan






[jira] [Commented] (FLINK-5220) Flink SQL projection pushdown

2016-12-02 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15714560#comment-15714560
 ] 

Fabian Hueske commented on FLINK-5220:
--

@zhangjing Can you explain how this issue is related to FLINK-3848? 
If it is a duplicate, I would suggest closing this issue and continuing the 
discussion at FLINK-3848.
If it extends FLINK-3848, it would be good to point out where the issues differ.

Thanks, Fabian
