Hi,

FileProcessingMode.PROCESS_CONTINUOUSLY processes the file continuously - the stream will never end.
Simple `writeAsCsv(…)`, on the other hand, only flushes the output file when the stream ends (see `OutputFormatSinkFunction`). You can either use `PROCESS_ONCE` mode or use a more advanced data sink:

- BucketingSink
- re-use the `writeAsCsv(…)` code by extending OutputFormatSinkFunction and implementing `CheckpointedFunction` to flush on snapshots (for at-least-once)
- write your own sink by extending `TwoPhaseCommitSinkFunction` (to support exactly-once)

Piotrek

> On 2 Feb 2018, at 18:32, geoff halmo <geoffha...@gmail.com> wrote:
>
> Hi Flink community:
>
> I am testing Flink but can't write the final (18 or so) elements out to disk.
>
> Setup:
> Using NYC yellow taxi data from 2017-09.csv, I sorted the data on
> pickup_datetime in bash. I am working in event time.
>
> Skeleton program:
> val ds = senv.readFile(input_format, input_path,
>   FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
>
> ds.flatMap(row => parse(row))
>   .assignAscendingTimestamps(_.datetime)
>   .timeWindowAll(Time.hours(1))
>   .process(new MyProcessAllWindowFunction())
>   .writeCsv
>
> Issue:
> The last line is a half line:
> tail -n1 output.csv
> 1506553200000,2017-09-27T:19:00-4:00[user@computer]
>
> When I use .print instead of .writeCsv, the last line on console is
> 1506826800000,2017-09-30T23:00-400[America/New_York],21353
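For a bounded file like this, the simplest of the options above is the `PROCESS_ONCE` switch: the source reads the file once and then ends the stream, so `writeAsCsv(…)` flushes its final records on stream end. A minimal sketch, assuming the poster's own `senv`, `input_format`, `input_path`, `parse`, and `MyProcessAllWindowFunction` definitions (`output_path` is a hypothetical placeholder):

```scala
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.windowing.time.Time

// Read the file once; the stream ends after one pass, which lets
// writeAsCsv flush the last (partial) buffer to disk.
val ds = senv.readFile(input_format, input_path,
  FileProcessingMode.PROCESS_ONCE, 1000)

ds.flatMap(row => parse(row))          // parse is the poster's own function
  .assignAscendingTimestamps(_.datetime)
  .timeWindowAll(Time.hours(1))
  .process(new MyProcessAllWindowFunction())
  .writeAsCsv(output_path)             // output_path: hypothetical placeholder
```

Note the trade-off: `PROCESS_ONCE` gives up continuous monitoring of the directory, so if you need the job to keep watching for new data you will need one of the checkpoint-aware sinks instead.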