Hello,

I’m writing a Flink job that reads heterogeneous data (one stream contains several row types that need to be partitioned downstream) from AWS Kinesis and writes it to an S3 directory structure like s3://bucket/year/month/day/hour/type. This all works great with StreamingFileSink in Flink 1.9. The problem is that I need to notify another application immediately (or rather "as soon as possible") once an "hour" bucket has rolled, i.e. once we’re 100% sure it won’t receive any more data for that hour. Another complication is that the data can be very skewed across types: e.g. one hour can contain 90% of rows with typeA, 9% with typeB and 1% with typeC.
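For context, my bucketing currently looks roughly like the sketch below (Flink 1.9 BucketAssigner API; Row and its getType() method are simplified stand-ins for my actual schema):

```java
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Assigns each row to a bucket like "2019/11/05/13/typeA" based on the
// sink's processing time plus the row's type field.
public class HourTypeBucketAssigner implements BucketAssigner<Row, String> {

    private static final DateTimeFormatter HOUR_FORMAT =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(Row element, Context context) {
        // Processing time, matching the TumblingProcessingTimeWindows approach.
        String hourPath = HOUR_FORMAT.format(
                Instant.ofEpochMilli(context.currentProcessingTime()));
        return hourPath + "/" + element.getType();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```

As written this is a pure function of the row and the current time, which is exactly why I’m hesitant to push state and side effects into it.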
My current plan is to:

1. Split the stream into windows using TumblingProcessingTimeWindows (I don’t care about event time at all).
2. Assign every row its bucket in a windowing function.
3. Write a stateful BucketAssigner that:
   3.1. keeps its last window in a mutable variable;
   3.2. once it receives a row with a newer window, sends a message to SQS and advances the window.

My biggest concern is the 3rd point. To me, BucketAssigner looks like a pure function of (Row, Time) -> Bucket, and I’m not sure that introducing state and a side effect there would be reasonable. Is there another way to do it?

I’m also wondering how I should couple this with the checkpointing mechanism, as ideally I’d like to not invoke this callback before the checkpoint is written. StreamingFileSink provides few ways to extend it. I tried to re-implement it for my purposes, but stumbled upon many private methods and classes, so even though it looks possible, the end result would probably be too ugly.

To make things a little easier, I don’t care too much about the delivery semantics of the final SQS messages: if I get only ~99% of them, that’s fine; if some of them are duplicated, that’s also fine.

Regards,
Anton
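P.S. To make point 3 concrete, here is the rough (untested) alternative I’m considering instead of a stateful BucketAssigner: a pass-through operator that detects the hour rollover itself and, via CheckpointedFunction plus CheckpointListener, defers the SQS message until the next checkpoint completes. HourRolloverNotifier and sendToSqs are my own hypothetical names; everything else is the Flink 1.9 API as I understand it:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Pass-through operator that watches the processing-time hour of passing
// rows, queues a "bucket rolled" notification when the hour advances, and
// only sends it after the next checkpoint completes.
public class HourRolloverNotifier extends RichMapFunction<Row, Row>
        implements CheckpointedFunction, CheckpointListener {

    private transient long currentHour;
    private transient ListState<Long> pendingState;
    private final List<Long> pendingHours = new ArrayList<>();

    @Override
    public Row map(Row row) {
        long hour = System.currentTimeMillis() / 3_600_000L;
        if (currentHour >= 0 && hour > currentHour) {
            // Hour rolled; don't notify yet, just remember it.
            pendingHours.add(currentHour);
        }
        currentHour = hour;
        return row;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Persist not-yet-sent notifications so they survive a restore
        // (possibly being re-sent: duplicates are acceptable here).
        pendingState.clear();
        pendingState.addAll(pendingHours);
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        currentHour = -1;
        pendingState = ctx.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-hours", Long.class));
        for (Long h : pendingState.get()) {
            pendingHours.add(h);
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        for (Long hour : pendingHours) {
            sendToSqs(hour);
        }
        pendingHours.clear();
    }

    private void sendToSqs(long hour) {
        // Hypothetical: call AmazonSQS.sendMessage(...) with the hour here.
    }
}
```

Two caveats I’m aware of: with parallelism > 1 each subtask only sees its own rows, so I’d either run this with parallelism 1 or aggregate the per-subtask notifications downstream; and since StreamingFileSink only finalizes its pending part files when a checkpoint completes, sending the message in notifyCheckpointComplete should at least line up with when the files actually become visible in S3. Does this look saner than a stateful BucketAssigner?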