This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c9e2751 [Improve] registerDriver improvement (#219)
c9e2751 is described below
commit c9e2751b4767cf8837ca67b58f3395a2a294ecd9
Author: benjobs <[email protected]>
AuthorDate: Mon Oct 30 09:57:25 2023 +0800
[Improve] registerDriver improvement (#219)
---
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 10 +++++++++
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 26 +++++++++++++++++-----
.../flink/tools/cdc/oracle/OracleDatabaseSync.java | 21 ++++++++++++++---
.../tools/cdc/postgres/PostgresDatabaseSync.java | 14 ++++++++++--
.../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 18 +++++++++++----
5 files changed, 74 insertions(+), 15 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 8aef65d..fcd0f4c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -50,26 +50,36 @@ public abstract class DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(DatabaseSync.class);
private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
private static final String TABLE_NAME_OPTIONS = "table-name";
+
protected Configuration config;
+
protected String database;
+
protected TableNameConverter converter;
protected Pattern includingPattern;
protected Pattern excludingPattern;
protected Map<String, String> tableConfig;
protected Configuration sinkConfig;
protected boolean ignoreDefaultValue;
+
public StreamExecutionEnvironment env;
private boolean createTableOnly = false;
private boolean newSchemaChange;
protected String includingTables;
protected String excludingTables;
+ public abstract void registerDriver() throws SQLException;
+
public abstract Connection getConnection() throws SQLException;
public abstract List<SourceSchema> getSchemaList() throws Exception;
public abstract DataStreamSource<String>
buildCdcSource(StreamExecutionEnvironment env);
+ public DatabaseSync() throws SQLException {
+ registerDriver();
+ }
+
public void create(StreamExecutionEnvironment env, String database,
Configuration config,
String tablePrefix, String tableSuffix, String
includingTables,
String excludingTables, boolean ignoreDefaultValue,
Configuration sinkConfig,
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 2235e0b..22e49aa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -54,10 +54,25 @@ import java.util.regex.Pattern;
public class MysqlDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(MysqlDatabaseSync.class);
- private static String JDBC_URL =
"jdbc:mysql://%s:%d?useInformationSchema=true";
- private static String PROPERTIES_PREFIX = "jdbc.properties.";
+ private static final String JDBC_URL =
"jdbc:mysql://%s:%d?useInformationSchema=true";
+ private static final String PROPERTIES_PREFIX = "jdbc.properties.";
- public MysqlDatabaseSync() {
+ public MysqlDatabaseSync() throws SQLException {
+ super();
+ }
+
+ @Override
+ public void registerDriver() throws SQLException {
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (ClassNotFoundException ex) {
+ LOG.warn("can not found class com.mysql.cj.jdbc.Driver, use class
com.mysql.jdbc.Driver");
+ try {
+ Class.forName("com.mysql.jdbc.Driver");
+ } catch (Exception e) {
+ throw new SQLException("No suitable driver found, can not
found class com.mysql.cj.jdbc.Driver and com.mysql.jdbc.Driver");
+ }
+ }
}
@Override
@@ -86,7 +101,7 @@ public class MysqlDatabaseSync extends DatabaseSync {
}
SourceSchema sourceSchema =
new MysqlSchema(metaData, databaseName, tableName,
tableComment);
- sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0
? DataModel.UNIQUE : DataModel.DUPLICATE);
+ sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty()
? DataModel.UNIQUE : DataModel.DUPLICATE);
schemaList.add(sourceSchema);
}
}
@@ -196,9 +211,8 @@ public class MysqlDatabaseSync extends DatabaseSync {
}
MySqlSource<String> mySqlSource =
sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
- DataStreamSource<String> streamSource = env.fromSource(
+ return env.fromSource(
mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
- return streamSource;
}
/**
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index 0049579..6e27eb6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -59,9 +59,24 @@ import static
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_
public class OracleDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(OracleDatabaseSync.class);
- private static String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s";
+ private static final String JDBC_URL = "jdbc:oracle:thin:@%s:%d:%s";
- public OracleDatabaseSync() {
+ public OracleDatabaseSync() throws SQLException {
+ super();
+ }
+
+ @Override
+ public void registerDriver() throws SQLException {
+ try {
+ Class.forName("oracle.jdbc.driver.OracleDriver");
+ } catch (ClassNotFoundException ex) {
+ LOG.warn("can not found class oracle.jdbc.driver.OracleDriver, use
class oracle.jdbc.OracleDriver");
+ try {
+ Class.forName("oracle.jdbc.OracleDriver");
+ } catch (Exception e) {
+ throw new SQLException("No suitable driver found, can not
found class oracle.jdbc.driver.OracleDriver and oracle.jdbc.OracleDriver");
+ }
+ }
}
@Override
@@ -97,7 +112,7 @@ public class OracleDatabaseSync extends DatabaseSync {
}
SourceSchema sourceSchema =
new OracleSchema(metaData, databaseName,
schemaName, tableName, tableComment);
- sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0
? DataModel.UNIQUE : DataModel.DUPLICATE);
+ sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty()
? DataModel.UNIQUE : DataModel.DUPLICATE);
schemaList.add(sourceSchema);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index 31878a5..b8c9ad1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -62,9 +62,19 @@ import static
com.ververica.cdc.connectors.postgres.source.config.PostgresSource
public class PostgresDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(PostgresDatabaseSync.class);
- private static String JDBC_URL = "jdbc:postgresql://%s:%d/%s";
+ private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s";
- public PostgresDatabaseSync() {
+ public PostgresDatabaseSync() throws SQLException {
+ super();
+ }
+
+ @Override
+ public void registerDriver() throws SQLException {
+ try {
+ Class.forName("org.postgresql.Driver");
+ } catch (ClassNotFoundException ex) {
+ throw new SQLException("No suitable driver found, can not found
class org.postgresql.Driver");
+ }
}
@Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index 6cf9c9d..fb25212 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -58,10 +58,20 @@ import static
com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_
public class SqlServerDatabaseSync extends DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(SqlServerDatabaseSync.class);
- private static String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s";
- private static String PORT = "port";
+ private static final String JDBC_URL =
"jdbc:sqlserver://%s:%d;database=%s";
+ private static final String PORT = "port";
- public SqlServerDatabaseSync() {
+ public SqlServerDatabaseSync() throws SQLException {
+ super();
+ }
+
+ @Override
+ public void registerDriver() throws SQLException {
+ try {
+ Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+ } catch (ClassNotFoundException ex) {
+ throw new SQLException("No suitable driver found, can not found
class com.microsoft.sqlserver.jdbc.SQLServerDriver");
+ }
}
@Override
@@ -91,7 +101,7 @@ public class SqlServerDatabaseSync extends DatabaseSync {
}
SourceSchema sourceSchema =
new SqlServerSchema(metaData, databaseName, null,
tableName, tableComment);
- sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0
? DataModel.UNIQUE : DataModel.DUPLICATE);
+ sourceSchema.setModel(!sourceSchema.primaryKeys.isEmpty()
? DataModel.UNIQUE : DataModel.DUPLICATE);
schemaList.add(sourceSchema);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]