Hisoka-X commented on code in PR #3416:
URL:
https://github.com/apache/incubator-seatunnel/pull/3416#discussion_r1022298099
##########
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java:
##########
@@ -102,56 +118,53 @@ private void nodePasswordCheck() {
}
@Override
- public Optional<CKCommitInfo> prepareCommit() throws IOException {
- return Optional.empty();
+ public Optional<CKFileCommitInfo> prepareCommit() throws IOException {
+ for (FileChannel channel : rowCache.values()) {
+ channel.close();
+ }
+ Map<Shard, List<String>> detachedFiles = new HashMap<>();
+ shardTempFile.forEach((shard, path) -> {
+ try {
+ List<String> clickhouseLocalFiles =
generateClickhouseLocalFiles(path);
+ // move file to server
+ moveClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+ detachedFiles.put(shard, clickhouseLocalFiles);
+ // clear local file
+ clearLocalFileDirectory(clickhouseLocalFiles);
+ } catch (Exception e) {
+ throw new SeaTunnelException("handle with file failed.", e);
+ }
+ });
+ rowCache.clear();
+ shardTempFile.clear();
+ return Optional.of(new CKFileCommitInfo(detachedFiles));
}
@Override
public void abortPrepare() {
-
}
@Override
public void close() throws IOException {
- rowCache.forEach(this::flush);
+ for (FileChannel channel : rowCache.values()) {
+ channel.close();
+ }
}
- private void flush(Shard shard, List<SeaTunnelRow> rows) {
- try {
- // generate clickhouse local file
- // TODO generate file by sub rows to save memory
- List<String> clickhouseLocalFiles =
generateClickhouseLocalFiles(rows);
- // move file to server
- attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
- // clear local file
- clearLocalFileDirectory(clickhouseLocalFiles);
- } catch (Exception e) {
- throw new RuntimeException("Flush data into clickhouse file
error", e);
- }
+ private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row)
throws IOException {
+ String data = this.readerOption.getFields().stream().map(field ->
row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
+ .collect(Collectors.joining("\t")) + "\n";
+ MappedByteBuffer buffer =
fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
+ data.getBytes(StandardCharsets.UTF_8).length);
+ buffer.put(data.getBytes(StandardCharsets.UTF_8));
}
- private List<String> generateClickhouseLocalFiles(List<SeaTunnelRow> rows)
throws IOException,
- InterruptedException {
- if (rows.isEmpty()) {
- return Collections.emptyList();
- }
+ private List<String> generateClickhouseLocalFiles(String
clickhouseLocalFileTmpFile) throws IOException,
+ InterruptedException {
String uuid = UUID.randomUUID().toString().substring(0,
UUID_LENGTH).replaceAll("-", "_");
Review Comment:
Yes, I will modify it. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]