[
https://issues.apache.org/jira/browse/FLINK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15739102#comment-15739102
]
ASF GitHub Bot commented on FLINK-5220:
---------------------------------------
Github user beyond1920 commented on a diff in the pull request:
https://github.com/apache/flink/pull/2923#discussion_r91847990
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch
+
+import org.apache.flink.api.common.io.GenericInputFormat
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
TypeInformation}
+import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment
=> JavaExecEnv}
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import
org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.sources.{BatchTableSource,
ProjectableTableSource}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+/**
+ * Integration test that registers a [[TestProjectableTableSource]] under a
+ * fixed table name and runs the same filter-and-project query against it
+ * twice: once through the Table API and once through SQL.
+ */
+@RunWith(classOf[Parameterized])
+class ProjectableTableSourceITCase(mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  private val tableName = "MyTable"
+
+  // Assigned in initTableEnv(), which JUnit invokes before each test.
+  private var tableEnv: BatchTableEnvironment = _
+
+  /**
+   * Text form of the rows both queries must return: one "id,Record_id"
+   * line per expected id, joined with newlines.
+   */
+  private val expectedRows: String =
+    Seq(0, 1, 2, 3, 16, 17, 18, 19, 32)
+      .map(id => s"$id,Record_$id")
+      .mkString("\n")
+
+  /** Creates the table environment and registers the source under test. */
+  @Before
+  def initTableEnv(): Unit = {
+    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
+    tableEnv = TableEnvironment.getTableEnvironment(batchEnv, config)
+    tableEnv.registerTableSource(tableName, new TestProjectableTableSource)
+  }
+
+  /** Selects id and name of rows with amount < 4 via the Table API. */
+  @Test
+  def testTableAPI(): Unit = {
+    val filtered = tableEnv.scan(tableName).where("amount < 4")
+    val rows = filtered.select("id, name").collect()
+
+    TestBaseUtils.compareResultAsText(rows.asJava, expectedRows)
+  }
+
+  /** Runs the same selection as testTableAPI, expressed as a SQL query. */
+  @Test
+  def testSQL(): Unit = {
+    val query = s"select id, name from $tableName where amount < 4 "
+    val rows = tableEnv.sql(query).collect()
+
+    TestBaseUtils.compareResultAsText(rows.asJava, expectedRows)
+  }
+
+}
+
+class TestProjectableTableSource(
+ fieldTypes: Array[TypeInformation[_]],
+ fieldNames: Array[String])
+ extends BatchTableSource[Row] with ProjectableTableSource[Row] {
+
+ /**
+  * No-argument constructor: delegates to the primary constructor with four
+  * field types (String, Long, Int, Double) and the four field names
+  * "name", "id", "amount" and "price".
+  */
+ def this() = this(
+   Array[TypeInformation[_]](
+     BasicTypeInfo.STRING_TYPE_INFO,
+     BasicTypeInfo.LONG_TYPE_INFO,
+     BasicTypeInfo.INT_TYPE_INFO,
+     BasicTypeInfo.DOUBLE_TYPE_INFO),
+   Array("name", "id", "amount", "price")
+ )
+
+ /** Returns the data of the table as a
[[org.apache.flink.api.java.DataSet]]. */
+ override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
+ execEnv.createInput(new ProjectableInputFormat(33, fieldNames),
getReturnType).setParallelism(1)
--- End diff --
yes, thanks
> Flink SQL projection pushdown
> -----------------------------
>
> Key: FLINK-5220
> URL: https://issues.apache.org/jira/browse/FLINK-5220
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: zhangjing
> Assignee: zhangjing
>
> This JIRA issue covers the projection pushdown optimization. Please refer to
> the design document for more details.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)