This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1b26192cb [Bug] [Connector-V2] Fix ClickhouseFile Committer Serializable Problems (#3803)
1b26192cb is described below

commit 1b26192cb3e1df0699c1ced4f9ae35a8796a5bd1
Author: Hisoka <[email protected]>
AuthorDate: Mon Dec 26 17:27:57 2022 +0800

    [Bug] [Connector-V2] Fix ClickhouseFile Committer Serializable Problems (#3803)
---
 .../sink/file/ClickhouseFileSinkAggCommitter.java  | 24 +++++++++++++++++++---
 .../clickhouse/sink/file/ClickhouseTable.java      |  3 ++-
 2 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
index 80ce83aa5..5290ff0bd 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
@@ -37,10 +37,13 @@ import java.util.Map;
 
 public class ClickhouseFileSinkAggCommitter implements SinkAggregatedCommitter<CKFileCommitInfo, CKFileAggCommitInfo> {
 
-    private final ClickhouseProxy proxy;
+    private transient ClickhouseProxy proxy;
     private final ClickhouseTable clickhouseTable;
 
+    private final FileReaderOption fileReaderOption;
+
     public ClickhouseFileSinkAggCommitter(FileReaderOption readerOption) {
+        fileReaderOption = readerOption;
+        proxy = new ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode());
+        clickhouseTable = proxy.getClickhouseTable(readerOption.getShardMetadata().getDatabase(),
             readerOption.getShardMetadata().getTable());
@@ -76,13 +79,28 @@ public class ClickhouseFileSinkAggCommitter implements SinkAggregatedCommitter<C
 
     }
 
+    private ClickhouseProxy getProxy() {
+        if (proxy != null) {
+            return proxy;
+        }
+        synchronized (this) {
+            if (proxy != null) {
+                return proxy;
+            }
+            proxy = new ClickhouseProxy(fileReaderOption.getShardMetadata().getDefaultShard().getNode());
+            return proxy;
+        }
+    }
+
     @Override
     public void close() throws IOException {
-        proxy.close();
+        if (proxy != null) {
+            proxy.close();
+        }
     }
 
     private void attachFileToClickhouse(Shard shard, List<String> clickhouseLocalFiles) throws ClickHouseException {
-        ClickHouseRequest<?> request = proxy.getClickhouseConnection(shard);
+        ClickHouseRequest<?> request = getProxy().getClickhouseConnection(shard);
         for (String clickhouseLocalFile : clickhouseLocalFiles) {
             ClickHouseResponse response = request.query(String.format("ALTER TABLE %s ATTACH PART '%s'",
                 clickhouseTable.getLocalTableName(),
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
index a6c8e0fe0..b26a2264f 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java
@@ -19,10 +19,11 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-public class ClickhouseTable {
+public class ClickhouseTable implements Serializable {
 
     private String database;
     private String tableName;

Reply via email to