[ 
https://issues.apache.org/jira/browse/FLINK-38489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuyang Zhong updated FLINK-38489:
---------------------------------
    Description: 
Currently, there is a slight flaw in the changelog mode inference for 
ChangelogNormalize. If there is no filter in ChangelogNormalize and the input 
consists only of I + UA, it should infer either I + UA or I + UA + UB.

Take the following case for example:
{code:java}
@Test
def test(): Unit = {
  util.addTable(s"""
                   |CREATE TABLE src (
                   |  a int primary key not enforced,
                   |  b string
                   |) WITH (
                   |  'connector' = 'values',
                   |  'changelog-mode' = 'I,UA'
                   |)
                   |""".stripMargin)

  util.addTable(s"""
                   |CREATE TABLE snk (
                   |  a int primary key not enforced,
                   |  b string
                   |) WITH (
                   |  'connector' = 'values',
                   |  'sink-insert-only' = 'false',
                   |  'sink-changelog-mode-enforced' = 'I,UA,D'
                   |)
                   |""".stripMargin)

  util.verifyRelPlanInsert("insert into snk select * from src", 
ExplainDetail.CHANGELOG_MODE)
} {code}
{code:java}
Sink(table=[default_catalog.default_database.snk], fields=[a, b], 
changelogMode=[NONE]) 
+- ChangelogNormalize(key=[a], changelogMode=[I,UA,D])  # actually, this should 
be [I,UA]
  +- Exchange(distribution=[hash[a]], changelogMode=[I,UA]) 
    +- TableSourceScan(table=[[default_catalog, default_database, src]], 
fields=[a, b], changelogMode=[I,UA])
{code}

  was:
Currently, there is a slight flaw in the changelog mode inference for 
ChangelogNormalize. If there is no filter in ChangelogNormalize and the input 
consists only of I + UA, it should infer either I + UA or I + UA + UB.

Take the following case for example:
{code:java}
@Test
def test(): Unit = {
  util.addTable(s"""
                   |CREATE TABLE src (
                   |  a int primary key not enforced,
                   |  b string
                   |) WITH (
                   |  'connector' = 'values',
                   |  'changelog-mode' = 'I,UA'
                   |)
                   |""".stripMargin)

  util.addTable(s"""
                   |CREATE TABLE snk (
                   |  a int primary key not enforced,
                   |  b string
                   |) WITH (
                   |  'connector' = 'values',
                   |  'sink-insert-only' = 'false',
                   |  'sink-changelog-mode-enforced' = 'I,UA,D'
                   |)
                   |""".stripMargin)

  util.verifyRelPlanInsert("insert into snk select * from src", 
ExplainDetail.CHANGELOG_MODE)
} {code}
{code:java}
Sink(table=[default_catalog.default_database.snk], fields=[a, b], 
changelogMode=[NONE]) 
+- ChangelogNormalize(key=[a], changelogMode=[I,UA,D]) 
  +- Exchange(distribution=[hash[a]], changelogMode=[I,UA]) 
    +- TableSourceScan(table=[[default_catalog, default_database, src]], 
fields=[a, b], changelogMode=[I,UA])
{code}


> ChangelogNormalize can generate changelog mode without D 
> ---------------------------------------------------------
>
>                 Key: FLINK-38489
>                 URL: https://issues.apache.org/jira/browse/FLINK-38489
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 2.1.0, 2.0.1
>            Reporter: Xuyang Zhong
>            Priority: Minor
>
> Currently, there is a slight flaw in the changelog mode inference for 
> ChangelogNormalize. If there is no filter in ChangelogNormalize and the input 
> consists only of I + UA, it should infer either I + UA or I + UA + UB.
> Take the following case for example:
> {code:java}
> @Test
> def test(): Unit = {
>   util.addTable(s"""
>                    |CREATE TABLE src (
>                    |  a int primary key not enforced,
>                    |  b string
>                    |) WITH (
>                    |  'connector' = 'values',
>                    |  'changelog-mode' = 'I,UA'
>                    |)
>                    |""".stripMargin)
>   util.addTable(s"""
>                    |CREATE TABLE snk (
>                    |  a int primary key not enforced,
>                    |  b string
>                    |) WITH (
>                    |  'connector' = 'values',
>                    |  'sink-insert-only' = 'false',
>                    |  'sink-changelog-mode-enforced' = 'I,UA,D'
>                    |)
>                    |""".stripMargin)
>   util.verifyRelPlanInsert("insert into snk select * from src", 
> ExplainDetail.CHANGELOG_MODE)
> } {code}
> {code:java}
> Sink(table=[default_catalog.default_database.snk], fields=[a, b], 
> changelogMode=[NONE]) 
> +- ChangelogNormalize(key=[a], changelogMode=[I,UA,D])  # actually, this 
> should be [I,UA]
>   +- Exchange(distribution=[hash[a]], changelogMode=[I,UA]) 
>     +- TableSourceScan(table=[[default_catalog, default_database, src]], 
> fields=[a, b], changelogMode=[I,UA])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to