This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5c6a7c6984 [Fixbug] doris custom sql work (#7464)
5c6a7c6984 is described below
commit 5c6a7c6984e61b8078e202f1b8a9276a0fdee0dd
Author: Jast <[email protected]>
AuthorDate: Sat Aug 24 16:13:58 2024 +0800
[Fixbug] doris custom sql work (#7464)
---
.../connectors/doris/catalog/DorisCatalog.java | 9 +++
.../seatunnel/e2e/connector/doris/DorisIT.java | 24 ++++++++
.../doris_source_and_sink_with_custom_sql.conf | 72 ++++++++++++++++++++++
3 files changed, 105 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
index 146d364652..a7f5eabf63 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
@@ -494,6 +494,15 @@ public class DorisCatalog implements Catalog {
}
}
+ @Override
+ public void executeSql(TablePath tablePath, String sql) {
+ try (PreparedStatement ps = conn.prepareStatement(sql)) {
+ ps.execute();
+ } catch (SQLException e) {
+ throw new CatalogException(String.format("Failed executeSql error
%s", sql), e);
+ }
+ }
+
@Override
public PreviewResult previewAction(
ActionType actionType, TablePath tablePath, Optional<CatalogTable>
catalogTable) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
index e9b81100de..e57349bbbe 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
@@ -173,6 +173,16 @@ public class DorisIT extends AbstractDorisIT {
Assertions.assertEquals(0, extraCommands.getExitCode(),
extraCommands.getStderr());
};
+ @TestTemplate
+ public void testCustomSql(TestContainer container) throws IOException,
InterruptedException {
+ initializeJdbcTable();
+ Container.ExecResult execResult =
+
container.executeJob("/doris_source_and_sink_with_custom_sql.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(101, tableCount(sinkDB, UNIQUE_TABLE));
+ clearUniqueTable();
+ }
+
@TestTemplate
public void testDoris(TestContainer container) throws IOException,
InterruptedException {
initializeJdbcTable();
@@ -344,6 +354,20 @@ public class DorisIT extends AbstractDorisIT {
Assertions.assertEquals(sourceResultSet.getRow(),
sinkResultSet.getRow());
}
+ private Integer tableCount(String db, String table) {
+ try (Statement statement = conn.createStatement()) {
+ String sql = String.format("select count(*) from %s.%s", db,
table);
+ ResultSet source = statement.executeQuery(sql);
+ if (source.next()) {
+ int rowCount = source.getInt(1);
+ return rowCount;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to check data in Doris server",
e);
+ }
+ return -1;
+ }
+
private void assertHasData(String db, String table) {
try (Statement statement = conn.createStatement()) {
String sql = String.format("select * from %s.%s limit 1", db,
table);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_custom_sql.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_custom_sql.conf
new file mode 100644
index 0000000000..dc90dfa343
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_with_custom_sql.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env{
+ parallelism = 1
+ job.mode = "BATCH"
+ job.retry.times = 0
+}
+
+source{
+ FakeSource {
+ row.num = 100
+ split.num = 10
+ string.length = 1
+ schema = {
+ fields {
+ F_ID = "bigint"
+ F_INT = "int"
+ F_BIGINT = "bigint"
+ F_TINYINT = "tinyint"
+ F_SMALLINT = "smallint"
+ F_DECIMAL = "decimal(10,2)"
+ F_LARGEINT = "bigint"
+ F_BOOLEAN = "boolean"
+ F_DOUBLE = "double"
+ F_FLOAT = "float"
+ F_CHAR = "string"
+ F_VARCHAR_11 = "string"
+ F_STRING = "string"
+ F_DATETIME_P = "timestamp"
+ F_DATETIME = "timestamp"
+ F_DATE = "date"
+ }
+ }
+ }
+}
+
+transform {}
+
+sink{
+ Doris {
+ fenodes = "doris_e2e:8030"
+ username = root
+ password = ""
+ table.identifier = "e2e_sink.doris_e2e_unique_table"
+ data_save_mode=CUSTOM_PROCESSING
+ custom_sql="INSERT INTO e2e_sink.doris_e2e_unique_table (
F_ID,F_INT,F_BIGINT) VALUES (1, 123, 1234567890123);"
+ sink.enable-2pc = "true"
+ sink.buffer-size = 2
+ sink.buffer-count = 2
+ sink.label-prefix = "test_json"
+ doris.config = {
+ format="json"
+ read_json_by_line="true"
+ }
+ save_mode_create_template = """CREATE TABLE IF NOT EXISTS
`${database}`.`${table}` (${rowtype_fields}) ENGINE=OLAP unique KEY (`F_ID`)
DISTRIBUTED BY HASH (`F_ID`) PROPERTIES ("replication_allocation" =
"tag.location.default: 1")"""
+ }
+ }
\ No newline at end of file