wuchong commented on a change in pull request #15559:
URL: https://github.com/apache/flink/pull/15559#discussion_r612295189



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
##########
@@ -41,12 +40,13 @@ import java.util.function.Supplier
 class FlinkLogicalDataStreamTableScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
+    hints: util.List[RelHint],
     table: RelOptTable)
-  extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), table)
+  extends TableScan(cluster, traitSet, hints, table)

Review comment:
       Should we add hints in `explainTerms` for this node? 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/logical/LogicalTableScan.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.rel.logical;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.schema.Table;
+
+import java.util.List;
+
+/**
+ * A <code>LogicalTableScan</code> reads all the rows from a {@link 
RelOptTable}.
+ *
+ * <p>This class is copied from Calcite because the {@link #explainTerms} 
should consider hints.
+ *
+ * <p>If the table is a <code>net.sf.saffron.ext.JdbcTable</code>, then this 
is literally possible.
+ * But for other kinds of tables, there may be many ways to read the data from 
the table. For some
+ * kinds of table, it may not even be possible to read all of the rows unless 
some narrowing
+ * constraint is applied.
+ *
+ * <p>In the example of the <code>net.sf.saffron.ext.ReflectSchema</code> 
schema,
+ *
+ * <blockquote>
+ *
+ * <pre>select from fields</pre>
+ *
+ * </blockquote>
+ *
+ * <p>cannot be implemented, but
+ *
+ * <blockquote>
+ *
+ * <pre>select from fields as f
+ * where f.getClass().getName().equals("java.lang.String")</pre>
+ *
+ * </blockquote>
+ *
+ * <p>can. It is the optimizer's responsibility to find these ways, by 
applying transformation
+ * rules.
+ *
+ * <p>Line 106: {@link #explainTerms} method should consider hints.

Review comment:
       Please add the Calcite issue ID here.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
##########
@@ -303,4 +305,81 @@ class TableSourceITCase extends BatchTestBase {
         row("6"))
     )
   }
+
+  @Test
+  def testTableHint(): Unit = {
+    tEnv.getConfig.getConfiguration.setBoolean(
+      TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)
+    val resultPath = TEMPORARY_FOLDER.newFolder().getAbsolutePath
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE MySink (
+         |  `a` INT,
+         |  `b` BIGINT,
+         |  `c` STRING
+         |) WITH (
+         |  'connector' = 'filesystem',
+         |  'format' = 'testcsv',
+         |  'path' = '$resultPath'
+         |)
+       """.stripMargin)
+
+    val stmtSet= tEnv.createStatementSet()
+    stmtSet.addInsertSql(
+      """
+        |insert into MySink select a,b,c from MyTable
+        |  /*+ OPTIONS('source.num-element-to-skip'='1') */
+        |""".stripMargin)
+    stmtSet.addInsertSql(
+      """
+        |insert into MySink select a,b,c from MyTable
+        |  /*+ OPTIONS('source.num-element-to-skip'='2') */
+        |""".stripMargin)
+    stmtSet.execute().await()
+
+    val result = TableTestUtil.readFromFile(resultPath)
+    val expected = Seq("2,2,Hello", "3,2,Hello world", "3,2,Hello world")
+    Assert.assertEquals(expected.sorted, result.sorted)
+  }
+
+  @Test
+  def testTableHintWithLogicalTableScanReuse(): Unit = {

Review comment:
       From the method name, it's hard to tell how this test differs from 
`testTableHint`. Is it meant to verify the fix to 
`LogicalTableScan`?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
##########
@@ -144,4 +157,72 @@ class TableSourceTest extends TableTestBase {
          |""".stripMargin
     )
   }
+
+  @Test
+  def testTableHint(): Unit = {
+    util.tableEnv.getConfig.getConfiguration.setBoolean(
+      TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)
+    util.tableEnv.executeSql(
+      s"""
+         |CREATE TABLE MySink (
+         |  `a` INT,
+         |  `b` BIGINT,
+         |  `c` STRING
+         |) WITH (
+         |  'connector' = 'filesystem',
+         |  'format' = 'testcsv',
+         |  'path' = '/tmp/test'
+         |)
+       """.stripMargin)
+
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(
+      """
+        |insert into MySink select a,b,c from MyTable
+        |  /*+ OPTIONS('source.num-element-to-skip'='1') */
+        |""".stripMargin)
+    stmtSet.addInsertSql(
+      """
+        |insert into MySink select a,b,c from MyTable
+        |  /*+ OPTIONS('source.num-element-to-skip'='2') */
+        |""".stripMargin)
+
+    util.verifyExecPlan(stmtSet)
+  }
+
+  @Test
+  def testTableHintWithLogicalTableScanReuse(): Unit = {
+    util.tableEnv.getConfig.getConfiguration.setBoolean(
+      TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)
+    util.tableEnv.getConfig.getConfiguration.setBoolean(
+      
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
 true)
+    util.tableEnv.executeSql(
+      s"""
+         |CREATE TABLE MySink (
+         |  `a` INT,
+         |  `b` BIGINT,
+         |  `c` STRING
+         |) WITH (
+         |  'connector' = 'filesystem',
+         |  'format' = 'testcsv',
+         |  'path' = '/tmp/test'
+         |)
+       """.stripMargin)
+
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(
+      """
+        |insert into MySink
+        |select a,b,c from MyTable /*+ 
OPTIONS('source.num-element-to-skip'='0') */
+        |union all
+        |select a,b,c from MyTable /*+ 
OPTIONS('source.num-element-to-skip'='1') */
+        |""".stripMargin)
+    stmtSet.addInsertSql(
+      """
+        |insert into MySink select a,b,c from MyTable
+        |  /*+ OPTIONS('source.num-element-to-skip'='2') */

Review comment:
       Could we add a test verifying that the source can still be reused when the options are the same?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to