[ 
https://issues.apache.org/jira/browse/FLINK-34666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827333#comment-17827333
 ] 

Hongshun Wang commented on FLINK-34666:
---------------------------------------

Please assign it to me.

> Keep assigned splits in order to fix wrong meta group calculation
> -----------------------------------------------------------------
>
>                 Key: FLINK-34666
>                 URL: https://issues.apache.org/jira/browse/FLINK-34666
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Hongshun Wang
>            Priority: Major
>             Fix For: cdc-3.1.0
>
>
> h3. Reason
> When new tables are added and the job is then restarted, 
> IncrementalSourceEnumerator#sendStreamMetaRequestEvent -> 
> SplitAssigner#getFinishedSplitInfos may return finishedSplitInfos out of 
> order (the newly added table's snapshot infos come ahead of the older ones). 
> When the reader requests the newly added table's infos, it gets the older 
> ones instead, so it never reads all the infos and never restarts the 
> changelog read.
>  
> h3. How to reproduce it?
> Add 'chunk-meta.group.size' = '2' to the statement built by 
> getCreateTableStatement, then run the test 
> org.apache.flink.cdc.connectors.postgres.source.NewlyAddedTableITCase#testNewlyAddedTableForExistsPipelineTwiceWithAheadWalLog
> {code:java}
> // Test helper that builds the DDL; note 'chunk-meta.group.size' = '2' below
> private String getCreateTableStatement(
>         Map<String, String> otherOptions, String... captureTableNames) {
>     return String.format(
>             "CREATE TABLE address ("
>                     + " table_name STRING METADATA VIRTUAL,"
>                     + " id BIGINT NOT NULL,"
>                     + " country STRING,"
>                     + " city STRING,"
>                     + " detail_address STRING,"
>                     + " primary key (id) not enforced"
>                     + ") WITH ("
>                     + " 'connector' = 'postgres-cdc',"
>                     + " 'scan.incremental.snapshot.enabled' = 'true',"
>                     + " 'hostname' = '%s',"
>                     + " 'port' = '%s',"
>                     + " 'username' = '%s',"
>                     + " 'password' = '%s',"
>                     + " 'database-name' = '%s',"
>                     + " 'schema-name' = '%s',"
>                     + " 'table-name' = '%s',"
>                     + " 'slot.name' = '%s', "
>                     + " 'scan.incremental.snapshot.chunk.size' = '2',"
>                     + " 'chunk-meta.group.size' = '2',"
>                     + " 'scan.newly-added-table.enabled' = 'true'"
>                     + " %s"
>                     + ")",
>             customDatabase.getHost(),
>             customDatabase.getDatabasePort(),
>             customDatabase.getUsername(),
>             customDatabase.getPassword(),
>             customDatabase.getDatabaseName(),
>             SCHEMA_NAME,
>             PostgresTestUtils.getTableNameRegex(captureTableNames),
>             slotName,
>             otherOptions.isEmpty()
>                     ? ""
>                     : ","
>                             + otherOptions.entrySet().stream()
>                                     .map(
>                                             e ->
>                                                     String.format(
>                                                             "'%s'='%s'",
>                                                             e.getKey(), e.getValue()))
>                                     .collect(Collectors.joining(",")));
> } {code}
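
To illustrate the "Reason" section above, here is a minimal, self-contained sketch of how an unordered finished-split list can hand the reader the wrong meta group. It does not use Flink CDC classes: MetaGroupSketch, partition and nextMetaGroupId are hypothetical names, split infos are represented as plain strings, and the rule "next group id = received count / group size" is a simplifying assumption about how the reader chooses which group to request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MetaGroupSketch {

    /** Cuts the finished split infos into consecutive groups of at most groupSize entries. */
    static List<List<String>> partition(List<String> infos, int groupSize) {
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < infos.size(); i += groupSize) {
            int end = Math.min(i + groupSize, infos.size());
            groups.add(new ArrayList<>(infos.subList(i, end)));
        }
        return groups;
    }

    /** Assumption: the reader derives the next group id from how many infos it already holds. */
    static int nextMetaGroupId(int receivedCount, int groupSize) {
        return receivedCount / groupSize;
    }

    public static void main(String[] args) {
        int groupSize = 2; // mirrors 'chunk-meta.group.size' = '2'

        // Before the restart the reader already holds the four infos of table t1.
        List<String> alreadyReceived = Arrays.asList("t1:0", "t1:1", "t1:2", "t1:3");
        int requested = nextMetaGroupId(alreadyReceived.size(), groupSize);

        // Assignment order preserved: the infos of the newly added table t2 come last.
        List<String> ordered =
                Arrays.asList("t1:0", "t1:1", "t1:2", "t1:3", "t2:0", "t2:1");
        // Order lost after the restart: the infos of t2 come first.
        List<String> unordered =
                Arrays.asList("t2:0", "t2:1", "t1:0", "t1:1", "t1:2", "t1:3");

        // prints "ordered   -> [t2:0, t2:1]" (the new infos the reader needs)
        System.out.println("ordered   -> " + partition(ordered, groupSize).get(requested));
        // prints "unordered -> [t1:2, t1:3]" (infos the reader already holds)
        System.out.println("unordered -> " + partition(unordered, groupSize).get(requested));
    }
}
```

With the unordered list, the requested group contains only infos the reader already has, so under these assumptions its received count never grows and it keeps requesting the same group. Keeping the assigned splits in assignment order avoids this.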



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
