robobario commented on code in PR #24730:
URL: https://github.com/apache/flink/pull/24730#discussion_r1582425139


##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java:
##########
@@ -51,13 +54,19 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
         checkNotNull(mapper);
         checkNotNull(schema);
 
+        // Prevent Jackson's writeValue() method calls from closing the stream.
+        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+        mapper.disable(SerializationFeature.FLUSH_AFTER_WRITE_VALUE);
+
         this.converter = checkNotNull(converter);
         this.stream = checkNotNull(stream);
         this.converterContext = converterContext;
         this.csvWriter = mapper.writer(schema);
-
-        // Prevent Jackson's writeValue() method calls from closing the stream.
-        mapper.getFactory().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+        try {
+            this.generator = csvWriter.createGenerator(stream, JsonEncoding.UTF8);

Review Comment:
   The generator manages resources that have their own buffers, so I think we should `close` the generator during `finish()`. The underlying `CsvEncoder` uses a char buffer that Jackson [can recycle](https://github.com/FasterXML/jackson-dataformats-text/blob/3d3165e58b90618a5fbccf630f1604a383afe78c/csv/src/main/java/com/fasterxml/jackson/dataformat/csv/impl/CsvEncoder.java#L1015) through a thread-local pool.
   
   Because `AUTO_CLOSE_TARGET` is disabled, closing the generator will still not close the underlying stream.
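
   To make the intended close semantics concrete, here is a minimal sketch that uses only `java.io`, not Jackson. Every name in it (`CloseShieldSketch`, `TrackingStream`, `CloseShield`, `writeAndFinish`) is hypothetical. `CloseShield` is only a stand-in for what a disabled `AUTO_CLOSE_TARGET` does; it is not how Jackson implements it. The point is that closing the buffering layer drains its buffer while the target stream stays open.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

public class CloseShieldSketch {

    /** Hypothetical target stream that records whether close() ever reached it. */
    static final class TrackingStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Stand-in for a disabled AUTO_CLOSE_TARGET: close() flushes but is not passed on. */
    static final class CloseShield extends FilterOutputStream {
        CloseShield(OutputStream out) {
            super(out);
        }

        @Override
        public void close() throws IOException {
            flush();
        }
    }

    /** Writes rows through a buffering writer, closes the writer, and returns the target. */
    static TrackingStream writeAndFinish(String... rows) throws IOException {
        TrackingStream target = new TrackingStream();
        Writer writer = new OutputStreamWriter(new CloseShield(target), StandardCharsets.UTF_8);
        for (String row : rows) {
            writer.write(row);
            writer.write('\n');
        }
        // Analogous to closing the generator in finish(): the writer's buffer is
        // drained and released, but the shield keeps the target open.
        writer.close();
        return target;
    }

    public static void main(String[] args) throws IOException {
        TrackingStream target = writeAndFinish("a,b", "c,d");
        System.out.print(new String(target.toByteArray(), StandardCharsets.UTF_8));
        System.out.println("target closed: " + target.closed);
    }
}
```

   In the PR, the analogous step would be calling `generator.close()` from `finish()`. Whether that fully covers the recycled buffer depends on the Jackson version in use, so treat this as an illustration of the contract rather than a verified fix.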



##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java:
##########
@@ -98,11 +107,12 @@ static <T> CsvBulkWriter<T, T, Void> forPojo(Class<T> pojoClass, FSDataOutputStr
     @Override
     public void addElement(T element) throws IOException {
         final R r = converter.convert(element, converterContext);
-        csvWriter.writeValue(stream, r);
+        csvWriter.writeValue(generator, r);
     }
 
     @Override
     public void flush() throws IOException {
+        generator.flush();
         stream.flush();

Review Comment:
   `generator.flush()` also flushes the underlying stream, so the `stream.flush()` that follows is redundant.
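
   The same propagation can be seen with plain `java.io`, shown here as an analogy rather than as Jackson code. `FlushPropagationSketch` and `CountingStream` are hypothetical names. The sketch counts how many `flush()` calls reach the target when only the wrapping writer is flushed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

public class FlushPropagationSketch {

    /** Hypothetical target stream that counts the flush() calls it receives. */
    static final class CountingStream extends ByteArrayOutputStream {
        int flushCalls = 0;

        @Override
        public void flush() throws IOException {
            flushCalls++;
            super.flush();
        }
    }

    /** Flushes only the wrapping writer and reports how many flushes reached the target. */
    static int flushCallsAfterWriterFlush() throws IOException {
        CountingStream target = new CountingStream();
        Writer writer = new OutputStreamWriter(target, StandardCharsets.UTF_8);
        writer.write("a,b\n");
        // No explicit target.flush() here: the writer passes the flush down itself.
        writer.flush();
        return target.flushCalls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("flushes seen by target: " + flushCallsAfterWriterFlush());
    }
}
```

   As far as I know, Jackson generators behave the same way by default, and the behaviour is controlled by `JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM`. If that feature were ever disabled on this factory, the explicit `stream.flush()` would be needed again.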



