This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0e4e2b123 [Feature][Connector-V1 & V2] Support unauthorized ClickHouse (#2393)
0e4e2b123 is described below

commit 0e4e2b1230c019c6a11ec927b6cd47af4b1cfa4e
Author: qianmoQ <[email protected]>
AuthorDate: Thu Aug 11 21:24:09 2022 +0800

    [Feature][Connector-V1 & V2] Support unauthorized ClickHouse (#2393)
    
    * [Feature][Connector-V1] Support unauthorized ClickHouse
    
    * [Feature][Connector-V1] Add license
    
    * [Feature][Connector-V2] Support unauthorized ClickHouse
---
 .../seatunnel/clickhouse/shard/ShardMetadata.java  | 14 ++++++
 .../clickhouse/sink/client/ClickhouseSink.java     | 53 +++++++++++++++++-----
 .../seatunnel/clickhouse/util/ClickhouseUtil.java  |  8 +++-
 .../flink/clickhouse/sink/ClickhouseBatchSink.java |  6 ++-
 .../clickhouse/sink/client/ClickhouseClient.java   |  6 ++-
 .../spark/clickhouse/sink/Clickhouse.scala         |  8 ++--
 .../seatunnel-spark-connector-v2-example/pom.xml   |  5 ++
 ...{SeaTunnelApiExample.java => ExampleUtils.java} |  8 ++--
 .../example/spark/v2/SeaTunnelApiExample.java      | 25 +---------
 .../spark/v2/SeaTunnelApiToClickHouseExample.java  | 30 ++++++++++++
 .../resources/examples/spark.batch.clickhouse.conf | 52 +++++++++++++++++++++
 .../seatunnel-spark-examples/pom.xml               | 10 ++++
 .../spark/LocalSparkToClickHouseExample.java}      | 37 ++++++++-------
 .../resources/examples/spark.batch.clickhouse.conf | 47 +++++++++++++++++++
 14 files changed, 242 insertions(+), 67 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
index 3c01922f1..c40344b2a 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java
@@ -51,6 +51,20 @@ public class ShardMetadata implements Serializable {
         this.password = password;
     }
 
+    public ShardMetadata(String shardKey,
+                         String shardKeyType,
+                         String database,
+                         String table,
+                         boolean splitMode,
+                         Shard defaultShard) {
+        this.shardKey = shardKey;
+        this.shardKeyType = shardKeyType;
+        this.database = database;
+        this.table = table;
+        this.splitMode = splitMode;
+        this.defaultShard = defaultShard;
+    }
+
     public String getShardKey() {
         return shardKey;
     }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 295547c74..c296ca03a 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -78,7 +78,14 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
     @SuppressWarnings("checkstyle:MagicNumber")
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, HOST, 
DATABASE, TABLE, USERNAME, PASSWORD);
+        CheckResult result = CheckConfigUtil.checkAllExists(config, HOST, 
DATABASE, TABLE);
+
+        boolean isCredential = config.hasPath(USERNAME) || 
config.hasPath(PASSWORD);
+
+        if (isCredential) {
+            result = CheckConfigUtil.checkAllExists(config, USERNAME, 
PASSWORD);
+        }
+
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
         }
@@ -89,8 +96,14 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
 
         config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
 
-        List<ClickHouseNode> nodes = 
ClickhouseUtil.createNodes(config.getString(HOST),
-                config.getString(DATABASE), config.getString(USERNAME), 
config.getString(PASSWORD));
+        List<ClickHouseNode> nodes;
+        if (!isCredential) {
+            nodes = ClickhouseUtil.createNodes(config.getString(HOST), 
config.getString(DATABASE),
+                    null, null);
+        } else {
+            nodes = ClickhouseUtil.createNodes(config.getString(HOST),
+                    config.getString(DATABASE), config.getString(USERNAME), 
config.getString(PASSWORD));
+        }
 
         Properties clickhouseProperties = new Properties();
         if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX)) {
@@ -98,8 +111,11 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
                 clickhouseProperties.put(e.getKey(), 
String.valueOf(e.getValue().unwrapped()));
             });
         }
-        clickhouseProperties.put("user", config.getString(USERNAME));
-        clickhouseProperties.put("password", config.getString(PASSWORD));
+
+        if (isCredential) {
+            clickhouseProperties.put("user", config.getString(USERNAME));
+            clickhouseProperties.put("password", config.getString(PASSWORD));
+        }
 
         ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
         Map<String, String> tableSchema = 
proxy.getClickhouseTableSchema(config.getString(TABLE));
@@ -117,13 +133,26 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
                 shardKeyType = tableSchema.get(shardKey);
             }
         }
-        ShardMetadata metadata = new ShardMetadata(
-                shardKey,
-                shardKeyType,
-                config.getString(DATABASE),
-                config.getString(TABLE),
-                config.getBoolean(SPLIT_MODE),
-                new Shard(1, 1, nodes.get(0)), config.getString(USERNAME), 
config.getString(PASSWORD));
+        ShardMetadata metadata;
+
+        if (isCredential) {
+            metadata = new ShardMetadata(
+                    shardKey,
+                    shardKeyType,
+                    config.getString(DATABASE),
+                    config.getString(TABLE),
+                    config.getBoolean(SPLIT_MODE),
+                    new Shard(1, 1, nodes.get(0)), config.getString(USERNAME), 
config.getString(PASSWORD));
+        } else {
+            metadata = new ShardMetadata(
+                    shardKey,
+                    shardKeyType,
+                    config.getString(DATABASE),
+                    config.getString(TABLE),
+                    config.getBoolean(SPLIT_MODE),
+                    new Shard(1, 1, nodes.get(0)));
+        }
+
         List<String> fields = new ArrayList<>();
         if (config.hasPath(FIELDS)) {
             fields.addAll(config.getStringList(FIELDS));
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
index 38c835831..1e5e0ef51 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
 import com.clickhouse.client.ClickHouseCredentials;
 import com.clickhouse.client.ClickHouseNode;
 import com.clickhouse.client.ClickHouseProtocol;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
 import java.util.List;
@@ -28,13 +29,16 @@ import java.util.stream.Collectors;
 public class ClickhouseUtil {
 
     public static List<ClickHouseNode> createNodes(String nodeAddress, String 
database, String username,
-                                                   String password) {
+            String password) {
         return Arrays.stream(nodeAddress.split(",")).map(address -> {
             String[] nodeAndPort = address.split(":", 2);
+            if (StringUtils.isEmpty(username) && 
StringUtils.isEmpty(password)) {
+                return 
ClickHouseNode.builder().host(nodeAndPort[0]).port(ClickHouseProtocol.HTTP,
+                        
Integer.parseInt(nodeAndPort[1])).database(database).build();
+            }
             return 
ClickHouseNode.builder().host(nodeAndPort[0]).port(ClickHouseProtocol.HTTP,
                             
Integer.parseInt(nodeAndPort[1])).database(database)
                     
.credentials(ClickHouseCredentials.fromUserAndPassword(username, 
password)).build();
         }).collect(Collectors.toList());
     }
-
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
index 369de91f4..5c9ed0d4d 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java
@@ -82,7 +82,11 @@ public class ClickhouseBatchSink implements FlinkBatchSink {
 
     @Override
     public CheckResult checkConfig() {
-        return CheckConfigUtil.checkAllExists(config, HOST, TABLE, DATABASE, 
USERNAME, PASSWORD);
+        if (config.hasPath(USERNAME) && config.hasPath(PASSWORD)) {
+            return CheckConfigUtil.checkAllExists(config, HOST, TABLE, 
DATABASE, USERNAME, PASSWORD);
+        } else {
+            return CheckConfigUtil.checkAllExists(config, HOST, TABLE, 
DATABASE);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
index ab91a6dde..53925858e 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
@@ -63,8 +63,10 @@ public class ClickhouseClient {
                 clickhouseProperties.put(e.getKey(), 
String.valueOf(e.getValue().unwrapped()));
             });
         }
-        clickhouseProperties.put("user", config.getString(USERNAME));
-        clickhouseProperties.put("password", config.getString(PASSWORD));
+        if (config.hasPath(USERNAME) && config.hasPath(PASSWORD)) {
+            clickhouseProperties.put("user", config.getString(USERNAME));
+            clickhouseProperties.put("password", config.getString(PASSWORD));
+        }
         String jdbcUrl = "jdbc:clickhouse://" + config.getString(HOST) + "/" + 
config.getString(DATABASE);
         this.balancedClickhouseDataSource = new 
BalancedClickhouseDataSource(jdbcUrl, clickhouseProperties);
     }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index 99c0ae51b..bed3b6e01 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -116,7 +116,7 @@ class Clickhouse extends SparkBatchSink {
   }
 
   override def checkConfig(): CheckResult = {
-    var checkResult = checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, 
PASSWORD)
+    var checkResult = checkAllExists(config, HOST, TABLE, DATABASE)
     if (checkResult.isSuccess) {
       if (hasSubConfig(config, clickhousePrefix)) {
         extractSubConfig(config, clickhousePrefix, false).entrySet().foreach(e 
=> {
@@ -124,8 +124,10 @@ class Clickhouse extends SparkBatchSink {
         })
       }
 
-      properties.put("user", config.getString(USERNAME))
-      properties.put("password", config.getString(PASSWORD))
+      if (config.hasPath(USERNAME) && config.hasPath(PASSWORD)) {
+        properties.put("user", config.getString(USERNAME))
+        properties.put("password", config.getString(PASSWORD))
+      }
 
       if (config.hasPath(SPLIT_MODE)) {
         splitMode = config.getBoolean(SPLIT_MODE)
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
index 8c3978e53..b0d88aba4 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml
@@ -50,6 +50,11 @@
             <artifactId>connector-console</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-clickhouse</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--   seatunnel connectors   -->
 
         <!--spark-->
diff --git 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/ExampleUtils.java
similarity index 85%
copy from 
seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
copy to 
seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/ExampleUtils.java
index a4a84986c..a3937dedf 100644
--- 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
+++ 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/ExampleUtils.java
@@ -29,10 +29,10 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Paths;
 
-public class SeaTunnelApiExample {
+public class ExampleUtils {
 
-    public static void main(String[] args) throws FileNotFoundException, 
URISyntaxException, CommandException {
-        String configFile = getTestConfigFile("/examples/spark.batch.conf");
+    public static void builder(String configurePath) throws 
FileNotFoundException, URISyntaxException, CommandException {
+        String configFile = getTestConfigFile(configurePath);
         SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
         sparkCommandArgs.setConfigFile(configFile);
         sparkCommandArgs.setCheckConfig(false);
@@ -43,7 +43,7 @@ public class SeaTunnelApiExample {
         Seatunnel.run(sparkCommand);
     }
 
-    public static String getTestConfigFile(String configFile) throws 
FileNotFoundException, URISyntaxException {
+    private static String getTestConfigFile(String configFile) throws 
FileNotFoundException, URISyntaxException {
         URL resource = SeaTunnelApiExample.class.getResource(configFile);
         if (resource == null) {
             throw new FileNotFoundException("Can't find config file: " + 
configFile);
diff --git 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
index a4a84986c..be4a3c923 100644
--- 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
+++ 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
@@ -17,37 +17,14 @@
 
 package org.apache.seatunnel.example.spark.v2;
 
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.Seatunnel;
-import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.exception.CommandException;
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.starter.spark.command.SparkCommandBuilder;
 
 import java.io.FileNotFoundException;
 import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
 
 public class SeaTunnelApiExample {
 
     public static void main(String[] args) throws FileNotFoundException, 
URISyntaxException, CommandException {
-        String configFile = getTestConfigFile("/examples/spark.batch.conf");
-        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
-        sparkCommandArgs.setConfigFile(configFile);
-        sparkCommandArgs.setCheckConfig(false);
-        sparkCommandArgs.setVariables(null);
-        sparkCommandArgs.setDeployMode(DeployMode.CLIENT);
-        Command<SparkCommandArgs> sparkCommand =
-                new SparkCommandBuilder().buildCommand(sparkCommandArgs);
-        Seatunnel.run(sparkCommand);
-    }
-
-    public static String getTestConfigFile(String configFile) throws 
FileNotFoundException, URISyntaxException {
-        URL resource = SeaTunnelApiExample.class.getResource(configFile);
-        if (resource == null) {
-            throw new FileNotFoundException("Can't find config file: " + 
configFile);
-        }
-        return Paths.get(resource.toURI()).toString();
+        ExampleUtils.builder("/examples/spark.batch.conf");
     }
 }
diff --git 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiToClickHouseExample.java
 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiToClickHouseExample.java
new file mode 100644
index 000000000..b078c30aa
--- /dev/null
+++ 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiToClickHouseExample.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.spark.v2;
+
+import org.apache.seatunnel.core.starter.exception.CommandException;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+
+public class SeaTunnelApiToClickHouseExample {
+
+    public static void main(String[] args) throws FileNotFoundException, 
URISyntaxException, CommandException {
+        ExampleUtils.builder("/examples/spark.batch.clickhouse.conf");
+    }
+}
diff --git 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
new file mode 100644
index 000000000..2e27410ee
--- /dev/null
+++ 
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  spark.app.name = "SeaTunnelToClickHouseV2"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    field_name = "name,age"
+  }
+}
+
+
+transform {
+  sql {
+    sql = "select name,age from fake"
+    result_table_name = "sql"
+  }
+}
+
+sink {
+  ClickHouse {
+    host = "139.198.158.103:8123"
+    database = "default"
+    table = "test_clickhouse_table_v2"
+    fields = ["name", "age"]
+    username = 'default'
+    bulk_size = 20000
+    retry_codes = [209, 210]
+    retry = 3
+  }
+}
diff --git a/seatunnel-examples/seatunnel-spark-examples/pom.xml 
b/seatunnel-examples/seatunnel-spark-examples/pom.xml
index 757aed3dc..49c634f0b 100644
--- a/seatunnel-examples/seatunnel-spark-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-examples/pom.xml
@@ -49,6 +49,16 @@
             <artifactId>seatunnel-connector-spark-console</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-spark-clickhouse</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-clickhouse</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--   seatunnel connectors   -->
 
         <!--spark-->
diff --git 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
 
b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkToClickHouseExample.java
similarity index 52%
copy from 
seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
copy to 
seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkToClickHouseExample.java
index a4a84986c..eeff4baa7 100644
--- 
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java
+++ 
b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkToClickHouseExample.java
@@ -15,38 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.example.spark.v2;
+package org.apache.seatunnel.example.spark;
 
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.Seatunnel;
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.exception.CommandException;
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.starter.spark.command.SparkCommandBuilder;
+import org.apache.seatunnel.core.base.Seatunnel;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
 
 import java.io.FileNotFoundException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Paths;
 
-public class SeaTunnelApiExample {
+public class LocalSparkToClickHouseExample {
 
-    public static void main(String[] args) throws FileNotFoundException, 
URISyntaxException, CommandException {
-        String configFile = getTestConfigFile("/examples/spark.batch.conf");
-        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
-        sparkCommandArgs.setConfigFile(configFile);
-        sparkCommandArgs.setCheckConfig(false);
-        sparkCommandArgs.setVariables(null);
-        sparkCommandArgs.setDeployMode(DeployMode.CLIENT);
-        Command<SparkCommandArgs> sparkCommand =
-                new SparkCommandBuilder().buildCommand(sparkCommandArgs);
+    public static void main(String[] args) throws URISyntaxException, 
FileNotFoundException, CommandException {
+        String configFile = 
getTestConfigFile("/examples/spark.batch.clickhouse.conf");
+        SparkCommandArgs sparkArgs = new SparkCommandArgs();
+        sparkArgs.setConfigFile(configFile);
+        sparkArgs.setCheckConfig(false);
+        sparkArgs.setVariables(null);
+        sparkArgs.setDeployMode(DeployMode.CLIENT);
+        Command<SparkCommandArgs> sparkCommand = new 
SparkCommandBuilder().buildCommand(sparkArgs);
         Seatunnel.run(sparkCommand);
     }
 
-    public static String getTestConfigFile(String configFile) throws 
FileNotFoundException, URISyntaxException {
-        URL resource = SeaTunnelApiExample.class.getResource(configFile);
+    public static String getTestConfigFile(String configFile) throws 
URISyntaxException, FileNotFoundException {
+        URL resource = 
LocalSparkToClickHouseExample.class.getResource(configFile);
         if (resource == null) {
-            throw new FileNotFoundException("Can't find config file: " + 
configFile);
+            throw new FileNotFoundException("Could not find config file: " + 
configFile);
         }
         return Paths.get(resource.toURI()).toString();
     }
diff --git 
a/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.clickhouse.conf
 
b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.clickhouse.conf
new file mode 100644
index 000000000..caaed1199
--- /dev/null
+++ 
b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.clickhouse.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  spark.app.name = "SeaTunnelToClickHouse"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Fake {
+    result_table_name = "my_dataset"
+  }
+}
+
+
+transform {}
+
+sink {
+  ClickHouse {
+    host = "139.198.158.103:8123"
+    database = "default"
+    table = "test_clickhouse_table"
+    fields = ["name", "age"]
+    username = 'default'
+    bulk_size = 20000
+    retry_codes = [209, 210]
+    retry = 3
+    split_mode = true
+  }
+}

Reply via email to