Z1Wu commented on code in PR #3416:
URL: https://github.com/apache/incubator-seatunnel/pull/3416#discussion_r1022292350


##########
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java:
##########
@@ -102,56 +118,53 @@ private void nodePasswordCheck() {
     }
 
     @Override
-    public Optional<CKCommitInfo> prepareCommit() throws IOException {
-        return Optional.empty();
+    public Optional<CKFileCommitInfo> prepareCommit() throws IOException {
+        for (FileChannel channel : rowCache.values()) {
+            channel.close();
+        }
+        Map<Shard, List<String>> detachedFiles = new HashMap<>();
+        shardTempFile.forEach((shard, path) -> {
+            try {
+                List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(path);
+                // move file to server
+                moveClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+                detachedFiles.put(shard, clickhouseLocalFiles);
+                // clear local file
+                clearLocalFileDirectory(clickhouseLocalFiles);
+            } catch (Exception e) {
+                throw new SeaTunnelException("handle with file failed.", e);
+            }
+        });
+        rowCache.clear();
+        shardTempFile.clear();
+        return Optional.of(new CKFileCommitInfo(detachedFiles));
     }
 
     @Override
     public void abortPrepare() {
-
     }
 
     @Override
     public void close() throws IOException {
-        rowCache.forEach(this::flush);
+        for (FileChannel channel : rowCache.values()) {
+            channel.close();
+        }
     }
 
-    private void flush(Shard shard, List<SeaTunnelRow> rows) {
-        try {
-            // generate clickhouse local file
-            // TODO generate file by sub rows to save memory
-            List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(rows);
-            // move file to server
-            attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
-            // clear local file
-            clearLocalFileDirectory(clickhouseLocalFiles);
-        } catch (Exception e) {
-            throw new RuntimeException("Flush data into clickhouse file error", e);
-        }
+    private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IOException {
+        String data = this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
+            .collect(Collectors.joining("\t")) + "\n";
+        MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
+            data.getBytes(StandardCharsets.UTF_8).length);
+        buffer.put(data.getBytes(StandardCharsets.UTF_8));
     }
 
-    private List<String> generateClickhouseLocalFiles(List<SeaTunnelRow> rows) throws IOException,
-            InterruptedException {
-        if (rows.isEmpty()) {
-            return Collections.emptyList();
-        }
+    private List<String> generateClickhouseLocalFiles(String clickhouseLocalFileTmpFile) throws IOException,
+        InterruptedException {
         String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_");

Review Comment:
   `uuid` can be extracted from `clickhouseLocalFileTmpFile` instead of being generated again:
   ```java
   // temp file path format: PREFIX/<uuid>/suffix
   String[] tmpStrArr = clickhouseLocalFileTmpFile.split("/");
   String uuid = tmpStrArr[tmpStrArr.length - 2];
   ```
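
A possible variant of the same idea, as a minimal sketch: it assumes the temp file path really has the `PREFIX/<uuid>/suffix` shape described above, and it takes the name of the parent directory via `java.nio.file.Path` rather than splitting on `"/"`. The class name `TempFileUuid`, the method name `extractUuid`, and the sample path are hypothetical and are not part of the PR.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class TempFileUuid {

    /**
     * Returns the name of the directory that directly contains the temp file.
     * Assumption: the path looks like PREFIX/<uuid>/suffix, so that directory name is the uuid.
     */
    static String extractUuid(String clickhouseLocalFileTmpFile) {
        Path parent = Paths.get(clickhouseLocalFileTmpFile).getParent();
        // getParent() is null for a bare file name; getFileName() is null for a root directory.
        if (parent == null || parent.getFileName() == null) {
            throw new IllegalArgumentException(
                "Expected PREFIX/<uuid>/suffix but got: " + clickhouseLocalFileTmpFile);
        }
        return parent.getFileName().toString();
    }

    public static void main(String[] args) {
        // Hypothetical path, used only for illustration.
        System.out.println(extractUuid("/tmp/seatunnel/abc123_def/local_data.log"));
    }
}
```

Compared with indexing into the result of `split("/")`, this fails with an explicit message when the path has no parent directory instead of throwing `ArrayIndexOutOfBoundsException`.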


