This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new f8ea59b [mysql] add mysql chunk column options (#188)
f8ea59b is described below
commit f8ea59ba79419018dd495bb107410718d708b61d
Author: wudi <[email protected]>
AuthorDate: Thu Aug 31 10:14:36 2023 +0800
[mysql] add mysql chunk column options (#188)
1. Add a MySQL chunk key column option:
`--mysql-conf scan.incremental.snapshot.chunk.key-column=db.tbl1:col1,db.tbl2:col2`
The MySQL CDC connector version must be greater than 2.4.
2. Fix SQL Server type mapping (handle `IDENTITY` column types such as `INT IDENTITY`).
---------
Co-authored-by: wudi <>
---
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 3 +-
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 51 +++++++++++++++++-----
.../flink/tools/cdc/sqlserver/SqlServerType.java | 7 ++-
3 files changed, 45 insertions(+), 16 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 29c554e..bc15987 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -94,6 +94,7 @@ public abstract class DatabaseSync {
DorisSystem dorisSystem = new DorisSystem(options);
List<SourceSchema> schemaList = getSchemaList();
+ Preconditions.checkState(!schemaList.isEmpty(), "No tables to be
synchronized.");
if (!dorisSystem.databaseExists(database)) {
LOG.info("database {} not exist, created", database);
dorisSystem.createDatabase(database);
@@ -118,9 +119,7 @@ public abstract class DatabaseSync {
System.exit(0);
}
- Preconditions.checkState(!syncTables.isEmpty(), "No tables to be
synchronized.");
config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|",
syncTables) + ")");
-
DataStreamSource<String> streamSource = buildCdcSource(env);
SingleOutputStreamOperator<Void> parsedStream =
streamSource.process(new ParsingProcessFunction(converter));
for (String table : dorisTables) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 735fdb9..05dd298 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -26,7 +26,7 @@ import
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonCon
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
-
+import org.apache.doris.flink.catalog.doris.DataModel;
import
org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.DateToStringConverter;
@@ -34,7 +34,9 @@ import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +50,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class MysqlDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(MysqlDatabaseSync.class);
@@ -83,13 +87,8 @@ public class MysqlDatabaseSync extends DatabaseSync {
}
SourceSchema sourceSchema =
new MysqlSchema(metaData, databaseName, tableName,
tableComment);
- if (sourceSchema.primaryKeys.size() > 0) {
- //Only sync tables with primary keys
- schemaList.add(sourceSchema);
- } else {
- LOG.warn("table {} has no primary key, skip",
tableName);
- System.out.println("table " + tableName + " has no
primary key, skip.");
- }
+ sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0
? DataModel.UNIQUE : DataModel.DUPLICATE);
+ schemaList.add(sourceSchema);
}
}
}
@@ -136,13 +135,11 @@ public class MysqlDatabaseSync extends DatabaseSync {
config
.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
.ifPresent(sourceBuilder::splitSize);
-
- //Compatible with flink cdc mysql 2.3.0, close this option first
- /* config
+ config
.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED)
.ifPresent(sourceBuilder::closeIdleReaders);
- **/
+ setChunkColumns(sourceBuilder);
String startupMode = config.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
if ("initial".equalsIgnoreCase(startupMode)) {
sourceBuilder.startupOptions(StartupOptions.initial());
@@ -205,6 +202,36 @@ public class MysqlDatabaseSync extends DatabaseSync {
return streamSource;
}
+ /**
+ * set chunkkeyColumn,eg: db.table1:column1,db.table2:column2
+ * @param sourceBuilder
+ */
+ private void setChunkColumns(MySqlSourceBuilder<String> sourceBuilder) {
+ Map<ObjectPath, String> chunkColumnMap = getChunkColumnMap();
+ for(Map.Entry<ObjectPath, String> entry : chunkColumnMap.entrySet()){
+ sourceBuilder.chunkKeyColumn(entry.getKey(), entry.getValue());
+ }
+ }
+
+ private Map<ObjectPath, String> getChunkColumnMap(){
+ Map<ObjectPath, String> chunkMap = new HashMap<>();
+ String chunkColumn =
config.getString(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
+ if(!StringUtils.isNullOrWhitespaceOnly(chunkColumn)){
+ final Pattern chunkPattern =
Pattern.compile("(\\S+)\\.(\\S+):(\\S+)");
+ String[] tblColumns = chunkColumn.split(",");
+ for(String tblCol : tblColumns){
+ Matcher matcher = chunkPattern.matcher(tblCol);
+ if(matcher.find()){
+ String db = matcher.group(1);
+ String table = matcher.group(2);
+ String col = matcher.group(3);
+ chunkMap.put(new ObjectPath(db, table), col);
+ }
+ }
+ }
+ return chunkMap;
+ }
+
private Properties getJdbcProperties(){
Properties jdbcProps = new Properties();
for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
index f09bd76..f2895c6 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
@@ -47,8 +47,11 @@ public class SqlServerType {
private static final String BINARY = "binary";
private static final String VARBINARY = "varbinary";
- public static String toDorisType(String sqlServerType, Integer precision,
Integer scale) {
- sqlServerType = sqlServerType.toLowerCase();
+ public static String toDorisType(String originSqlServerType, Integer
precision, Integer scale) {
+ originSqlServerType = originSqlServerType.toLowerCase();
+ // For sqlserver IDENTITY type, such as 'INT IDENTITY'
+ // originSqlServerType is "int identity", so we only get "int".
+ String sqlServerType = originSqlServerType.split(" ")[0];
switch (sqlServerType){
case BIT:
return DorisType.BOOLEAN;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]