Hi Nico,

writeAsCsv has limited functionality in this case. I recommend using the Bucketing File Sink [1], where you can specify an interval and a batch size that control when data is flushed.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/filesystem_sink.html#bucketing-file-sink
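As a rough sketch (based on the linked Flink 1.2 docs; the output path, bucket format, and batch size below are placeholder values, not recommendations):

```java
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// Replace writeAsCsv with a BucketingSink; format the records as CSV lines
// yourself (here assuming a stream of Tuple2<String, Integer>).
BucketingSink<String> sink = new BucketingSink<>("/tmp/flink-csv-output");
sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm")); // new bucket per minute
sink.setBatchSize(1024 * 1024); // roll the part file after ~1 MiB

stream
    .map(t -> t.f0 + "," + t.f1) // one CSV line per record
    .addSink(sink)
    .setParallelism(1);
```

With a small batch size, the part files roll (and thus become visible and complete) much sooner than with the default buffer-driven flushing of writeAsCsv.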

Timo


On 27/01/17 at 11:06, Nico wrote:
Hi,

I am running my Flink job in the local IDE and want to write the results to a CSV file using:

stream.writeAsCsv("...", FileSystem.WriteMode.OVERWRITE).setParallelism(1)

While the file is created, it is empty inside. writeAsText works, however. I have checked the CsvOutputFormat, and I think I am never reaching the buffer size. Moreover, flush() is only called in the close() method, but I don't know when that method is invoked; since I am reading my data from a Kafka source, the stream would be infinite, wouldn't it?

Is there a way to flush the data earlier, for example within the writeRecord() method?
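(For reference, one workaround along these lines is a custom sink that flushes after every record. This is a hypothetical sketch, not Flink's own API; the class name and output path are made up, and per-record flushing trades throughput and fault tolerance for immediacy:)

```java
import java.io.BufferedWriter;
import java.io.FileWriter;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sink that writes each record as a CSV line and flushes
// immediately, so the file is never left empty between buffer fills.
public class FlushingCsvSink extends RichSinkFunction<Tuple2<String, Integer>> {

    private transient BufferedWriter writer;

    @Override
    public void open(Configuration parameters) throws Exception {
        writer = new BufferedWriter(new FileWriter("/tmp/out.csv"));
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        writer.write(value.f0 + "," + value.f1);
        writer.newLine();
        writer.flush(); // flush per record instead of waiting for close()
    }

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
    }
}
```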

Best regards,
Nico
