Hi, I think ProcessFunction[1] is what you want. You can add it after keyBy and emit the buffered records to the sink when either the timeout fires or the buffer fills. The referenced page has a good example that shows you how to use it.
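For reference, here is a rough sketch of what that could look like against the Flink 1.3 ProcessFunction API. This is untested and the class, state, and constant names are my own; SchemaRecord comes from your snippet. Since older TimerService versions had no timer deletion, a stale timer is instead ignored by comparing its timestamp against the one stored in state.

```java
// Sketch only (untested): per-key buffer that flushes on 250K rows OR a
// 5-minute processing-time timer, whichever comes first.
public class CountOrTimeFlush extends ProcessFunction<SchemaRecord, List<SchemaRecord>> {

    private static final int MAX_ROWS = 250_000;
    private static final long TIMEOUT_MS = 5 * 60 * 1000L;

    private transient ListState<SchemaRecord> bufferState;
    private transient ValueState<Integer> countState;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        bufferState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", SchemaRecord.class));
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("flush-timer", Long.class));
    }

    @Override
    public void processElement(SchemaRecord value, Context ctx,
                               Collector<List<SchemaRecord>> out) throws Exception {
        bufferState.add(value);
        Integer count = countState.value();
        int newCount = (count == null ? 0 : count) + 1;

        if (timerState.value() == null) {
            // First record of a new batch: arm the 5-minute timer.
            long timer = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
            ctx.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }

        if (newCount >= MAX_ROWS) {
            flush(out); // row limit hit before the timeout
        } else {
            countState.update(newCount);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<List<SchemaRecord>> out) throws Exception {
        Long armed = timerState.value();
        if (armed != null && armed == timestamp) {
            flush(out); // 5 minutes elapsed before the row limit was reached
        }
        // Otherwise this is a stale timer for an already-flushed batch: ignore.
    }

    private void flush(Collector<List<SchemaRecord>> out) throws Exception {
        List<SchemaRecord> batch = new ArrayList<>();
        for (SchemaRecord r : bufferState.get()) {
            batch.add(r);
        }
        if (!batch.isEmpty()) {
            out.collect(batch);
        }
        bufferState.clear();
        countState.clear();
        timerState.clear();
    }
}
```

Your CustomFileSystemSink would then receive one List&lt;SchemaRecord&gt; per closed batch and only need to write and close the file, rather than counting rows itself.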
Best Regards,
Tony Wei

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html

2017-10-30 23:56 GMT+08:00 Telco Phone <tel...@yahoo.com>:
> I have a process that will take 250,000 records from Kafka and produce a
> file (using a CustomFileSync).
>
> Currently I just have the following:
>
> DataStream<SchemaRecord> stream =
>     env.addSource(new FlinkKafkaConsumer010<SchemaRecord>("topic", schema, properties))
>        .setParallelism(40)
>        .flatMap(new SchemaRecordSplit()).setParallelism(40).name("Splitter")
>        .keyBy("partition", "randomkey", "schemaId");
>
> stream.addSink(new CustomFileSystemSink()).setParallelism(40);
>
> In my CustomFileSystemSink I have a for..next loop which closes the file
> off at 250K rows.
>
> What I am looking to do is to close off the file every 5 min OR at 250K
> rows.
>
> As I read about the window types: is it possible to read from Kafka and have the
> sink close every 5 min OR at 250K rows?
>
> Hope this makes sense.
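Stripped of the Flink specifics, the policy the question asks for (close the file at 250K rows OR after 5 minutes, whichever comes first) is just a counted, time-bounded buffer. A minimal plain-Java sketch of that core logic, with illustrative names that are not any Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Flink API): buffers records and reports a batch
// to flush once either the row limit or the timeout is reached.
class CountOrTimeoutBuffer<T> {
    private final int maxRows;
    private final long timeoutMs;
    private final List<T> buffer = new ArrayList<>();
    private long firstElementAt = -1; // timestamp of the batch's first record

    CountOrTimeoutBuffer(int maxRows, long timeoutMs) {
        this.maxRows = maxRows;
        this.timeoutMs = timeoutMs;
    }

    /** Adds a record; returns the batch to flush if a limit was hit, else null. */
    List<T> add(T record, long nowMs) {
        if (buffer.isEmpty()) {
            firstElementAt = nowMs;
        }
        buffer.add(record);
        if (buffer.size() >= maxRows || nowMs - firstElementAt >= timeoutMs) {
            List<T> batch = new ArrayList<>(buffer);
            buffer.clear();
            firstElementAt = -1;
            return batch;
        }
        return null;
    }
}
```

In the real job this would be driven by ProcessFunction state and timers per key (with maxRows = 250_000 and timeoutMs = 5 * 60 * 1000), so that a quiet stream still gets flushed when the timer fires rather than waiting for the next record.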