This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f8fefa1e57 [Feature][Connector-V2][Clickhouse] clickhouse writes with 
checkpoints (#4999)
f8fefa1e57 is described below

commit f8fefa1e574f08e2e7092b1ebcf64b0e1b9d4582
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Wed Jul 5 10:15:28 2023 +0800

    [Feature][Connector-V2][Clickhouse] clickhouse writes with checkpoints 
(#4999)
---
 docs/en/connector-v2/sink/Clickhouse.md            |  2 +-
 .../sink/client/ClickhouseSinkWriter.java          | 39 ++++++++++++----------
 2 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/docs/en/connector-v2/sink/Clickhouse.md 
b/docs/en/connector-v2/sink/Clickhouse.md
index 05d03330c7..7c4bab991b 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -58,7 +58,7 @@ In addition to the above mandatory parameters that must be 
specified by `clickho
 
 ### bulk_size [number]
 
-The number of rows written through 
[Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time, the 
`default is 20000` .
+The number of rows written through 
[Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time; the 
`default is 20000`. If checkpoints are enabled, buffered rows are also written 
each time a checkpoint is triggered.
 
 ### split_mode [boolean]
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index 443eec921a..235279b4d5 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -90,6 +90,7 @@ public class ClickhouseSinkWriter
 
     @Override
     public Optional<CKCommitInfo> prepareCommit() throws IOException {
+        flush();
         return Optional.empty();
     }
 
@@ -99,23 +100,7 @@ public class ClickhouseSinkWriter
     @Override
     public void close() throws IOException {
         this.proxy.close();
-        for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
-            try (ClickHouseConnectionImpl needClosedConnection =
-                            batchStatement.getClickHouseConnection();
-                    JdbcBatchStatementExecutor needClosedStatement =
-                            batchStatement.getJdbcBatchStatementExecutor()) {
-                IntHolder intHolder = batchStatement.getIntHolder();
-                if (intHolder.getValue() > 0) {
-                    flush(needClosedStatement);
-                    intHolder.setValue(0);
-                }
-            } catch (SQLException e) {
-                throw new ClickhouseConnectorException(
-                        CommonErrorCode.SQL_OPERATION_FAILED,
-                        "Failed to close prepared statement.",
-                        e);
-            }
-        }
+        flush();
     }
 
     private void addIntoBatch(SeaTunnelRow row, JdbcBatchStatementExecutor 
clickHouseStatement) {
@@ -138,6 +123,26 @@ public class ClickhouseSinkWriter
         }
     }
 
+    private void flush() {
+        for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
+            try (ClickHouseConnectionImpl needClosedConnection =
+                            batchStatement.getClickHouseConnection();
+                    JdbcBatchStatementExecutor needClosedStatement =
+                            batchStatement.getJdbcBatchStatementExecutor()) {
+                IntHolder intHolder = batchStatement.getIntHolder();
+                if (intHolder.getValue() > 0) {
+                    flush(needClosedStatement);
+                    intHolder.setValue(0);
+                }
+            } catch (SQLException e) {
+                throw new ClickhouseConnectorException(
+                        CommonErrorCode.SQL_OPERATION_FAILED,
+                        "Failed to close prepared statement.",
+                        e);
+            }
+        }
+    }
+
     private Map<Shard, ClickhouseBatchStatement> initStatementMap() {
         Map<Shard, ClickhouseBatchStatement> result = new 
HashMap<>(Common.COLLECTION_SIZE);
         shardRouter

Reply via email to