twalthr commented on code in PR #26374:
URL: https://github.com/apache/flink/pull/26374#discussion_r2018759020
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala:
##########
@@ -487,9 +487,11 @@ object FlinkStreamRuleSets {
)
/** RuleSet related to transpose watermark to be close to source */
Review Comment:
update the JavaDoc as well
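For illustration, the stale Scaladoc could be reworded along these lines once the ruleset is renamed (a sketch; the exact wording is an assumption, not taken from the PR):

```scala
/** RuleSet related to optimizing ChangelogNormalize, e.g. by transposing it to be close to the source. */
```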
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala:
##########
@@ -487,9 +487,11 @@ object FlinkStreamRuleSets {
)
/** RuleSet related to transpose watermark to be close to source */
- val WATERMARK_TRANSPOSE_RULES: RuleSet = RuleSets.ofList(
+ val CHANGELOG_NORMALIZE_TRANSPOSE_RULES: RuleSet = RuleSets.ofList(
WatermarkAssignerChangelogNormalizeTransposeRule.WITH_CALC,
- WatermarkAssignerChangelogNormalizeTransposeRule.WITHOUT_CALC
+ WatermarkAssignerChangelogNormalizeTransposeRule.WITHOUT_CALC,
+ // optimize ChangelogNormalize
Review Comment:
```suggestion
// reduce state size in ChangelogNormalize
```
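With the rename and the suggested comment applied, the declaration would read roughly as follows (a sketch; the rule the PR adds after the comment is cut off in this hunk, so a `???` placeholder stands in for it):

```scala
val CHANGELOG_NORMALIZE_TRANSPOSE_RULES: RuleSet = RuleSets.ofList(
  WatermarkAssignerChangelogNormalizeTransposeRule.WITH_CALC,
  WatermarkAssignerChangelogNormalizeTransposeRule.WITHOUT_CALC,
  // reduce state size in ChangelogNormalize
  ??? // placeholder: the actual rule added by the PR is elided in this hunk
)
```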
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml:
##########
@@ -253,6 +273,26 @@ LogicalSink(table=[default_catalog.default_database.upsert_sink_table], fields=[
<![CDATA[
Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, col1, col2], changelogMode=[NONE])
+- TableSourceScan(table=[[default_catalog, default_database, upsert_table_partial_deletes]], fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChangelogNormalizePlan[[20] select_no_metadata_upsert_table_partial_deletes_metadata_no_pushdown_into_upsert_sink_table]">
+ <Resource name="sql">
<![CDATA[INSERT INTO upsert_sink_table SELECT id, col1, col2 FROM upsert_table_partial_deletes_metadata_no_pushdown]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalProject(id=[$0], col1=[$1], col2=[$2], offset=[$3])
+      +- LogicalTableScan(table=[[default_catalog, default_database, upsert_table_partial_deletes_metadata_no_pushdown, metadata=[offset]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[UA,PD])
+   +- TableSourceScan(table=[[default_catalog, default_database, upsert_table_partial_deletes_metadata_no_pushdown, metadata=[offset]]], fields=[id, col1, col2, offset], changelogMode=[UA,PD])
Review Comment:
Is the TableSourceScan never informed that `offset` is not required? After optimization, this metadata list should actually be empty.
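For context, the planner tells a source which metadata columns survive projection via the SupportsReadingMetadata ability. A minimal sketch of where that pruned list arrives (the source class is hypothetical; the interface and its methods are the real Flink connector API):

```scala
import java.util.{Collections, List => JList, Map => JMap}
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata
import org.apache.flink.table.types.DataType

// Hypothetical source, used only to show where the pruned metadata keys arrive.
class OffsetMetadataSource extends SupportsReadingMetadata {

  override def listReadableMetadata(): JMap[String, DataType] =
    Collections.singletonMap("offset", DataTypes.BIGINT())

  override def applyReadableMetadata(
      metadataKeys: JList[String],
      producedDataType: DataType): Unit = {
    // If metadata projection succeeded for the plan above, `metadataKeys`
    // should arrive empty here, since no query column consumes `offset`.
  }
}
```

Note that `SupportsReadingMetadata#supportsMetadataProjection` (a default method returning `true`) lets a source opt out of this pruning, which the `_no_pushdown` suffix of the test table name seems to exercise.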
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml:
##########
@@ -388,7 +428,7 @@ LogicalSink(table=[default_catalog.default_database.upsert_sink_table], fields=[
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, col1, col2], changelogMode=[NONE])
-+- ChangelogNormalize(key=[id], condition=[>(col1, 2)], changelogMode=[I,UB,UA,D])
++- ChangelogNormalize(key=[id], condition=[>(col1, 2)], changelogMode=[I,UA,D])
Review Comment:
How did this happen? `UB` has suddenly disappeared from the changelog mode.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]