This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f321087  [fix][Connector] fix sql parse table name error (#1520)
f321087 is described below

commit f321087a46be918e225b44f14b9b7ccbf214f22b
Author: TrickyZerg <[email protected]>
AuthorDate: Wed Mar 23 12:12:58 2022 +0800

    [fix][Connector] fix sql parse table name error (#1520)
    
    * fix sql parse table name error
    
    * code improve: extract the logic of get table name
    
    * fix checkstyle
---
 .../apache/seatunnel/flink/source/JdbcSource.java  | 35 +++++++++++++++-------
 1 file changed, 25 insertions(+), 10 deletions(-)

diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
index e028880..32b6cbc 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/source/JdbcSource.java
@@ -40,6 +40,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
@@ -71,7 +72,7 @@ public class JdbcSource implements FlinkBatchSource {
     private int fetchSize = DEFAULT_FETCH_SIZE;
     private Set<String> fields;
 
-    private static final Pattern COMPILE = Pattern.compile("select (.+) from 
(.+).*");
+    private static final Pattern COMPILE = 
Pattern.compile("[\\s]*select[\\s]*(.*)from[\\s]*([\\S]+).*");
 
     private JdbcInputFormat jdbcInputFormat;
 
@@ -106,7 +107,26 @@ public class JdbcSource implements FlinkBatchSource {
         dbUrl = config.getString(URL);
         username = config.getString(USERNAME);
         String query = config.getString(QUERY);
-        Matcher matcher = COMPILE.matcher(query);
+        Tuple2<String, Set<String>> tableNameAndFields = 
getTableNameAndFields(COMPILE, query);
+        tableName = tableNameAndFields.f0;
+        fields = tableNameAndFields.f1;
+        if (config.hasPath(PASSWORD)) {
+            password = config.getString(PASSWORD);
+        }
+        if (config.hasPath(SOURCE_FETCH_SIZE)) {
+            fetchSize = config.getInt(SOURCE_FETCH_SIZE);
+        }
+
+        jdbcInputFormat = JdbcInputFormat.buildFlinkJdbcInputFormat()
+                
.setDrivername(driverName).setDBUrl(dbUrl).setUsername(username)
+                .setPassword(password).setQuery(query).setFetchSize(fetchSize)
+                .setRowTypeInfo(getRowTypeInfo()).finish();
+    }
+
+    private Tuple2<String, Set<String>> getTableNameAndFields(Pattern regex, 
String selectSql) {
+        Matcher matcher = regex.matcher(selectSql);
+        String tableName;
+        Set<String> fields = null;
         if (matcher.find()) {
             String var = matcher.group(1);
             tableName = matcher.group(2);
@@ -118,15 +138,10 @@ public class JdbcSource implements FlinkBatchSource {
                 }
                 fields = vars;
             }
+            return new Tuple2<>(tableName, fields);
+        } else {
+            throw new IllegalArgumentException("can't find tableName and 
fields in sql :" + selectSql);
         }
-        if (config.hasPath(PASSWORD)) {
-            password = config.getString(PASSWORD);
-        }
-        if (config.hasPath(SOURCE_FETCH_SIZE)) {
-            fetchSize = config.getInt(SOURCE_FETCH_SIZE);
-        }
-
-        jdbcInputFormat = 
JdbcInputFormat.buildFlinkJdbcInputFormat().setDrivername(driverName).setDBUrl(dbUrl).setUsername(username).setPassword(password).setQuery(query).setFetchSize(fetchSize).setRowTypeInfo(getRowTypeInfo()).finish();
     }
 
     private RowTypeInfo getRowTypeInfo() {

Reply via email to