xuyangzhong commented on code in PR #27107:
URL: https://github.com/apache/flink/pull/27107#discussion_r2430911342


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java:
##########
@@ -153,14 +150,25 @@ public void testSinkReuseFromSameSource() {
     }
 
     @Test
-    public void testSinkReuseWithPartialColumns() {
+    public void testSinkReuseWithPartialColumnsNotSupportsTargetColumnWriting() {
         StatementSet statementSet = util.tableEnv().createStatementSet();
+        /// sink1 has not implemented the {@link SupportsTargetColumnWriting} sink ability

Review Comment:
   nit: use `//` instead of `///`



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java:
##########
@@ -153,14 +150,25 @@ public void testSinkReuseFromSameSource() {
     }
 
     @Test
-    public void testSinkReuseWithPartialColumns() {
+    public void testSinkReuseWithPartialColumnsNotSupportsTargetColumnWriting() {
         StatementSet statementSet = util.tableEnv().createStatementSet();
+        /// sink1 has not implemented the {@link SupportsTargetColumnWriting} sink ability
         statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source1)");
         statementSet.addInsertSql("INSERT INTO sink1(`y`) (SELECT y FROM source1)");
         statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source3)");
         util.verifyExecPlan(statementSet);
     }
 
+    @Test
+    public void testSinkReuseWithPartialColumnsAndSupportsTargetColumnWriting() {
+        StatementSet statementSet = util.tableEnv().createStatementSet();
+        /// sink2 has implemented the {@link SupportsTargetColumnWriting} sink ability

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml:
##########
@@ -160,6 +160,35 @@ LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[1]],
    +- LogicalTableScan(table=[[default_catalog, default_database, source1]])
 
 LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
++- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
+   +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])

Review Comment:
   The plan looks a bit strange with the description `targetColumns=[[0]]` in Sink.
   
   I think we can do one of the following:
   1. In the PhysicalSink, check whether SupportsTargetColumnWriting is present before printing targetColumns, and skip printing targetColumns if it is not.
   2. In the PhysicalSink, reference SupportsTargetColumnWriting when printing targetColumns, regardless of the targetColumns themselves.
   3. Complete the printing of targetColumns based on https://issues.apache.org/jira/browse/FLINK-37708.
   
   For now, since targetColumns has not been removed from PhysicalSink, I tend to prefer option 1; a minimal sketch is given below. What do you think, or do you have any other suggestions?
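   For illustration, a minimal sketch of option 1, assuming a hypothetical `describeSink` helper on the physical sink node (the real planner code differs; only `DynamicTableSink` and `SupportsTargetColumnWriting` are actual Flink interfaces here):
   
   ```java
   import java.util.Arrays;
   
   import org.apache.flink.table.connector.sink.DynamicTableSink;
   import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
   
   final class SinkDescriptionSketch {
   
       // Hypothetical helper: builds the textual plan description of a sink node.
       static String describeSink(DynamicTableSink tableSink, int[][] targetColumns, String fields) {
           StringBuilder sb = new StringBuilder("Sink(");
           // Option 1: print targetColumns only when the sink actually implements
           // SupportsTargetColumnWriting, so sinks without the ability no longer
           // show a misleading targetColumns=[[...]] entry in the plan.
           if (tableSink instanceof SupportsTargetColumnWriting
                   && targetColumns != null
                   && targetColumns.length > 0) {
               sb.append("targetColumns=").append(Arrays.deepToString(targetColumns)).append(", ");
           }
           sb.append("fields=").append(fields).append(')');
           return sb.toString();
       }
   }
   ```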



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
