Z1Wu commented on code in PR #3416:
URL:
https://github.com/apache/incubator-seatunnel/pull/3416#discussion_r1042311747
##########
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java:
##########
@@ -105,56 +124,55 @@ private void nodePasswordCheck() {
}
@Override
- public Optional<CKCommitInfo> prepareCommit() throws IOException {
- return Optional.empty();
+ public Optional<CKFileCommitInfo> prepareCommit() throws IOException {
+ for (FileChannel channel : rowCache.values()) {
+ channel.close();
+ }
+ Map<Shard, List<String>> detachedFiles = new HashMap<>();
+ shardTempFile.forEach((shard, path) -> {
+ try {
+ List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(path);
+ // move file to server
+ moveClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
+ detachedFiles.put(shard, clickhouseLocalFiles);
+ // clear local file
+ clearLocalFileDirectory(clickhouseLocalFiles);
+ } catch (Exception e) {
+ throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e);
+ }
+ });
+ rowCache.clear();
+ shardTempFile.clear();
+ return Optional.of(new CKFileCommitInfo(detachedFiles));
}
@Override
public void abortPrepare() {
-
}
@Override
public void close() throws IOException {
- rowCache.forEach(this::flush);
+ for (FileChannel channel : rowCache.values()) {
+ channel.close();
+ }
}
- private void flush(Shard shard, List<SeaTunnelRow> rows) {
- try {
- // generate clickhouse local file
- // TODO generate file by sub rows to save memory
- List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(rows);
- // move file to server
- attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles);
- // clear local file
- clearLocalFileDirectory(clickhouseLocalFiles);
- } catch (Exception e) {
- throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e);
- }
+ private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IOException {
+ String data = this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
Review Comment:
Simply using `toString` to serialize a `SeaTunnelRow` into the file calls
each underlying field object's `toString` method, which can produce field
strings that the `clickhouse local` program cannot recognize. Examples include
the `toString` output of a datetime object, and a `String` containing a `\t`
character, which will mess up the resulting TSV file.
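One way to address this is to encode each field explicitly instead of calling `toString()`. Below is a minimal sketch, assuming datetime fields arrive as `java.time.LocalDateTime` and following ClickHouse's TabSeparated text rules (backslash escapes for tab, newline, carriage return and backslash; `\N` for NULL; `yyyy-MM-dd HH:mm:ss` for `DateTime`). The class `TsvFieldEncoder` and its methods are hypothetical, not part of SeaTunnel or this PR, and the type coverage is deliberately incomplete.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical helper (not part of SeaTunnel or this PR) that renders field
 * values as ClickHouse TabSeparated cells instead of calling toString() directly.
 */
public final class TsvFieldEncoder {

    // ClickHouse's default text form for DateTime values.
    private static final DateTimeFormatter CK_DATETIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private TsvFieldEncoder() {
    }

    /** Encodes a single field value as one TabSeparated cell. */
    public static String encode(Object value) {
        if (value == null) {
            return "\\N"; // TabSeparated NULL marker; also avoids a NullPointerException
        }
        if (value instanceof LocalDateTime) {
            // LocalDateTime.toString() uses a 'T' separator and may omit seconds
            return ((LocalDateTime) value).format(CK_DATETIME);
        }
        if (value instanceof LocalDate) {
            return value.toString(); // ISO yyyy-MM-dd, the ClickHouse Date text form
        }
        if (value instanceof String) {
            return escape((String) value);
        }
        return value.toString(); // numbers, booleans, etc.
    }

    /**
     * Backslash-escapes tab and line-break characters, which would otherwise split
     * a cell or a row, plus the backslash itself so it is not read as an escape prefix.
     */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length());
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '\\': sb.append("\\\\"); break;
                case '\t': sb.append("\\t"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Joins field values, already in column order, into one TSV line. */
    public static String toLine(List<Object> fields) {
        return fields.stream()
                .map(TsvFieldEncoder::encode)
                .collect(Collectors.joining("\t"));
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2022, 12, 7, 10, 15, 30);
        List<Object> row = Arrays.asList(ts, "a\tb", null);
        // Naive toString() join: 'T' in the timestamp, and the raw tab turns
        // two intended cells into three (a null field would throw, so it is left out).
        System.out.println(ts.toString() + "\t" + "a\tb");
        // Encoded: three cells -> 2022-12-07 10:15:30, a\tb (escaped), \N
        System.out.println(toLine(row));
    }
}
```

In `saveDataToFile`, the `.toString()` call in the field mapping could then become something like `TsvFieldEncoder.encode(row.getField(...))`. The sketch does not cover types such as decimals, arrays or maps, which would need their own ClickHouse-compatible formatting.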
--
This is an automated message from the Apache Git Service.