This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f321087 [fix][Connector] fix sql parse table name error (#1520)
f321087 is described below
commit f321087a46be918e225b44f14b9b7ccbf214f22b
Author: TrickyZerg <[email protected]>
AuthorDate: Wed Mar 23 12:12:58 2022 +0800
[fix][Connector] fix sql parse table name error (#1520)
* fix sql parse table name error
* code improve: extract the logic of get table name
* fix checkstyle
---
.../apache/seatunnel/flink/source/JdbcSource.java | 35 +++++++++++++++-------
1 file changed, 25 insertions(+), 10 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
index e028880..32b6cbc 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
@@ -40,6 +40,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
@@ -71,7 +72,7 @@ public class JdbcSource implements FlinkBatchSource {
private int fetchSize = DEFAULT_FETCH_SIZE;
private Set<String> fields;
- private static final Pattern COMPILE = Pattern.compile("select (.+) from (.+).*");
+ private static final Pattern COMPILE = Pattern.compile("[\\s]*select[\\s]*(.*)from[\\s]*([\\S]+).*");
private JdbcInputFormat jdbcInputFormat;
@@ -106,7 +107,26 @@ public class JdbcSource implements FlinkBatchSource {
dbUrl = config.getString(URL);
username = config.getString(USERNAME);
String query = config.getString(QUERY);
- Matcher matcher = COMPILE.matcher(query);
+ Tuple2<String, Set<String>> tableNameAndFields = getTableNameAndFields(COMPILE, query);
+ tableName = tableNameAndFields.f0;
+ fields = tableNameAndFields.f1;
+ if (config.hasPath(PASSWORD)) {
+ password = config.getString(PASSWORD);
+ }
+ if (config.hasPath(SOURCE_FETCH_SIZE)) {
+ fetchSize = config.getInt(SOURCE_FETCH_SIZE);
+ }
+
+ jdbcInputFormat = JdbcInputFormat.buildFlinkJdbcInputFormat()
+ .setDrivername(driverName).setDBUrl(dbUrl).setUsername(username)
+ .setPassword(password).setQuery(query).setFetchSize(fetchSize)
+ .setRowTypeInfo(getRowTypeInfo()).finish();
+ }
+
+ private Tuple2<String, Set<String>> getTableNameAndFields(Pattern regex, String selectSql) {
+ Matcher matcher = regex.matcher(selectSql);
+ String tableName;
+ Set<String> fields = null;
if (matcher.find()) {
String var = matcher.group(1);
tableName = matcher.group(2);
@@ -118,15 +138,10 @@ public class JdbcSource implements FlinkBatchSource {
}
fields = vars;
}
+ return new Tuple2<>(tableName, fields);
+ } else {
+ throw new IllegalArgumentException("can't find tableName and fields in sql :" + selectSql);
}
- if (config.hasPath(PASSWORD)) {
- password = config.getString(PASSWORD);
- }
- if (config.hasPath(SOURCE_FETCH_SIZE)) {
- fetchSize = config.getInt(SOURCE_FETCH_SIZE);
- }
-
- jdbcInputFormat = JdbcInputFormat.buildFlinkJdbcInputFormat().setDrivername(driverName).setDBUrl(dbUrl).setUsername(username).setPassword(password).setQuery(query).setFetchSize(fetchSize).setRowTypeInfo(getRowTypeInfo()).finish();
}
private RowTypeInfo getRowTypeInfo() {