Hisoka-X commented on code in PR #3416:
URL:
https://github.com/apache/incubator-seatunnel/pull/3416#discussion_r1044947879
##########
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java:
##########
@@ -105,56 +124,55 @@ private void nodePasswordCheck() {
}
@Override
- public Optional<CKCommitInfo> prepareCommit() throws IOException {
- return Optional.empty();
+ public Optional<CKFileCommitInfo> prepareCommit() throws IOException {
+ for (FileChannel channel : rowCache.values()) {
+ channel.close();
+ }
+ Map<Shard, List<String>> detachedFiles = new HashMap<>();
+ shardTempFile.forEach((shard, path) -> {
+ try {
+ List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(path);
+ // move file to server
+ moveClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+ detachedFiles.put(shard, clickhouseLocalFiles);
+ // clear local file
+ clearLocalFileDirectory(clickhouseLocalFiles);
+ } catch (Exception e) {
+ throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e);
+ }
+ });
+ rowCache.clear();
+ shardTempFile.clear();
+ return Optional.of(new CKFileCommitInfo(detachedFiles));
}
@Override
public void abortPrepare() {
-
}
@Override
public void close() throws IOException {
- rowCache.forEach(this::flush);
+ for (FileChannel channel : rowCache.values()) {
+ channel.close();
+ }
}
- private void flush(Shard shard, List<SeaTunnelRow> rows) {
- try {
- // generate clickhouse local file
- // TODO generate file by sub rows to save memory
- List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(rows);
- // move file to server
- attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
- // clear local file
- clearLocalFileDirectory(clickhouseLocalFiles);
- } catch (Exception e) {
- throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e);
- }
+ private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IOException {
+ String data = this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
+ .collect(Collectors.joining("\t")) + "\n";
+ MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
+ data.getBytes(StandardCharsets.UTF_8).length);
Review Comment:
Can you provide your crash log? Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]