Leonard Xu created FLINK-19878:
----------------------------------
Summary: Improve watermark ChangelogNormalize for upsertSource
Key: FLINK-19878
URL: https://issues.apache.org/jira/browse/FLINK-19878
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Leonard Xu
Currently, for an upsertSource such as upsert-kafka, the WatermarkAssigner is
placed after the ChangelogNormalize node, so it may return Long.MaxValue as the
watermark if some parallel instances don't receive any data.
As an improvement, we can move the WatermarkAssigner to directly after the
TableSourceScan node, so that watermarks are generated the same way as for a
general source.
{code:java}
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
   +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I,UA,D])
      +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
         +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
{code}
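The plan above is the current one. For comparison, the plan after the proposed
change might look roughly like the sketch below. Only the operator order is
taken from the description; the changelogMode values on each node are
assumptions and not output from a real planner run.
{code:java}
+- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
   +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
      +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
         +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[UA,D])
            +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
{code}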
--
This message was sent by Atlassian Jira
(v8.3.4#803005)