This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new da648ac [fix](oracle) add oracle regexp like compatible (#194)
da648ac is described below
commit da648ac4c1a53bb5769b1fc193e054f9f880a62a
Author: wudi <[email protected]>
AuthorDate: Thu Sep 14 15:53:59 2023 +0800
[fix](oracle) add oracle regexp like compatible (#194)
When include-tables has too many table names, and debezium incrementally
reads, it will be judged based on `regexp_like`. When the regular length
exceeds 512, an error will be reported, like `ORA-12733: regular expression too
long`
---
.../main/java/org/apache/doris/flink/lookup/ExecutionPool.java | 4 ++--
.../main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java | 4 ++++
.../apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java | 9 ++++++++-
3 files changed, 14 insertions(+), 3 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
index 9b930ff..94cb35c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
@@ -104,9 +104,9 @@ public class ExecutionPool implements Closeable {
public void close() throws IOException {
if (started.compareAndSet(true, false)) {
LOG.info("close executorService");
- actionWatcherExecutorService.shutdownNow();
+ actionWatcherExecutorService.shutdown();
+ workerExecutorService.shutdown();
workerStated.set(false);
- workerExecutorService.shutdownNow();
this.actionWatcherExecutorService = null;
this.workerExecutorService = null;
this.semaphore = null;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 455000e..e36590c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -61,6 +61,8 @@ public abstract class DatabaseSync {
public StreamExecutionEnvironment env;
private boolean createTableOnly = false;
private boolean newSchemaChange;
+ protected String includingTables;
+ protected String excludingTables;
public abstract Connection getConnection() throws SQLException;
@@ -76,6 +78,8 @@ public abstract class DatabaseSync {
this.config = config;
this.database = database;
this.converter = new TableNameConverter(tablePrefix, tableSuffix);
+ this.includingTables = includingTables;
+ this.excludingTables = excludingTables;
this.includingPattern = includingTables == null ? null :
Pattern.compile(includingTables);
this.excludingPattern = excludingTables == null ? null :
Pattern.compile(excludingTables);
this.ignoreDefaultValue = ignoreDefaultValue;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index 3b4bc31..bfd974f 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -107,11 +107,19 @@ public class OracleDatabaseSync extends DatabaseSync {
@Override
public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment
env) {
+ Properties debeziumProperties = new Properties();
String databaseName = config.get(OracleSourceOptions.DATABASE_NAME);
String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
Preconditions.checkNotNull(databaseName, "database-name in oracle is
required");
Preconditions.checkNotNull(schemaName, "schema-name in oracle is
required");
String tableName = config.get(OracleSourceOptions.TABLE_NAME);
+ //When debezium incrementally reads, it will be judged based on
regexp_like.
+ //When the regular length exceeds 512, an error will be reported, like
ORA-12733: regular expression too long
+ if(tableName.length() > 384){
+ //max database name length 128
+ tableName = StringUtils.isNullOrWhitespaceOnly(includingTables) ?
".*" : includingTables;
+ }
+
String url = config.get(OracleSourceOptions.URL);
String hostname = config.get(OracleSourceOptions.HOSTNAME);
Integer port = config.get(OracleSourceOptions.PORT);
@@ -127,7 +135,6 @@ public class OracleDatabaseSync extends DatabaseSync {
}
//debezium properties set
- Properties debeziumProperties = new Properties();
debeziumProperties.put("decimal.handling.mode", "string");
//date to string
debeziumProperties.putAll(OracleDateConverter.DEFAULT_PROPS);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]