This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5c6a7c6984 [Fixbug] doris custom sql work (#7464)
5c6a7c6984 is described below

commit 5c6a7c6984e61b8078e202f1b8a9276a0fdee0dd
Author: Jast <[email protected]>
AuthorDate: Sat Aug 24 16:13:58 2024 +0800

    [Fixbug] doris custom sql work (#7464)
---
 .../connectors/doris/catalog/DorisCatalog.java     |  9 +++
 .../seatunnel/e2e/connector/doris/DorisIT.java     | 24 ++++++++
 .../doris_source_and_sink_with_custom_sql.conf     | 72 ++++++++++++++++++++++
 3 files changed, 105 insertions(+)

diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
index 146d364652..a7f5eabf63 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
@@ -494,6 +494,15 @@ public class DorisCatalog implements Catalog {
         }
     }
 
+    @Override
+    public void executeSql(TablePath tablePath, String sql) {
+        try (PreparedStatement ps = conn.prepareStatement(sql)) {
+            ps.execute();
+        } catch (SQLException e) {
+            throw new CatalogException(String.format("Failed executeSql error 
%s", sql), e);
+        }
+    }
+
     @Override
     public PreviewResult previewAction(
             ActionType actionType, TablePath tablePath, Optional<CatalogTable> 
catalogTable) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
index e9b81100de..e57349bbbe 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
@@ -173,6 +173,16 @@ public class DorisIT extends AbstractDorisIT {
                 Assertions.assertEquals(0, extraCommands.getExitCode(), 
extraCommands.getStderr());
             };
 
+    @TestTemplate
+    public void testCustomSql(TestContainer container) throws IOException, 
InterruptedException {
+        initializeJdbcTable();
+        Container.ExecResult execResult =
+                
container.executeJob("/doris_source_and_sink_with_custom_sql.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(101, tableCount(sinkDB, UNIQUE_TABLE));
+        clearUniqueTable();
+    }
+
     @TestTemplate
     public void testDoris(TestContainer container) throws IOException, 
InterruptedException {
         initializeJdbcTable();
@@ -344,6 +354,20 @@ public class DorisIT extends AbstractDorisIT {
         Assertions.assertEquals(sourceResultSet.getRow(), 
sinkResultSet.getRow());
     }
 
+    private Integer tableCount(String db, String table) {
+        try (Statement statement = conn.createStatement()) {
+            String sql = String.format("select count(*) from %s.%s", db, 
table);
+            ResultSet source = statement.executeQuery(sql);
+            if (source.next()) {
+                int rowCount = source.getInt(1);
+                return rowCount;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to check data in Doris server", 
e);
+        }
+        return -1;
+    }
+
     private void assertHasData(String db, String table) {
         try (Statement statement = conn.createStatement()) {
             String sql = String.format("select * from %s.%s limit 1", db, 
table);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_custom_sql.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_custom_sql.conf
new file mode 100644
index 0000000000..dc90dfa343
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_custom_sql.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env{
+  parallelism = 1
+  job.mode = "BATCH"
+  job.retry.times = 0
+}
+
+source{
+  FakeSource {
+    row.num = 100
+    split.num = 10
+    string.length = 1
+    schema = {
+      fields {
+        F_ID = "bigint"
+        F_INT = "int"
+        F_BIGINT = "bigint"
+        F_TINYINT = "tinyint"
+        F_SMALLINT = "smallint"
+        F_DECIMAL = "decimal(10,2)"
+        F_LARGEINT = "bigint"
+        F_BOOLEAN = "boolean"
+        F_DOUBLE = "double"
+        F_FLOAT = "float"
+        F_CHAR = "string"
+        F_VARCHAR_11 = "string"
+        F_STRING = "string"
+        F_DATETIME_P = "timestamp"
+        F_DATETIME = "timestamp"
+        F_DATE = "date"
+      }
+    }
+  }
+}
+
+transform {}
+
+sink{
+  Doris {
+          fenodes = "doris_e2e:8030"
+          username = root
+          password = ""
+          table.identifier = "e2e_sink.doris_e2e_unique_table"
+          data_save_mode=CUSTOM_PROCESSING
+          custom_sql="INSERT INTO  e2e_sink.doris_e2e_unique_table ( 
F_ID,F_INT,F_BIGINT) VALUES (1, 123,   1234567890123);"
+          sink.enable-2pc = "true"
+          sink.buffer-size = 2
+          sink.buffer-count = 2
+          sink.label-prefix = "test_json"
+          doris.config = {
+              format="json"
+              read_json_by_line="true"
+          }
+          save_mode_create_template = """CREATE TABLE IF NOT EXISTS 
`${database}`.`${table}` (${rowtype_fields}) ENGINE=OLAP unique KEY (`F_ID`) 
DISTRIBUTED BY HASH (`F_ID`) PROPERTIES ("replication_allocation" = 
"tag.location.default: 1")"""
+      }
+  }
\ No newline at end of file

Reply via email to