[ https://issues.apache.org/jira/browse/FLINK-34666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827333#comment-17827333 ]
Hongshun Wang commented on FLINK-34666:
---------------------------------------

Please assign it to me.

> Keep assigned splits in order to fix wrong meta group calculation
> -----------------------------------------------------------------
>
>                 Key: FLINK-34666
>                 URL: https://issues.apache.org/jira/browse/FLINK-34666
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Hongshun Wang
>            Priority: Major
>             Fix For: cdc-3.1.0
>
>
> h3. Reason
> When new tables are added and the job is then restarted,
> IncrementalSourceEnumerator#sendStreamMetaRequestEvent ->
> SplitAssigner#getFinishedSplitInfos may return the finishedSplitInfos out of
> order (the snapshot split infos of the newly added tables come before the older
> ones). When the reader requests the newly added tables' infos, it gets the older
> ones instead, so it never receives all the infos and never resumes reading the
> changelog.
>
> h3. How to reproduce it?
> Add 'chunk-meta.group.size' = '2' to getCreateTableStatement, then run
> org.apache.flink.cdc.connectors.postgres.source.NewlyAddedTableITCase#testNewlyAddedTableForExistsPipelineTwiceWithAheadWalLog:
> {code:java}
> private String getCreateTableStatement(
>         Map<String, String> otherOptions, String... captureTableNames) {
>     return String.format(
>             "CREATE TABLE address ("
>                     + " table_name STRING METADATA VIRTUAL,"
>                     + " id BIGINT NOT NULL,"
>                     + " country STRING,"
>                     + " city STRING,"
>                     + " detail_address STRING,"
>                     + " primary key (id) not enforced"
>                     + ") WITH ("
>                     + " 'connector' = 'postgres-cdc',"
>                     + " 'scan.incremental.snapshot.enabled' = 'true',"
>                     + " 'hostname' = '%s',"
>                     + " 'port' = '%s',"
>                     + " 'username' = '%s',"
>                     + " 'password' = '%s',"
>                     + " 'database-name' = '%s',"
>                     + " 'schema-name' = '%s',"
>                     + " 'table-name' = '%s',"
>                     + " 'slot.name' = '%s', "
>                     + " 'scan.incremental.snapshot.chunk.size' = '2',"
>                     + " 'chunk-meta.group.size' = '2'," // added to reproduce the issue
>                     + " 'scan.newly-added-table.enabled' = 'true'"
>                     + " %s"
>                     + ")",
>             customDatabase.getHost(),
>             customDatabase.getDatabasePort(),
>             customDatabase.getUsername(),
>             customDatabase.getPassword(),
>             customDatabase.getDatabaseName(),
>             SCHEMA_NAME,
>             PostgresTestUtils.getTableNameRegex(captureTableNames),
>             slotName,
>             otherOptions.isEmpty()
>                     ? ""
>                     : ","
>                             + otherOptions.entrySet().stream()
>                                     .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
>                                     .collect(Collectors.joining(",")));
> }
> {code}
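The reason above can be illustrated with a small, self-contained Java model. This is a simplified sketch, not the Flink CDC code: the class MetaGroupOrderDemo, the methods partition and readAllMeta, and the split names are hypothetical. It assumes the enumerator partitions its finished split infos into consecutive groups of chunk-meta.group.size elements, and that the reader, which still holds the old tables' infos after the restart, requests the next group by an id derived from how many infos it has already received.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of meta-group serving; not the Flink CDC classes.
public class MetaGroupOrderDemo {

    // Splits the enumerator's finished split infos into consecutive groups of at most
    // groupSize elements (the role played by 'chunk-meta.group.size'). Assumes groupSize > 0.
    static List<List<String>> partition(List<String> infos, int groupSize) {
        List<List<String>> groups = new ArrayList<>();
        for (int start = 0; start < infos.size(); start += groupSize) {
            int end = Math.min(start + groupSize, infos.size());
            groups.add(new ArrayList<>(infos.subList(start, end)));
        }
        return groups;
    }

    // Reader side: starting from the infos it already holds (from before the restart),
    // it keeps requesting the group id derived from how many infos it has received,
    // until its count matches the enumerator's total. Returns what it ends up holding.
    static List<String> readAllMeta(
            List<String> alreadyReceived, List<String> enumeratorInfos, int groupSize) {
        List<String> received = new ArrayList<>(alreadyReceived);
        List<List<String>> groups = partition(enumeratorInfos, groupSize);
        while (received.size() < enumeratorInfos.size()) {
            int nextGroupId = received.size() / groupSize;
            int offsetInGroup = received.size() % groupSize;
            List<String> group = groups.get(nextGroupId);
            // In this model the reader accepts whatever the requested group contains.
            received.addAll(group.subList(offsetInGroup, group.size()));
        }
        return received;
    }

    public static void main(String[] args) {
        int groupSize = 2; // mirrors 'chunk-meta.group.size' = '2'
        List<String> oldInfos = List.of("old-0", "old-1"); // infos read before the restart
        List<String> newInfos = List.of("new-0", "new-1"); // newly added table's infos

        List<String> ordered = new ArrayList<>(oldInfos); // older infos first
        ordered.addAll(newInfos);
        List<String> unordered = new ArrayList<>(newInfos); // newly added infos first (the bug)
        unordered.addAll(oldInfos);

        System.out.println(readAllMeta(oldInfos, ordered, groupSize));
        // prints [old-0, old-1, new-0, new-1]
        System.out.println(readAllMeta(oldInfos, unordered, groupSize));
        // prints [old-0, old-1, old-0, old-1]: the new table's infos are never read
    }
}
```

With the ordered list, group 1 holds the new table's infos; with the unordered list, group 1 holds the old infos again, so the reader never sees new-0 and new-1. This is why the title proposes keeping the assigned splits in a stable order. How the fix achieves that inside Flink CDC (for example, with an insertion-ordered map) is not stated in the issue and is only a guess here.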