[jira] [Commented] (FLINK-34869) [Bug][mysql] Remove all previous table and add new added table will throw Exception.

2024-03-20 Thread Flink CDC Issue Import (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828861#comment-17828861 ]

Flink CDC Issue Import commented on FLINK-34869:


Date: Wed Jan 31 17:17:11 CST 2024, Author: [loserwang1024|https://github.com/loserwang1024]

CC @leonardBang, @ruanhang1993

> [Bug][mysql] Remove all previous table and add new added table will throw Exception.
> 
>
> Key: FLINK-34869
> URL: https://issues.apache.org/jira/browse/FLINK-34869
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Flink CDC Issue Import
> Priority: Major
> Labels: github-import
>
> ### Search before asking
> - [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.
> ### Flink version
> 1.18
> ### Flink CDC version
> 3.0.1
> ### Database and its version
> Any
> ### Minimal reproduce step
> 1. Stop the job with a savepoint.
> 2. Set 'scan.incremental.snapshot.enabled' = 'true', then change tableList so it contains only tables that were not included in the previous run (see the sketch below).
> 3. After restoring from the savepoint, the assigner state becomes inconsistent.
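> A minimal, self-contained sketch of the restart configuration these steps describe (hostname, port, credentials, database and table names are placeholders, not taken from this report): the job is resubmitted from the savepoint with a source whose 'table-name' was not captured before.
>
> ```java
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class RemovedTablesRepro {
>     public static void main(String[] args) {
>         // Restart this program from the previous run's savepoint, e.g.
>         //   flink run -s <savepointPath> ...
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>         tEnv.executeSql(
>                 "CREATE TABLE address ("
>                         + " id BIGINT,"
>                         + " country STRING,"
>                         + " city STRING,"
>                         + " detail_address STRING,"
>                         + " primary key (id) not enforced"
>                         + ") WITH ("
>                         + " 'connector' = 'mysql-cdc',"
>                         + " 'hostname' = 'localhost'," // placeholder
>                         + " 'port' = '3306',"
>                         + " 'username' = 'flinkuser'," // placeholder
>                         + " 'password' = 'flinkpw',"   // placeholder
>                         + " 'database-name' = 'customer',"
>                         // on the second run, this table was NOT in the previous tableList
>                         + " 'table-name' = 'address_beijing',"
>                         + " 'scan.incremental.snapshot.enabled' = 'true'"
>                         + ")");
>         tEnv.executeSql("SELECT * FROM address").print();
>     }
> }
> ```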
> Take a test case for example:
> ```java
> public class NewlyAddedTableITCase extends MySqlSourceTestBase {
>
>     @Test
>     public void testRemoveAndAddTablesOneByOne() throws Exception {
>         testRemoveAndAddTablesOneByOne(
>                 1, "address_hangzhou", "address_beijing", "address_shanghai");
>     }
>
>     private void testRemoveAndAddTablesOneByOne(int parallelism, String... captureAddressTables)
>             throws Exception {
>         MySqlConnection connection = getConnection();
>         // step 1: create mysql tables with all tables included
>         initialAddressTables(connection, captureAddressTables);
>         final TemporaryFolder temporaryFolder = new TemporaryFolder();
>         temporaryFolder.create();
>         final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
>         // get all expected data
>         List<String> fetchedDataList = new ArrayList<>();
>         String finishedSavePointPath = null;
>         // test removing and adding table one by one
>         for (int round = 0; round < captureAddressTables.length; round++) {
>             String captureTableThisRound = captureAddressTables[round];
>             String cityName = captureTableThisRound.split("_")[1];
>             StreamExecutionEnvironment env =
>                     getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
>             StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>             String createTableStatement =
>                     getCreateTableStatement(new HashMap<>(), captureTableThisRound);
>             tEnv.executeSql(createTableStatement);
>             tEnv.executeSql(
>                     "CREATE TABLE sink ("
>                             + " table_name STRING,"
>                             + " id BIGINT,"
>                             + " country STRING,"
>                             + " city STRING,"
>                             + " detail_address STRING,"
>                             + " primary key (table_name,id) not enforced"
>                             + ") WITH ("
>                             + " 'connector' = 'values',"
>                             + " 'sink-insert-only' = 'false'"
>                             + ")");
>             TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
>             JobClient jobClient = tableResult.getJobClient().get();
>             // this round's snapshot data
>             fetchedDataList.addAll(
>                     Arrays.asList(
>                             format(
>                                     "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
>                                     captureTableThisRound, cityName, cityName),
>                             format(
>                                     "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
>                                     captureTableThisRound, cityName, cityName),
>                             format(
>                                     "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
>                                     captureTableThisRound, cityName, cityName)));
>             waitForSinkSize("sink", fetchedDataList.size());
>             assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
>             // only this round table's data is captured.
>             // step 3: make binlog data for all tables before this round (also includes this round)
>             for (int i = 0; i <= round; i++) {
>                 String tableName = captureAddressTables[i];
>                 // ...
> ```
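> The `getStreamExecutionEnvironment(finishedSavePointPath, parallelism)` helper is not shown above. A plausible sketch, assuming the usual Flink test pattern of restoring state through the 'execution.savepoint.path' option (the checkpoint interval is an illustrative value, not from this report):
>
> ```java
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> // member of NewlyAddedTableITCase
> private StreamExecutionEnvironment getStreamExecutionEnvironment(
>         String finishedSavePointPath, int parallelism) {
>     Configuration configuration = new Configuration();
>     if (finishedSavePointPath != null) {
>         // each round after the first restores the changed table list on top of
>         // the previous round's assigner state; this is the setup that triggers the bug
>         configuration.setString("execution.savepoint.path", finishedSavePointPath);
>     }
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>     env.setParallelism(parallelism);
>     env.enableCheckpointing(200L);
>     return env;
> }
> ```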

[jira] [Commented] (FLINK-34869) [Bug][mysql] Remove all previous table and add new added table will throw Exception.

2024-04-23 Thread Josh Mahonin (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840201#comment-17840201 ]

Josh Mahonin commented on FLINK-34869:
--

We seem to be hitting this issue as well. Is it possibly fixed by another commit, or is it still outstanding, [~loserwang1024]?

If it is still outstanding, we could attempt a patch with some guidance. Thanks.

> [Bug][mysql] Remove all previous table and add new added table will throw Exception.
>
> Key: FLINK-34869
> URL: https://issues.apache.org/jira/browse/FLINK-34869
> [...]
[jira] [Commented] (FLINK-34869) [Bug][mysql] Remove all previous table and add new added table will throw Exception.

2024-04-23 Thread Hongshun Wang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840280#comment-17840280 ]

Hongshun Wang commented on FLINK-34869:
---

It seems I haven't fixed it yet, [~jmahonin].

> [Bug][mysql] Remove all previous table and add new added table will throw Exception.
>
> Key: FLINK-34869
> URL: https://issues.apache.org/jira/browse/FLINK-34869
> Assignee: Josh Mahonin
> [...]