This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 10232359 [Fix] fix multi database sync npe error (#534)
10232359 is described below
commit 1023235936c4df6d49a959ccff3f5261582c8c85
Author: wudi <[email protected]>
AuthorDate: Fri Jan 3 15:21:38 2025 +0800
[Fix] fix multi database sync npe error (#534)
---
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 6 +-
.../flink/tools/cdc/ParsingProcessFunction.java | 28 +++-
.../tools/cdc/mongodb/MongoDBDatabaseSync.java | 2 +-
.../cdc/mongodb/MongoParsingProcessFunction.java | 4 +-
.../doris/flink/container/ContainerUtils.java | 19 ++-
.../flink/container/e2e/Mysql2DorisE2ECase.java | 144 ++++++++++++++++++---
.../mongodb/MongoParsingProcessFunctionTest.java | 2 +-
.../container/e2e/mysql2doris/testAutoAddTable.txt | 2 +
.../e2e/mysql2doris/testAutoAddTable_init.sql | 1 +
.../container/e2e/mysql2doris/testMySQL2Doris.txt | 2 +
.../e2e/mysql2doris/testMySQL2DorisByDefault.txt | 2 +
.../mysql2doris/testMySQL2DorisByDefault_init.sql | 1 +
.../e2e/mysql2doris/testMySQL2DorisCreateTable.txt | 2 +
.../testMySQL2DorisCreateTable_init.sql | 1 +
.../mysql2doris/testMySQL2DorisEnableDelete.txt | 2 +
.../testMySQL2DorisEnableDelete_init.sql | 1 +
.../e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt | 7 +
.../testMySQL2DorisMultiDb2One_init.sql | 70 ++++++++++
.../e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt | 4 +
.../testMySQL2DorisMultiDbSync_init.sql | 51 ++++++++
.../e2e/mysql2doris/testMySQL2DorisSQLParse.txt | 2 +
.../mysql2doris/testMySQL2DorisSQLParse_init.sql | 1 +
.../e2e/mysql2doris/testMySQL2Doris_init.sql | 1 +
23 files changed, 325 insertions(+), 30 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 0c1b860a..d6c69c0b 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -170,7 +170,7 @@ public abstract class DatabaseSync {
streamSource.process(buildProcessFunction());
for (Tuple2<String, String> dbTbl : dorisTables) {
OutputTag<String> recordOutputTag =
- ParsingProcessFunction.createRecordOutputTag(dbTbl.f1);
+ ParsingProcessFunction.createRecordOutputTag(dbTbl.f0,
dbTbl.f1);
DataStream<String> sideOutput =
parsedStream.getSideOutput(recordOutputTag);
int sinkParallel =
sinkConfig.getInteger(
@@ -230,7 +230,7 @@ public abstract class DatabaseSync {
}
public ParsingProcessFunction buildProcessFunction() {
- return new ParsingProcessFunction(converter);
+ return new ParsingProcessFunction(database, converter);
}
/** create doris sink. */
@@ -479,7 +479,7 @@ public abstract class DatabaseSync {
}
TableSchema dorisSchema =
DorisSchemaFactory.createTableSchema(
- database,
+ targetDb,
dorisTable,
schema.getFields(),
schema.getPrimaryKeys(),
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
index 787d0ae1..22e2b9bc 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
@@ -21,7 +21,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.StringUtils;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,8 +34,10 @@ public class ParsingProcessFunction extends
ProcessFunction<String, Void> {
protected ObjectMapper objectMapper = new ObjectMapper();
private transient Map<String, OutputTag<String>> recordOutputTags;
private DatabaseSync.TableNameConverter converter;
+ private String database;
- public ParsingProcessFunction(DatabaseSync.TableNameConverter converter) {
+ public ParsingProcessFunction(String database,
DatabaseSync.TableNameConverter converter) {
+ this.database = database;
this.converter = converter;
}
@@ -47,8 +51,17 @@ public class ParsingProcessFunction extends
ProcessFunction<String, Void> {
String record, ProcessFunction<String, Void>.Context context,
Collector<Void> collector)
throws Exception {
String tableName = getRecordTableName(record);
- String dorisName = converter.convert(tableName);
- context.output(getRecordOutputTag(dorisName), record);
+ String dorisTableName = converter.convert(tableName);
+ String dorisDbName = database;
+ if (StringUtils.isNullOrWhitespaceOnly(database)) {
+ dorisDbName = getRecordDatabaseName(record);
+ }
+ context.output(getRecordOutputTag(dorisDbName, dorisTableName),
record);
+ }
+
+ private String getRecordDatabaseName(String record) throws
JsonProcessingException {
+ JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+ return extractJsonNode(recordRoot.get("source"), "db");
}
protected String getRecordTableName(String record) throws Exception {
@@ -60,12 +73,13 @@ public class ParsingProcessFunction extends
ProcessFunction<String, Void> {
return record != null && record.get(key) != null ?
record.get(key).asText() : null;
}
- private OutputTag<String> getRecordOutputTag(String tableName) {
+ private OutputTag<String> getRecordOutputTag(String databaseName, String
tableName) {
+ String tableIdentifier = databaseName + "." + tableName;
return recordOutputTags.computeIfAbsent(
- tableName, ParsingProcessFunction::createRecordOutputTag);
+ tableIdentifier, k -> createRecordOutputTag(databaseName,
tableName));
}
- public static OutputTag<String> createRecordOutputTag(String tableName) {
- return new OutputTag<String>("record-" + tableName) {};
+ public static OutputTag<String> createRecordOutputTag(String databaseName,
String tableName) {
+ return new OutputTag<String>(String.format("record-%s-%s",
databaseName, tableName)) {};
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
index 79c261d0..3526c075 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -198,7 +198,7 @@ public class MongoDBDatabaseSync extends DatabaseSync {
@Override
public ParsingProcessFunction buildProcessFunction() {
- return new MongoParsingProcessFunction(converter);
+ return new MongoParsingProcessFunction(database, converter);
}
@Override
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
index 737617a0..72c61567 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
@@ -27,8 +27,8 @@ import org.slf4j.LoggerFactory;
public class MongoParsingProcessFunction extends ParsingProcessFunction {
private static final Logger LOG =
LoggerFactory.getLogger(MongoParsingProcessFunction.class);
- public MongoParsingProcessFunction(TableNameConverter converter) {
- super(converter);
+ public MongoParsingProcessFunction(String databaseName, TableNameConverter
converter) {
+ super(databaseName, converter);
}
@Override
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
index e4c99d5a..f87e498c 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
@@ -104,6 +104,16 @@ public class ContainerUtils {
List<String> expected,
String query,
int columnSize) {
+ checkResult(connection, logger, expected, query, columnSize, true);
+ }
+
+ public static void checkResult(
+ Connection connection,
+ Logger logger,
+ List<String> expected,
+ String query,
+ int columnSize,
+ boolean ordered) {
List<String> actual = new ArrayList<>();
try (Statement statement = connection.createStatement()) {
ResultSet sinkResultSet = statement.executeQuery(query);
@@ -131,6 +141,13 @@ public class ContainerUtils {
"checking test result. expected={}, actual={}",
String.join(",", expected),
String.join(",", actual));
- Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+ if (ordered) {
+ Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+ } else {
+ Assert.assertEquals(expected.size(), actual.size());
+ Assert.assertArrayEquals(
+ expected.stream().sorted().toArray(Object[]::new),
+ actual.stream().sorted().toArray(Object[]::new));
+ }
}
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
index fe715f62..cb7d83ad 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
@@ -36,6 +36,7 @@ public class Mysql2DorisE2ECase extends AbstractE2EService {
private static final Logger LOG =
LoggerFactory.getLogger(Mysql2DorisE2ECase.class);
private static final String DATABASE = "test_e2e_mysql";
private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT
EXISTS " + DATABASE;
+ private static final String DROP_DATABASE = "DROP DATABASE IF EXISTS " +
DATABASE;
private static final String MYSQL_CONF = "--" +
DatabaseSyncConfig.MYSQL_CONF;
@Before
@@ -56,13 +57,8 @@ public class Mysql2DorisE2ECase extends AbstractE2EService {
argList.add(MYSQL_CONF);
argList.add(PASSWORD + "=" + getMySQLPassword());
argList.add(MYSQL_CONF);
- argList.add(DATABASE_NAME + "=" + DATABASE);
- // argList.add(MYSQL_CONF);
- // argList.add("server-time-zone=UTC");
+ argList.add("server-time-zone=UTC");
- // set doris database
- argList.add(DORIS_DATABASE);
- argList.add(DATABASE);
setSinkConfDefaultConfig(argList);
return argList;
}
@@ -82,15 +78,7 @@ public class Mysql2DorisE2ECase extends AbstractE2EService {
private void initDorisEnvironment() {
LOG.info("Initializing Doris environment.");
- ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG,
CREATE_DATABASE);
- ContainerUtils.executeSQLStatement(
- getDorisQueryConnection(),
- LOG,
- "DROP TABLE IF EXISTS test_e2e_mysql.tbl1",
- "DROP TABLE IF EXISTS test_e2e_mysql.tbl2",
- "DROP TABLE IF EXISTS test_e2e_mysql.tbl3",
- "DROP TABLE IF EXISTS test_e2e_mysql.tbl4",
- "DROP TABLE IF EXISTS test_e2e_mysql.tbl5");
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG,
DROP_DATABASE);
}
private void initEnvironment(String jobName, String mysqlSourcePath) {
@@ -436,6 +424,132 @@ public class Mysql2DorisE2ECase extends
AbstractE2EService {
throw new RuntimeException("Table not exist " + table);
}
+ @Test
+ public void testMySQL2DorisMultiDatabaseSync() throws Exception {
+ String jobName = "testMySQL2DorisMultiDatabaseSync";
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(), LOG, "DROP DATABASE IF EXISTS
test_e2e_mysql_db1");
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(), LOG, "DROP DATABASE IF EXISTS
test_e2e_mysql_db2");
+ initEnvironment(jobName,
"container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql");
+ startMysql2DorisJob(jobName,
"container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt");
+
+ // wait for 3 checkpoints
+ Thread.sleep(30000);
+ LOG.info("Start to verify init result.");
+ List<String> initExpected1 = Arrays.asList("1,db1_tb1,18");
+ String sql1 = "SELECT * FROM test_e2e_mysql_db1.tbl1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
initExpected1, sql1, 3, false);
+
+ List<String> initExpected2 = Arrays.asList("1,db1_tb2,19");
+ String sql2 = "SELECT * FROM test_e2e_mysql_db1.tbl2";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
initExpected2, sql2, 3, false);
+
+ List<String> initExpected3 = Arrays.asList("1,db2_tb1,20");
+ String sql3 = "SELECT * FROM test_e2e_mysql_db2.tbl1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
initExpected3, sql3, 3, false);
+
+ List<String> initExpected4 = Arrays.asList("1,db2_tb2,21");
+ String sql4 = "SELECT * FROM test_e2e_mysql_db2.tbl2";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
initExpected4, sql4, 3, false);
+
+ List<String> initExpected5 = Arrays.asList("1,db2_tb3,22");
+ String sql5 = "SELECT * FROM test_e2e_mysql_db2.tbl3";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
initExpected5, sql5, 3, false);
+
+ // add incremental data
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "insert into test_e2e_mysql_db1.tbl1 values (2,'db1_tb1',180)",
+ "insert into test_e2e_mysql_db1.tbl2 values (2,'db1_tb2',190)",
+ "insert into test_e2e_mysql_db2.tbl1 values (2,'db2_tb1',200)",
+ "insert into test_e2e_mysql_db2.tbl2 values (2,'db2_tb2',210)",
+ "insert into test_e2e_mysql_db2.tbl3 values
(2,'db2_tb3',220)");
+
+ Thread.sleep(20000);
+ List<String> incrExpected1 = Arrays.asList("1,db1_tb1,18",
"2,db1_tb1,180");
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
incrExpected1, sql1, 3, false);
+
+ List<String> incrExpected2 = Arrays.asList("1,db1_tb2,19",
"2,db1_tb2,190");
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
incrExpected2, sql2, 3, false);
+
+ List<String> incrExpected3 = Arrays.asList("1,db2_tb1,20",
"2,db2_tb1,200");
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
incrExpected3, sql3, 3, false);
+
+ List<String> incrExpected4 = Arrays.asList("1,db2_tb2,21",
"2,db2_tb2,210");
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
incrExpected4, sql4, 3, false);
+
+ List<String> incrExpected5 = Arrays.asList("1,db2_tb3,22",
"2,db2_tb3,220");
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
incrExpected5, sql5, 3, false);
+
+ cancelE2EJob(jobName);
+ }
+
+ /**
+ * Verifies that tables from separate source databases are written to the same target database and table.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMySQL2DorisMultiDatabase2OneSync() throws Exception {
+ String jobName = "testMySQL2DorisMultiDatabase2OneSync";
+ initEnvironment(jobName,
"container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql");
+ startMysql2DorisJob(jobName,
"container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt");
+
+ // wait for 3 checkpoints
+ Thread.sleep(30000);
+ LOG.info("Start to verify init result.");
+ List<String> initExpected = Arrays.asList("1,db1_tb1,18",
"2,db2_tb1,20");
+ String sql1 = "SELECT * FROM test_e2e_mysql.tbl1";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
initExpected, sql1, 3, false);
+
+ List<String> initExpected2 =
+ Arrays.asList(
+ "1,db1_tb2_1,19", "2,db1_tb2_2,191", "3,db2_tb2_2,21",
"4,db2_tbl2_2,211");
+ String sql2 = "SELECT * FROM test_e2e_mysql.tbl2_merge";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
initExpected2, sql2, 3, false);
+
+ List<String> initExpected3 = Arrays.asList("1,db2_tb3,22");
+ String sql3 = "SELECT * FROM test_e2e_mysql.tbl3";
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
initExpected3, sql3, 3, false);
+
+ // add incremental data
+ ContainerUtils.executeSQLStatement(
+ getMySQLQueryConnection(),
+ LOG,
+ "insert into test_e2e_mysql_db1.tbl1 values (3,'db1_tb1',180)",
+ "insert into test_e2e_mysql_db2.tbl1 values (4,'db2_tb1',200)",
+ "insert into test_e2e_mysql_db1.tbl2_1 values
(5,'db1_tb2_1',1901)",
+ "insert into test_e2e_mysql_db1.tbl2_2 values
(6,'db1_tb2_2',1902)",
+ "insert into test_e2e_mysql_db2.tbl2_1 values
(7,'db2_tb2_1',2101)",
+ "insert into test_e2e_mysql_db2.tbl2_2 values
(8,'db2_tb2_2',2102)",
+ "insert into test_e2e_mysql_db2.tbl3 values
(3,'db2_tb3',220)");
+
+ Thread.sleep(20000);
+
+ List<String> incrExpected =
+ Arrays.asList("1,db1_tb1,18", "2,db2_tb1,20", "3,db1_tb1,180",
"4,db2_tb1,200");
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
incrExpected, sql1, 3, false);
+
+ List<String> incrExpected2 =
+ Arrays.asList(
+ "1,db1_tb2_1,19",
+ "2,db1_tb2_2,191",
+ "3,db2_tb2_2,21",
+ "4,db2_tbl2_2,211",
+ "5,db1_tb2_1,1901",
+ "6,db1_tb2_2,1902",
+ "7,db2_tb2_1,2101",
+ "8,db2_tb2_2,2102");
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
incrExpected2, sql2, 3, false);
+
+ List<String> incrExpected3 = Arrays.asList("1,db2_tb3,22",
"3,db2_tb3,220");
+ ContainerUtils.checkResult(getDorisQueryConnection(), LOG,
incrExpected3, sql3, 3, false);
+
+ cancelE2EJob(jobName);
+ }
+
@After
public void close() {
try {
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
index e0c09b0f..3fca7b83 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
@@ -28,7 +28,7 @@ public class MongoParsingProcessFunctionTest {
String record =
"{\"_id\":\"{\\\"_id\\\": {\\\"$oid\\\":
\\\"66583533791a67a6f8c5a339\\\"}}\",\"operationType\":\"insert\",\"fullDocument\":\"{\\\"_id\\\":
{\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}, \\\"key1\\\":
\\\"value1\\\"}\",\"source\":{\"ts_ms\":0,\"snapshot\":\"true\"},\"ts_ms\":1717065582062,\"ns\":{\"db\":\"test\",\"coll\":\"cdc_test\"},\"to\":null,\"documentKey\":\"{\\\"_id\\\":
{\\\"$oid\\\":
\\\"66583533791a67a6f8c5a339\\\"}}\",\"updateDescription\":null,\"clusterTime
[...]
MongoParsingProcessFunction mongoParsingProcessFunction =
- new MongoParsingProcessFunction(null);
+ new MongoParsingProcessFunction(null, null);
String recordTableName =
mongoParsingProcessFunction.getRecordTableName(record);
assertEquals("cdc_test", recordTableName);
}
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
index 88ec4541..0b9a6247 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
@@ -1,4 +1,6 @@
mysql-sync-database
+ --database test_e2e_mysql
+ --mysql-conf database-name=test_e2e_mysql
--including-tables "tbl.*|auto_add"
--table-conf replication_num=1
--single-sink true
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
index ec617f30..f1042491 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
CREATE DATABASE if NOT EXISTS test_e2e_mysql;
DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
CREATE TABLE test_e2e_mysql.tbl1 (
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
index 601d0831..d88b2088 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
@@ -1,4 +1,6 @@
mysql-sync-database
+ --database test_e2e_mysql
+ --mysql-conf database-name=test_e2e_mysql
--including-tables "tbl.*"
--table-conf replication_num=1
--single-sink true
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
index 6f69a75b..b8b2974e 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
@@ -1,3 +1,5 @@
mysql-sync-database
+ --database test_e2e_mysql
+ --mysql-conf database-name=test_e2e_mysql
--including-tables "tbl1|tbl2|tbl3|tbl5"
--table-conf replication_num=1
\ No newline at end of file
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
index ec617f30..f1042491 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
CREATE DATABASE if NOT EXISTS test_e2e_mysql;
DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
CREATE TABLE test_e2e_mysql.tbl1 (
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
index 053dc9ef..60511be2 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
@@ -1,4 +1,6 @@
mysql-sync-database
+ --database test_e2e_mysql
+ --mysql-conf database-name=test_e2e_mysql
--including-tables "create_tbl_.*"
--create-table-only
--table-conf
table-buckets=create_tbl_uniq:10,create_tbl_from_uniqindex.*:30
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql
index cc3c16a6..4b65306d 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
CREATE DATABASE if NOT EXISTS test_e2e_mysql;
DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_uniq;
CREATE TABLE test_e2e_mysql.create_tbl_uniq (
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
index 1048916c..4f71f00c 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
@@ -1,4 +1,6 @@
mysql-sync-database
+ --database test_e2e_mysql
+ --mysql-conf database-name=test_e2e_mysql
--including-tables "tbl1|tbl2|tbl3|tbl5"
--table-conf replication_num=1
--sink-conf sink.enable-delete=false
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
index ec617f30..f1042491 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
CREATE DATABASE if NOT EXISTS test_e2e_mysql;
DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
CREATE TABLE test_e2e_mysql.tbl1 (
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt
new file mode 100644
index 00000000..42e671e5
--- /dev/null
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt
@@ -0,0 +1,7 @@
+mysql-sync-database
+ --database test_e2e_mysql
+ --mysql-conf database-name=test_e2e_mysql_db.*
+ --including-tables ".*"
+ --multi-to-one-origin "tbl2.*"
+ --multi-to-one-target "tbl2_merge"
+ --table-conf replication_num=1
\ No newline at end of file
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql
new file mode 100644
index 00000000..81d659d1
--- /dev/null
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql
@@ -0,0 +1,70 @@
+-- tbl1
+DROP DATABASE if EXISTS test_e2e_mysql_db1;
+CREATE DATABASE if NOT EXISTS test_e2e_mysql_db1;
+DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl1;
+CREATE TABLE test_e2e_mysql_db1.tbl1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db1.tbl1 values (1,'db1_tb1',18);
+
+DROP DATABASE if EXISTS test_e2e_mysql_db2;
+CREATE DATABASE if NOT EXISTS test_e2e_mysql_db2;
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl1;
+CREATE TABLE test_e2e_mysql_db2.tbl1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl1 values (2,'db2_tb1',20);
+
+-- tbl2_1 tbl2_2
+DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl2_1;
+CREATE TABLE test_e2e_mysql_db1.tbl2_1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db1.tbl2_1 values (1,'db1_tb2_1',19);
+
+DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl2_2;
+CREATE TABLE test_e2e_mysql_db1.tbl2_2 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db1.tbl2_2 values (2,'db1_tb2_2',191);
+
+-- db2
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl2_1;
+CREATE TABLE test_e2e_mysql_db2.tbl2_1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl2_1 values (3,'db2_tb2_2',21);
+
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl2_2;
+CREATE TABLE test_e2e_mysql_db2.tbl2_2 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl2_2 values (4,'db2_tbl2_2',211);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl3;
+CREATE TABLE test_e2e_mysql_db2.tbl3 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl3 values (1,'db2_tb3',22);
\ No newline at end of file
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt
new file mode 100644
index 00000000..c12902e3
--- /dev/null
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt
@@ -0,0 +1,4 @@
+mysql-sync-database
+ --mysql-conf database-name=test_e2e_mysql_db.*
+ --including-tables ".*"
+ --table-conf replication_num=1
\ No newline at end of file
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql
new file mode 100644
index 00000000..60b022c5
--- /dev/null
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql
@@ -0,0 +1,51 @@
+-- db1
+DROP DATABASE if EXISTS test_e2e_mysql_db1;
+CREATE DATABASE if NOT EXISTS test_e2e_mysql_db1;
+DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl1;
+CREATE TABLE test_e2e_mysql_db1.tbl1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db1.tbl1 values (1,'db1_tb1',18);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl2;
+CREATE TABLE test_e2e_mysql_db1.tbl2 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db1.tbl2 values (1,'db1_tb2',19);
+
+-- db2
+DROP DATABASE if EXISTS test_e2e_mysql_db2;
+CREATE DATABASE if NOT EXISTS test_e2e_mysql_db2;
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl1;
+CREATE TABLE test_e2e_mysql_db2.tbl1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl1 values (1,'db2_tb1',20);
+
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl2;
+CREATE TABLE test_e2e_mysql_db2.tbl2 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl2 values (1,'db2_tb2',21);
+
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl3;
+CREATE TABLE test_e2e_mysql_db2.tbl3 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl3 values (1,'db2_tb3',22);
\ No newline at end of file
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
index d863ecfa..6876afae 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
@@ -1,4 +1,6 @@
mysql-sync-database
+ --database test_e2e_mysql
+ --mysql-conf database-name=test_e2e_mysql
--including-tables "tbl.*|add_tbl"
--table-conf replication_num=1
--schema-change-mode sql_parser
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
index ec617f30..f1042491 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
CREATE DATABASE if NOT EXISTS test_e2e_mysql;
DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
CREATE TABLE test_e2e_mysql.tbl1 (
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
index ec617f30..f1042491 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
CREATE DATABASE if NOT EXISTS test_e2e_mysql;
DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
CREATE TABLE test_e2e_mysql.tbl1 (
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]