[
https://issues.apache.org/jira/browse/FLINK-38489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xuyang Zhong updated FLINK-38489:
---------------------------------
Description:
Currently, there is a slight flaw in the changelog mode inference for
ChangelogNormalize. If there is no filter in ChangelogNormalize and the input
consists only of I + UA, it should infer either I + UA or I + UA + UB.
Take the following case for example:
{code:java}
@Test
def test(): Unit = {
util.addTable(s"""
|CREATE TABLE src (
| a int primary key not enforced,
| b string
|) WITH (
| 'connector' = 'values',
| 'changelog-mode' = 'I,UA'
|)
|""".stripMargin)
util.addTable(s"""
|CREATE TABLE snk (
| a int primary key not enforced,
| b string
|) WITH (
| 'connector' = 'values',
| 'sink-insert-only' = 'false',
| 'sink-changelog-mode-enforced' = 'I,UA,D'
|)
|""".stripMargin)
util.verifyRelPlanInsert("insert into snk select * from src",
ExplainDetail.CHANGELOG_MODE)
} {code}
{code:java}
Sink(table=[default_catalog.default_database.snk], fields=[a, b],
changelogMode=[NONE])
+- ChangelogNormalize(key=[a], changelogMode=[I,UA,D])
+- Exchange(distribution=[hash[a]], changelogMode=[I,UA])
+- TableSourceScan(table=[[default_catalog, default_database, src]],
fields=[a, b], changelogMode=[I,UA])
{code}
was:
Currently, there is a slight flaw in the changelog mode inference for
ChangelogNormalize. If there is no filter in ChangelogNormalize and the input
consists only of I + UA, it should infer either I + UA or I + UA + UB.
Take the following case for example:
{code:java}
@Test
def test(): Unit = {
util.addTable(s"""
|CREATE TABLE src (
| a int primary key not enforced,
| b string
|) WITH (
| 'connector' = 'values',
| 'changelog-mode' = 'I,UA'
|)
|""".stripMargin)
util.addTable(s"""
|CREATE TABLE snk (
| a int primary key not enforced,
| b string
|) WITH (
| 'connector' = 'values',
| 'sink-insert-only' = 'false',
| 'sink-changelog-mode-enforced' = 'I,UA,D'
|)
|""".stripMargin)
util.verifyRelPlanInsert("insert into snk select * from src",
ExplainDetail.CHANGELOG_MODE)
} {code}
{code:java}
Sink(table=[default_catalog.default_database.snk], fields=[a, b],
changelogMode=[NONE]) +- ChangelogNormalize(key=[a], changelogMode=[I,UA,D]) +-
Exchange(distribution=[hash[a]], changelogMode=[I,UA]) +-
TableSourceScan(table=[[default_catalog, default_database, src]], fields=[a,
b], changelogMode=[I,UA])
{code}
> ChangelogNormalize can generate changelog mode without D
> ---------------------------------------------------------
>
> Key: FLINK-38489
> URL: https://issues.apache.org/jira/browse/FLINK-38489
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 2.1.0, 2.0.1
> Reporter: Xuyang Zhong
> Priority: Minor
>
> Currently, there is a slight flaw in the changelog mode inference for
> ChangelogNormalize. If there is no filter in ChangelogNormalize and the input
> consists only of I + UA, it should infer either I + UA or I + UA + UB.
> Take the following case for example:
> {code:java}
> @Test
> def test(): Unit = {
> util.addTable(s"""
> |CREATE TABLE src (
> | a int primary key not enforced,
> | b string
> |) WITH (
> | 'connector' = 'values',
> | 'changelog-mode' = 'I,UA'
> |)
> |""".stripMargin)
> util.addTable(s"""
> |CREATE TABLE snk (
> | a int primary key not enforced,
> | b string
> |) WITH (
> | 'connector' = 'values',
> | 'sink-insert-only' = 'false',
> | 'sink-changelog-mode-enforced' = 'I,UA,D'
> |)
> |""".stripMargin)
> util.verifyRelPlanInsert("insert into snk select * from src",
> ExplainDetail.CHANGELOG_MODE)
> } {code}
> {code:java}
> Sink(table=[default_catalog.default_database.snk], fields=[a, b],
> changelogMode=[NONE])
> +- ChangelogNormalize(key=[a], changelogMode=[I,UA,D])
> +- Exchange(distribution=[hash[a]], changelogMode=[I,UA])
> +- TableSourceScan(table=[[default_catalog, default_database, src]],
> fields=[a, b], changelogMode=[I,UA])
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)