Rion,

If you can arrange for each tenant's events to land in a single Kafka partition, that would be the best way to simplify the processing you need to do. Otherwise, a simple change that may help would be to increase the bounded delay you use when calculating your own per-tenant watermarks, thereby making late events less likely.
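For reference, Kafka's default partitioner picks a partition by hashing the record key (murmur2 over the key bytes) modulo the partition count, so producing every event with the tenant id as the record key keeps each tenant's events in one partition, and therefore in order. A minimal illustration of that idea (plain Java; `String.hashCode` is a stand-in for Kafka's actual murmur2, and the tenant names are invented):

```java
import java.util.HashMap;
import java.util.Map;

public class TenantPartitioning {
    // Stand-in for Kafka's default partitioner: hash the key, mod the
    // partition count. Any stable hash gives the same guarantee:
    // one key always maps to one partition.
    static int partitionFor(String tenantKey, int numPartitions) {
        return (tenantKey.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 10;
        Map<String, Integer> seen = new HashMap<>();
        // Every event for the same tenant lands on the same partition,
        // regardless of when it is produced.
        for (int i = 0; i < 100; i++) {
            for (String tenant : new String[]{"acme", "globex", "initech"}) {
                int p = partitionFor(tenant, partitions);
                Integer prev = seen.put(tenant, p);
                if (prev != null && prev != p) {
                    throw new AssertionError("tenant moved partitions!");
                }
            }
        }
        System.out.println(seen.size()); // 3 tenants, each pinned to one partition
    }
}
```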
David

On Sat, Feb 27, 2021 at 3:29 AM Rion Williams <rionmons...@gmail.com> wrote:

> David and Timo,
>
> Firstly, thank you both so much for your contributions and advice. I believe I've implemented things along the lines that you both detailed, and things appear to work just as expected (e.g. I can see records arriving, being added to windows, late records being discarded, and files ultimately being written out as expected).
>
> With that said, I have one question/issue that I've run into with handling the data coming from my Kafka topic. Currently, my tenant/source (i.e. my key) may be distributed across the 10 partitions of my Kafka topic. With the way that I'm consuming from this topic (with a Kafka consumer), it looks like my data is arriving in a mixed order, which seems to be causing trouble for my own watermarks (those stored in my ValueState): later data may arrive earlier than other data and cause my windows to be evicted.
>
> I'm currently using `WatermarkStrategy.noWatermarks()` along with a custom timestamp assigner to handle all of my timestamping, but is there a mechanism to handle the mixed ordering across partitions in this scenario at the Flink level?
>
> I know the answer here likely lies with Kafka and adopting a better keying strategy to ensure the same tenant/source (my key) lands on the same partition, which by definition ensures ordering. I'm just wondering if there's some mechanism in Flink to accomplish this post-read, within my pipeline, to handle things in a similar fashion?
>
> Again - thank you both so much. I'm loving the granularity and control that Flink has been providing me over other streaming technologies I've used in the past. I'm totally sold on it and am looking forward to doing more incredible things with it.
>
> Best regards,
>
> Rion
>
> On Feb 26, 2021, at 4:36 AM, David Anderson <dander...@apache.org> wrote:
>
> Yes indeed, Timo is correct -- I am proposing that you not use timers at all.
> Watermarks and event-time timers go hand in hand -- and neither mechanism can satisfy your requirements.
>
> You can instead put all of the timing logic in the processElement method -- effectively emulating what you would get if Flink were to offer per-key watermarking.
>
> The reason that the PseudoWindow example uses MapState is that, for each key/tenant, more than one window can be active simultaneously. This occurs because the event stream is out of order with respect to time, so events for the "next window" are probably being processed before "the previous" window is complete. And if you want to accommodate allowed lateness, the requirement to have several windows open at once becomes even more important.
>
> MapState gives you a per-tenant hashmap, where each entry in that map corresponds to an open window for some particular tenant: the map's key is the timestamp for a window, and the value is whatever state you want that window to hold.
>
> Best regards,
> David
>
> On Fri, Feb 26, 2021 at 9:44 AM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Rion,
>>
>> I think what David was referring to is that you do the entire time handling yourself in the process function. That means not using the `context.timerService()` or `onTimer()` that Flink provides, but calling your own logic based on the timestamps that enter your process function and the stored state.
>>
>> Regards,
>> Timo
>>
>> On 26.02.21 00:29, Rion Williams wrote:
>>
>> > Hi David,
>> >
>> > Thanks for your prompt reply; it was very helpful, and the PseudoWindow example is excellent. I believe it closely aligns with an approach that I was tinkering with but seemed to be missing a few key pieces. In my case, I'm essentially going to want to aggregate the messages that are coming into the window (a simple string-concatenation aggregation would work).
>> > Would I need another form of state to hold that? Looking through this example with naive eyes, it seems that this function is currently storing multiple windows in state via the MapState provided:
>> >
>> > // Keyed, managed state, with an entry for each window, keyed by the window's end time.
>> > // There is a separate MapState object for each driver.
>> > private transient MapState<Long, Float> sumOfTips;
>> >
>> > If I wanted to perform an aggregation for each key/tenant, would a MapState be appropriate? Such as a MapState<Long, String> if I was doing a string aggregation, so that within my processElement function I could use something similar for building these aggregations and ultimately triggering them:
>> >
>> > // Keep track of a tenant/source-specific watermark
>> > private lateinit var currentWatermark: ValueState<Long>
>> > // Keep track of the contents of each of the windows, where the key represents the close
>> > // of the window and the contents represent an accumulation of the records for that window
>> > private lateinit var windowContents: MapState<Long, String>
>> >
>> > If that's the case, this is what I've thrown together thus far, and I feel like it's moving in the right direction:
>> >
>> > class MagicWindow(private val duration: Long, private val lateness: Long):
>> >     KeyedProcessFunction<String, Event, FileOutput>() {
>> >
>> >     // Keep track of a tenant/source-specific watermark
>> >     private lateinit var currentWatermark: ValueState<Long>
>> >     // Keep track of the contents of each of the windows, where the key represents the close
>> >     // of the window and the contents represent an accumulation of the records for that window
>> >     private lateinit var windowContents: MapState<Long, String>
>> >
>> >     override fun open(config: Configuration) {
>> >         currentWatermark = runtimeContext.getState(watermark)
>> >     }
>> >
>> >     override fun processElement(element: Event, context: Context, out: Collector<FileOutput>) {
>> >         // Resolve the event time
>> >         val eventTime: Long = getEventTime(element)
>> >
>> >         // Update the per-key watermark (if applicable)
>> >         if ((currentWatermark.value() ?: Long.MIN_VALUE) < eventTime) {
>> >             currentWatermark.update(eventTime)
>> >         }
>> >
>> >         // Define a timer for this window
>> >         val timerService = context.timerService()
>> >
>> >         if (eventTime <= timerService.currentWatermark()) {
>> >             // This event is late; its window has already been triggered.
>> >         } else {
>> >             // Determine the "actual" window closure and start a timer for it
>> >             // (round eventTime down to the start of its window, then add duration - 1)
>> >             val endOfWindow = eventTime - (eventTime % duration) + duration - 1
>> >
>> >             // Schedule a callback for when the window has been completed.
>> >             timerService.registerEventTimeTimer(endOfWindow)
>> >
>> >             // Add this element to the corresponding aggregation for this window
>> >             // (guarding against a missing entry the first time the window is seen)
>> >             windowContents.put(endOfWindow, (windowContents.get(endOfWindow) ?: "") + "$element")
>> >         }
>> >     }
>> >
>> >     override fun onTimer(timestamp: Long, context: OnTimerContext, out: Collector<FileOutput>) {
>> >         val key = context.currentKey
>> >         val currentAggregation: String = windowContents.get(timestamp)
>> >
>> >         // Output things here and clear the current aggregation for this
>> >         // tenant/source combination in this window
>> >     }
>> >
>> >     companion object {
>> >         private val watermark = ValueStateDescriptor(
>> >             "watermark",
>> >             Long::class.java
>> >         )
>> >
>> >         private val windowContents = MapStateDescriptor(
>> >             "window-contents",
>> >             Long::class.java,
>> >             String::class.java
>> >         )
>> >
>> >         fun getEventTime(element: Event): Long {
>> >             return Instant(element.`source$1`.createdTimestamp).millis
>> >         }
>> >     }
>> > }
>> >
>> > Is something glaringly off with this? I'll need to do some additional reading on the timers, but any additional clarification would be greatly appreciated.
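The window-assignment arithmetic in the snippet above, `eventTime - (eventTime % duration) + duration - 1`, rounds an event timestamp down to the start of its tumbling window and then points at the window's last millisecond. A small self-contained illustration of that bucketing (plain Java, no Flink dependencies; the timestamps are made up):

```java
public class WindowBuckets {
    // End timestamp (inclusive) of the tumbling window containing eventTime.
    static long endOfWindow(long eventTime, long duration) {
        return eventTime - (eventTime % duration) + duration - 1;
    }

    public static void main(String[] args) {
        long duration = 60_000L; // 1-minute tumbling windows
        // Events at 00:00:10, 00:00:59.999, and 00:01:00:
        System.out.println(endOfWindow(10_000L, duration)); // 59999 -> window [0, 60000)
        System.out.println(endOfWindow(59_999L, duration)); // 59999 -> same window
        System.out.println(endOfWindow(60_000L, duration)); // 119999 -> next window
    }
}
```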
>> >
>> > Thanks so much for your initial response again!
>> >
>> > Rion
>> >
>> > On Feb 25, 2021, at 3:27 PM, David Anderson <dander...@apache.org> wrote:
>> >
>> >> Rion,
>> >>
>> >> What you want isn't really achievable with the APIs you are using. Without some sort of per-key (per-tenant) watermarking -- which Flink doesn't offer -- the watermarks and windows for one tenant can be held up by the failure of another tenant's events to arrive in a timely manner.
>> >>
>> >> However, your pipeline is pretty straightforward, and it shouldn't be particularly difficult to accomplish what you want. What you can do is ignore the built-in watermarking and windowing APIs, and build equivalent functionality in the form of a KeyedProcessFunction.
>> >>
>> >> The Flink docs include an example [1] showing how to implement your own tumbling event-time windows with a process function. That implementation assumes you can rely on watermarks for triggering the windows; you'll have to do that differently.
>> >>
>> >> What you can do instead is to track, in ValueState, the largest timestamp you've seen so far (for each key/tenant). Whenever that advances, you can subtract the bounded-out-of-orderness duration from that timestamp, and then check to see if the resulting value is now large enough to trigger any of the windows for that key/tenant.
>> >>
>> >> Handling allowed lateness should be pretty straightforward.
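The per-key watermarking scheme described in the thread can be sketched outside of Flink, with plain maps standing in for the ValueState and MapState of a KeyedProcessFunction (class and method names here are illustrative, not Flink API): track the max timestamp per key, derive a per-key watermark by subtracting the out-of-orderness bound, and fire every open window whose end falls at or below that watermark.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PerKeyWatermarks {
    static final long DURATION = 60_000L;        // 1-minute tumbling windows
    static final long OUT_OF_ORDERNESS = 5_000L; // bounded delay per tenant

    // Stand-ins for Flink's per-key ValueState / MapState.
    final Map<String, Long> maxTimestamp = new HashMap<>();
    final Map<String, TreeMap<Long, String>> openWindows = new HashMap<>();
    final List<String> fired = new ArrayList<>();

    void processElement(String tenant, long eventTime, String value) {
        // Append the event to its window's aggregation (string concatenation).
        long endOfWindow = eventTime - (eventTime % DURATION) + DURATION - 1;
        openWindows.computeIfAbsent(tenant, t -> new TreeMap<>())
                   .merge(endOfWindow, value, String::concat);

        // Advance this tenant's watermark, independently of all other tenants.
        long max = Math.max(maxTimestamp.getOrDefault(tenant, Long.MIN_VALUE), eventTime);
        maxTimestamp.put(tenant, max);
        long watermark = max - OUT_OF_ORDERNESS;

        // Fire (and clear) every window this tenant has completed.
        Iterator<Map.Entry<Long, String>> it =
            openWindows.get(tenant).headMap(watermark, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> w = it.next();
            fired.add(tenant + "@" + w.getKey() + ":" + w.getValue());
            it.remove();
        }
    }

    public static void main(String[] args) {
        PerKeyWatermarks p = new PerKeyWatermarks();
        p.processElement("acme", 10_000L, "a");   // acme's window ending 59999 opens
        p.processElement("globex", 1_000L, "x");  // other tenant, entirely unaffected
        p.processElement("acme", 65_500L, "b");   // acme watermark -> 60500, fires 59999
        System.out.println(p.fired);              // [acme@59999:a]
    }
}
```

Note that `globex`'s slow stream never holds back `acme`'s windows, which is exactly the property the built-in (per-subtask) watermarks cannot provide.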
>> >>
>> >> Hope this helps,
>> >> David
>> >>
>> >> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
>> >>
>> >> On Thu, Feb 25, 2021 at 9:05 PM Rion Williams <rionmons...@gmail.com> wrote:
>> >>
>> >> Hey folks, I have a somewhat high-level/advice question regarding Flink and whether it has the mechanisms in place to accomplish what I'm trying to do. I've spent a good bit of time using Apache Beam, but recently pivoted over to native Flink, simply because some of the connectors weren't as mature or didn't support some of the functionality that I needed.
>> >>
>> >> Basically, I have a single Kafka topic with 10 partitions that I'm consuming from. This is a multi-tenant topic containing data that comes in at various times from various tenants and is not at all guaranteed to be in order, at least with regard to "event time", which is what I care about.
>> >>
>> >> What I'm trying to accomplish is this: given a multi-tenant topic with records eventually distributed across partitions, is it possible to consume and window each of these records independently of one another, without one tenant potentially influencing another, and write out to separate files per tenant/source (i.e.
>> >> some other defined property on the records)?
>> >>
>> >> My pipeline currently looks something like this:
>> >>
>> >> @JvmStatic
>> >> fun main(args: Array<String>) {
>> >>     val pipeline = StreamExecutionEnvironment
>> >>         .getExecutionEnvironment()
>> >>         //.createLocalEnvironmentWithWebUI(Configuration())
>> >>
>> >>     val properties = buildPropertiesFromArgs(args)
>> >>     val stream = pipeline
>> >>         .addSource(readFromKafka("events", properties))
>> >>         .assignTimestampsAndWatermarks(
>> >>             WatermarkStrategy
>> >>                 .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...))
>> >>                 .withTimestampAssigner { event: Event, _: Long ->
>> >>                     // Assign the created timestamp as the event timestamp
>> >>                     Instant(event.createdTimestamp).millis
>> >>                 }
>> >>         )
>> >>
>> >>     // There are multiple data sources that each have their own windows and allowed lateness,
>> >>     // so ensure that each source only handles records for it
>> >>     DataSources.forEach { source ->
>> >>         stream
>> >>             .filter { event ->
>> >>                 event.source == source.name
>> >>             }
>> >>             .keyBy { event ->
>> >>                 //print("Keying record with id ${record.`id$1`} by tenant ${record.`source$1`.tenantName}")
>> >>                 event.tenant
>> >>             }
>> >>             .window(
>> >>                 TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
>> >>             )
>> >>             .allowedLateness(
>> >>                 Time.minutes(source.allowedLateness)
>> >>             )
>> >>             .process(
>> >>                 // This just contains some logic to take the existing windows and construct a file
>> >>                 // using the window range and keys (tenant/source), with the values being
>> >>                 // an aggregation of all of the records
>> >>                 WindowedEventProcessFunction(source.name)
>> >>             )
>> >>             .map { summary ->
>> >>                 // This would be a sink to write to a file
>> >>             }
>> >>     }
>> >>     pipeline.execute("event-processor")
>> >> }
>> >>
>> >> My overarching question is really: can I properly separate the data with custom watermark strategies
>> >> and ensure that keying (or some other construct) is enough to allow each tenant/source combination to be treated as its own stream with its own watermarking? I know I could possibly break the single topic up into multiple disparate topics; however, that level of granularity would likely result in several thousand (7000+) topics, so I'm hoping that some of the constructs available within Flink may help with this (WatermarkStrategies, etc.)
>> >>
>> >> Any recommendations/advice would be extremely helpful, as I'm quite new to the Flink world; however, I have quite a bit of experience in Apache Beam, Kafka Streams, and a smattering of other streaming technologies.
>> >>
>> >> Thanks much,
>> >>
>> >> Rion
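On the allowed-lateness point raised earlier in the thread: in the hand-rolled approach, a late event can be checked against the per-key watermark and the lateness bound to decide whether its window is still on time, should be re-opened and re-emitted, or should be dropped. A small illustrative sketch (plain Java; the names and thresholds are invented for the example, not Flink API):

```java
public class LatenessCheck {
    enum Disposition { ON_TIME, LATE_BUT_ALLOWED, DROP }

    // Decide what to do with an event given its tenant's current watermark.
    // A window ending at endOfWindow has fired once watermark > endOfWindow;
    // within allowedLateness we can still update and re-emit it.
    static Disposition classify(long endOfWindow, long watermark, long allowedLateness) {
        if (watermark <= endOfWindow) return Disposition.ON_TIME;
        if (watermark <= endOfWindow + allowedLateness) return Disposition.LATE_BUT_ALLOWED;
        return Disposition.DROP;
    }

    public static void main(String[] args) {
        long end = 59_999L, lateness = 30_000L;
        System.out.println(classify(end, 50_000L, lateness));  // ON_TIME
        System.out.println(classify(end, 70_000L, lateness));  // LATE_BUT_ALLOWED
        System.out.println(classify(end, 100_000L, lateness)); // DROP
    }
}
```

In the MapState-based design this means keeping a fired window's entry around until the per-key watermark passes `endOfWindow + allowedLateness`, and only then clearing it.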