robobario commented on code in PR #24730:
URL: https://github.com/apache/flink/pull/24730#discussion_r1582425139
########## flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java: ##########
@@ -51,13 +54,19 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
         checkNotNull(mapper);
         checkNotNull(schema);
+        // Prevent Jackson's writeValue() method calls from closing the stream.
+        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+        mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);
+
         this.converter = checkNotNull(converter);
         this.stream = checkNotNull(stream);
         this.converterContext = converterContext;
         this.csvWriter = mapper.writer(schema);
-
-        // Prevent Jackson's writeValue() method calls from closing the stream.
-        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+        try {
+            this.generator = csvWriter.createGenerator(stream, JsonEncoding.UTF8);

Review Comment:
The generator manages some resources with their own buffers, so I think we should `close()` the generator in `finish()`. The underlying `CsvEncoder` uses a char buffer that Jackson [can recycle](https://github.com/FasterXML/jackson-dataformats-text/blob/3d3165e58b90618a5fbccf630f1604a383afe78c/csv/src/main/java/com/fasterxml/jackson/dataformat/csv/impl/CsvEncoder.java#L1015) using a thread-local pool. Because `AUTO_CLOSE_TARGET` is disabled, closing the generator will still not close the underlying stream.

########## flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java: ##########
@@ -98,11 +107,12 @@ static <T> CsvBulkWriter<T, T, Void> forPojo(Class<T> pojoClass, FSDataOutputStr
     @Override
     public void addElement(T element) throws IOException {
         final R r = converter.convert(element, converterContext);
-        csvWriter.writeValue(stream, r);
+        csvWriter.writeValue(generator, r);
     }
     @Override
     public void flush() throws IOException {
+        generator.flush();
         stream.flush();

Review Comment:
`generator.flush()` already flushes the underlying stream (Jackson's `JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM` is enabled by default), so the following `stream.flush()` is redundant.
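To make the two review points concrete (close the generator in `finish()` without closing the target stream; let the generator's `flush()` reach the stream), here is a JDK-only analogy. It is a sketch, not Flink or Jackson code: `BufferedWriter` stands in for the Jackson generator, the hypothetical `NonClosingOutputStream` mimics the effect of disabling `AUTO_CLOSE_TARGET`, and `TrackingOutputStream` is a hypothetical stand-in for the `FSDataOutputStream` that records flush/close calls. All class names here are illustrative assumptions.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

public class WriterLifecycleSketch {

    /** Hypothetical stand-in for the FSDataOutputStream: records flush/close calls. */
    static class TrackingOutputStream extends ByteArrayOutputStream {
        int flushCount = 0;
        boolean closed = false;

        @Override
        public void flush() {
            flushCount++;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Analogy for AUTO_CLOSE_TARGET disabled: close() only flushes, the target stays open. */
    static class NonClosingOutputStream extends FilterOutputStream {
        NonClosingOutputStream(OutputStream out) {
            super(out);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            out.write(b, off, len); // avoid FilterOutputStream's byte-by-byte default
        }

        @Override
        public void close() throws IOException {
            flush(); // propagate the flush, but never close the wrapped stream
        }
    }

    /** Hypothetical bulk writer following the lifecycle suggested in the review. */
    static class SketchBulkWriter {
        private final Writer writer; // plays the role of the Jackson generator

        SketchBulkWriter(OutputStream stream) {
            this.writer = new BufferedWriter(
                    new OutputStreamWriter(new NonClosingOutputStream(stream), StandardCharsets.UTF_8));
        }

        void addElement(String row) throws IOException {
            writer.write(row);
            writer.write('\n');
        }

        void flush() throws IOException {
            // Flushing the wrapper already reaches the target stream,
            // so a separate stream.flush() would be redundant here.
            writer.flush();
        }

        void finish() throws IOException {
            // Release the wrapper's buffers; the target stream stays open.
            writer.close();
        }
    }

    public static void main(String[] args) throws IOException {
        TrackingOutputStream target = new TrackingOutputStream();
        SketchBulkWriter writer = new SketchBulkWriter(target);
        writer.addElement("a,1");
        writer.addElement("b,2");
        writer.flush();  // bytes reach the target without calling target.flush() directly
        writer.finish(); // wrapper closed, target left open
        // prints "a,1" and "b,2" on separate lines, then "target closed: false"
        System.out.print(new String(target.toByteArray(), StandardCharsets.UTF_8));
        System.out.println("target closed: " + target.closed);
    }
}
```

The design point mirrors the review: ownership of the target stream stays with the caller, so the writer may close only what it created (its encoding/buffering layer), and a single flush on that layer is enough to push data through to the target.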
-- This is an automated message from the Apache Git Service.