xuyangzhong commented on code in PR #27107:
URL: https://github.com/apache/flink/pull/27107#discussion_r2430911342
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java:
##########
@@ -153,14 +150,25 @@ public void testSinkReuseFromSameSource() {
}
@Test
- public void testSinkReuseWithPartialColumns() {
+ public void testSinkReuseWithPartialColumnsNotSupportsTargetColumnWriting() {
StatementSet statementSet = util.tableEnv().createStatementSet();
+ /// sink1 has not implemented the {@link SupportsTargetColumnWriting} sink ability
Review Comment:
nit: use `//` instead of `///` for this comment.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java:
##########
@@ -153,14 +150,25 @@ public void testSinkReuseFromSameSource() {
}
@Test
- public void testSinkReuseWithPartialColumns() {
+ public void testSinkReuseWithPartialColumnsNotSupportsTargetColumnWriting() {
StatementSet statementSet = util.tableEnv().createStatementSet();
+ /// sink1 has not implemented the {@link SupportsTargetColumnWriting} sink ability
statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source1)");
statementSet.addInsertSql("INSERT INTO sink1(`y`) (SELECT y FROM source1)");
statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source3)");
util.verifyExecPlan(statementSet);
}
+ @Test
+ public void testSinkReuseWithPartialColumnsAndSupportsTargetColumnWriting() {
+ StatementSet statementSet = util.tableEnv().createStatementSet();
+ /// sink2 has implemented the {@link SupportsTargetColumnWriting} sink ability
Review Comment:
ditto
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml:
##########
@@ -160,6 +160,35 @@
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[1]],
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
++- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source3]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
Review Comment:
The plan looks a bit strange with `targetColumns=[[0]]` printed in the
Sink.
I think we can do one of the following:
1. In the PhysicalSink, check whether SupportsTargetColumnWriting is present
before printing targetColumns, and skip printing targetColumns if it is not.
2. In the PhysicalSink, reference SupportsTargetColumnWriting when printing
targetColumns, regardless of the targetColumns themselves.
3. Complete the printing of targetColumns based on
https://issues.apache.org/jira/browse/FLINK-37708.
For now, since targetColumns has not been removed from PhysicalSink, I
lean towards option 1. What do you think? Do you have any other
suggestions?
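To make option 1 concrete, here is a rough, self-contained sketch of the idea. It does not use the planner's actual classes: `TableSink`, `PlainSink`, `TargetColumnSink`, and the nested `SupportsTargetColumnWriting` marker are hypothetical stand-ins, and the `explain` helper and its output format are assumptions that only loosely mimic the plan text above.

```java
import java.util.Arrays;
import java.util.StringJoiner;
import java.util.stream.Collectors;

public class TargetColumnsExplainSketch {

    /** Hypothetical stand-in for the sink ability discussed here; not the real Flink interface. */
    public interface SupportsTargetColumnWriting {}

    /** Hypothetical stand-in for a table sink. */
    public interface TableSink {}

    /** A sink without the ability, like sink1 in the test. */
    public static class PlainSink implements TableSink {}

    /** A sink with the ability, like sink2 in the test. */
    public static class TargetColumnSink implements TableSink, SupportsTargetColumnWriting {}

    /**
     * Builds a plan-like description of the sink. The targetColumns term is
     * emitted only when the sink declares the ability (option 1).
     */
    public static String explain(
            String table, TableSink sink, int[][] targetColumns, String fields) {
        StringJoiner terms = new StringJoiner(", ", "Sink(", ")");
        terms.add("table=[" + table + "]");

        boolean printTargetColumns =
                sink instanceof SupportsTargetColumnWriting
                        && targetColumns != null
                        && targetColumns.length > 0;
        if (printTargetColumns) {
            // Assumed formatting: each column path as "[i, j]", paths joined by ",".
            String columns =
                    Arrays.stream(targetColumns)
                            .map(path -> Arrays.toString(path))
                            .collect(Collectors.joining(","));
            terms.add("targetColumns=[" + columns + "]");
        }

        terms.add("fields=[" + fields + "]");
        return terms.toString();
    }

    public static void main(String[] args) {
        int[][] targetColumns = {{0}};
        System.out.println(explain("sink1", new PlainSink(), targetColumns, "x, EXPR$1"));
        System.out.println(explain("sink2", new TargetColumnSink(), targetColumns, "x, EXPR$1"));
    }
}
```

With this kind of guard, a sink like sink1 would no longer show `targetColumns` in the plan, while a sink like sink2 would keep it. In the real PhysicalSink the same check would presumably go wherever the explain terms are assembled.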