Rion,

If you can arrange for each tenant's events to be in only one Kafka
partition, that would be the best way to simplify the processing you need
to do. Otherwise, a simple change that may help would be to increase the
bounded delay you use in calculating your own per-tenant watermarks,
thereby making late events less likely.
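
For example, here's a minimal producer-side sketch of that first option
(assuming you control the producer; `toJson` is a hypothetical
serialization helper, and Event is your record type):

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

// With the tenant id as the record key, Kafka's default partitioner
// hashes every event for a given tenant onto the same partition, which
// preserves per-tenant ordering.
fun publish(producer: KafkaProducer<String, String>, event: Event) {
    val record = ProducerRecord("events", event.tenant, toJson(event))
    producer.send(record)
}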

David

On Sat, Feb 27, 2021 at 3:29 AM Rion Williams <rionmons...@gmail.com> wrote:

> David and Timo,
>
> Firstly, thank you both so much for your contributions and advice. I
> believe I’ve implemented things along the lines that you both detailed,
> and everything appears to work as expected (e.g. I can see events
> arriving, being added to windows, late records being discarded, and files
> ultimately being written out).
>
> With that said, I have one question / issue that I’ve run into with
> handling the data coming from my Kafka topic. Currently, my tenant/source
> (i.e. my key) may be distributed across the 10 partitions of my Kafka
> topic. With the way that I’m consuming from this topic (with a Kafka
> consumer), my data arrives in a mixed order across partitions, which
> seems to throw off my own per-key watermarks (those stored in my
> ValueState): later data may arrive before earlier data and cause my
> windows to be evicted prematurely.
>
> I’m currently using `WatermarkStrategy.noWatermarks()` along with a
> custom timestamp assigner to handle all of my timestamping, but is there
> a mechanism to handle the mixed ordering across partitions in this
> scenario at the Flink level?
>
> I know the answer here likely lies with Kafka and adopting a better keying
> strategy to ensure the same tenant/source (my key) lands on the same
> partition, which by definition ensures ordering. I’m just wondering if
> there’s some mechanism in Flink to handle things in a similar fashion
> after reading from Kafka, within my pipeline?
>
> Again - thank you both so much, I’m loving the granularity and control
> that Flink has been providing me over other streaming technologies I’ve
> used in the past. I’m totally sold on it and am looking forward to doing
> more incredible things with it.
>
> Best regards,
>
> Rion
>
> On Feb 26, 2021, at 4:36 AM, David Anderson <dander...@apache.org> wrote:
>
> 
> Yes indeed, Timo is correct -- I am proposing that you not use timers at
> all. Watermarks and event-time timers go hand in hand -- and neither
> mechanism can satisfy your requirements.
>
> You can instead put all of the timing logic in the processElement method
> -- effectively emulating what you would get if Flink were to offer per-key
> watermarking.
>
> The reason that the PseudoWindow example is using MapState is that for
> each key/tenant, more than one window can be active simultaneously. This
> occurs because the event stream is out-of-order with respect to time, so
> events for the "next" window are probably being processed before the
> "previous" window is complete. And if you want to accommodate allowed
> lateness, the requirement to have several windows open at once becomes
> even more important.
>
> MapState gives you a per-tenant hashmap, where each entry corresponds to
> an open window for that tenant: the map's key is the timestamp for a
> window, and the value is whatever state you want that window to hold.
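>
> To make that concrete, here is a minimal sketch of the timer-free,
> per-key approach, reusing your getEventTime helper and your Event and
> FileOutput types. The class name, the maxOutOfOrderness parameter, and
> the FileOutput constructor are assumptions for illustration:
>
> import org.apache.flink.api.common.state.*
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction
> import org.apache.flink.util.Collector
>
> class PerTenantWindows(
>     private val duration: Long,
>     private val maxOutOfOrderness: Long
> ) : KeyedProcessFunction<String, Event, FileOutput>() {
>
>     // Largest event timestamp seen so far for this key/tenant
>     private lateinit var maxTimestamp: ValueState<Long>
>     // One entry per open window: end timestamp -> accumulated contents
>     private lateinit var windows: MapState<Long, String>
>
>     override fun open(config: Configuration) {
>         maxTimestamp = runtimeContext.getState(
>             ValueStateDescriptor("max-ts", Long::class.java))
>         windows = runtimeContext.getMapState(
>             MapStateDescriptor("windows", Long::class.java, String::class.java))
>     }
>
>     override fun processElement(event: Event, ctx: Context,
>                                 out: Collector<FileOutput>) {
>         val ts = getEventTime(event)
>         val maxSeen = maxTimestamp.value() ?: Long.MIN_VALUE
>
>         // Drop events behind this key's watermark (no allowed lateness here)
>         if (ts <= maxSeen - maxOutOfOrderness) return
>
>         // Add the event to the tumbling window it belongs to
>         val endOfWindow = ts - (ts % duration) + duration - 1
>         windows.put(endOfWindow, (windows.get(endOfWindow) ?: "") + event)
>
>         // Advance the per-key "watermark" and fire any windows it passes
>         val newMax = maxOf(maxSeen, ts)
>         maxTimestamp.update(newMax)
>         val fireable = windows.keys().filter { it <= newMax - maxOutOfOrderness }
>         for (end in fireable) {
>             out.collect(FileOutput(event.tenant, end, windows.get(end)))
>             windows.remove(end)
>         }
>     }
> }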
>
> Best regards,
> David
>
>
>
>
> On Fri, Feb 26, 2021 at 9:44 AM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Rion,
>>
>> I think what David was referring to is that you do the entire time
>> handling yourself in the process function. That means not using the
>> `context.timerService()` or `onTimer()` that Flink provides, but instead
>> applying your own logic based on the timestamps that enter your process
>> function and the stored state.
>>
>> Regards,
>> Timo
>>
>>
>> On 26.02.21 00:29, Rion Williams wrote:
>> > 
>> > Hi David,
>> >
>> > Thanks for your prompt reply, it was very helpful, and the PseudoWindow
>> > example is excellent. I believe it closely aligns with an approach that
>> > I was tinkering with but seemed to be missing a few key pieces. In my
>> > case, I'm essentially going to want to aggregate the messages coming
>> > into the window (a simple string-concatenation aggregation would work).
>> > Would I need another form of state to hold that? Looking through this
>> > example with naive eyes, it seems that this function is currently
>> > storing multiple windows in state via the MapState provided:
>> >
>> > // Keyed, managed state, with an entry for each window, keyed by the
>> > window's end time.
>> > // There is a separate MapState object for each driver.
>> > private transient MapState<Long, Float> sumOfTips;
>> >
>> > If I wanted to perform an aggregation for each key/tenant, would a
>> > MapState be appropriate? Such as a MapState<Long, String> if I were
>> > doing a string aggregation, so that within my processElement function I
>> > could use something similar for building these aggregations and
>> > ultimately triggering them:
>> >
>> > // Keep track of a tenant/source-specific watermark
>> > private lateinit var currentWatermark: ValueState<Long>
>> > // Keep track of the contents of each window, where the key represents
>> > // the close of the window and the value is an accumulation of the
>> > // records for that window
>> > private lateinit var windowContents: MapState<Long, String>
>> >
>> > If that's the case, this is what I've thrown together thus far and I
>> > feel like it's moving in the right direction:
>> >
>> > class MagicWindow(private val duration: Long, private val lateness: Long) :
>> >      KeyedProcessFunction<String, Event, FileOutput>() {
>> >
>> >      // Keep track of a tenant/source specific watermark
>> >      private lateinit var currentWatermark: ValueState<Long>
>> >      // Keep track of the contents of each window, where the key
>> >      // represents the close of the window and the value is an
>> >      // accumulation of the records for that window
>> >      private lateinit var windowContents: MapState<Long, String>
>> >
>> >      override fun open(config: Configuration) {
>> >          currentWatermark = runtimeContext.getState(watermark)
>> >          windowContents = runtimeContext.getMapState(windowContentsDescriptor)
>> >          // Note: keyed state can't be updated here in open() (no key is
>> >          // in scope yet), so a null watermark value is treated as
>> >          // Long.MIN_VALUE in processElement
>> >      }
>> >
>> >      override fun processElement(element: Event, context: Context,
>> >                                  out: Collector<FileOutput>) {
>> >          // Resolve the event time
>> >          val eventTime: Long = getEventTime(element)
>> >
>> >          // Update the per-key watermark (if applicable); the state is
>> >          // null for the first event seen for a key
>> >          if ((currentWatermark.value() ?: Long.MIN_VALUE) < eventTime) {
>> >              currentWatermark.update(eventTime)
>> >          }
>> >
>> >          // Define a timer for this window
>> >          val timerService = context.timerService()
>> >
>> >          if (eventTime <= timerService.currentWatermark()) {
>> >              // This event is late; its window has already been triggered.
>> >          } else {
>> >              // Determine the end of the tumbling window this event
>> >              // belongs to and start a timer for it
>> >              val endOfWindow = eventTime - (eventTime % duration) + duration - 1
>> >
>> >              // Schedule a callback for when the window has been completed.
>> >              timerService.registerEventTimeTimer(endOfWindow)
>> >
>> >              // Add this element to the corresponding aggregation for
>> >              // this window (null-safe for the window's first element)
>> >              windowContents.put(endOfWindow,
>> >                  (windowContents.get(endOfWindow) ?: "") + "$element")
>> >          }
>> >      }
>> >
>> >      override fun onTimer(timestamp: Long, context: OnTimerContext,
>> >                           out: Collector<FileOutput>) {
>> >          val key = context.currentKey
>> >          val currentAggregation: String = windowContents.get(timestamp)
>> >
>> >          // Output things here and clear the current aggregation for
>> >          // this tenant/source combination in this window
>> >      }
>> >
>> >      companion object {
>> >          private val watermark = ValueStateDescriptor(
>> >              "watermark",
>> >              Long::class.java
>> >          )
>> >
>> >          private val windowContentsDescriptor = MapStateDescriptor(
>> >              "window-contents",
>> >              Long::class.java,
>> >              String::class.java
>> >          )
>> >
>> >          fun getEventTime(element: Event): Long {
>> >              return Instant(element.`source$1`.createdTimestamp).millis
>> >          }
>> >      }
>> > }
>> >
>> > Is something glaringly off with this? I’ll need to do some additional
>> > reading on the timers, but any further clarification would be greatly
>> > appreciated.
>> >
>> > Thanks so much for your initial response again!
>> >
>> > Rion
>> >
>> >> On Feb 25, 2021, at 3:27 PM, David Anderson <dander...@apache.org> wrote:
>> >>
>> >> 
>> >> Rion,
>> >>
>> >> What you want isn't really achievable with the APIs you are using.
>> >> Without some sort of per-key (per-tenant) watermarking -- which Flink
>> >> doesn't offer -- the watermarks and windows for one tenant can be held
>> >> up by the failure of another tenant's events to arrive in a timely
>> >> manner.
>> >>
>> >> However, your pipeline is pretty straightforward, and it shouldn't be
>> >> particularly difficult to accomplish what you want. What you can do is
>> >> to ignore the built-in watermarking and windowing APIs, and build
>> >> equivalent functionality in the form of a KeyedProcessFunction.
>> >>
>> >> The Flink docs include an example [1] showing how to implement your
>> >> own tumbling event time windows with a process function. That
>> >> implementation assumes you can rely on watermarks for triggering the
>> >> windows; you'll have to do that differently.
>> >>
>> >> What you can do instead is to track, in ValueState, the largest
>> >> timestamp you've seen so far (for each key/tenant). Whenever that
>> >> advances, you can subtract the bounded-out-of-orderness duration from
>> >> that timestamp, and then check to see if the resulting value is now
>> >> large enough to trigger any of the windows for that key/tenant.
>> >>
>> >> Handling allowed lateness should be pretty straightforward.
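>> >>
>> >> As a quick sketch of that trigger check (a pure helper; all the names
>> >> here are just illustrative):
>> >>
>> >> // Given the largest timestamp seen so far for this key and a new
>> >> // event's timestamp, compute the per-key "watermark" that decides
>> >> // which windows may fire.
>> >> fun perKeyWatermark(maxSeen: Long, eventTime: Long,
>> >>                     boundedOutOfOrderness: Long): Long =
>> >>     maxOf(maxSeen, eventTime) - boundedOutOfOrderness
>> >>
>> >> // Any window whose end time is <= perKeyWatermark(...) can be
>> >> // triggered; for allowed lateness, fire it then but keep its state
>> >> // until end + allowedLateness <= perKeyWatermark(...), re-emitting
>> >> // updated results for late events that arrive in the meantime.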
>> >>
>> >> Hope this helps,
>> >> David
>> >>
>> >> [1]
>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
>> >>
>> >> On Thu, Feb 25, 2021 at 9:05 PM Rion Williams <rionmons...@gmail.com> wrote:
>> >>
>> >>     Hey folks, I have a somewhat high-level/advice question regarding
>> >>     Flink and if it has the mechanisms in place to accomplish what I’m
>> >>     trying to do. I’ve spent a good bit of time using Apache Beam, but
>> >>     recently pivoted over to native Flink simply because some of the
>> >>     connectors weren’t as mature or didn’t support some of the
>> >>     functionality that I needed.
>> >>
>> >>     Basically - I have a single Kafka topic with 10 partitions that
>> >>     I’m consuming from. This is a multi-tenant topic containing data
>> >>     that comes in at various times from various tenants and is not at
>> >>     all guaranteed to be in order, at least with regards to “event
>> >>     time”, which is what I care about.
>> >>
>> >>     What I’m trying to accomplish is this: *Given a multi-tenant topic
>> >>     with records eventually distributed across partitions, is it
>> >>     possible to consume and window each of these records independently
>> >>     of one another, without one tenant potentially influencing another,
>> >>     and write out to separate files per tenant/source (i.e. some other
>> >>     defined property on the records)?*
>> >>     My pipeline currently looks something like this:
>> >>
>> >>     @JvmStatic
>> >>     fun main(args: Array<String>) {
>> >>         val pipeline = StreamExecutionEnvironment
>> >>             .getExecutionEnvironment()
>> >>             //.createLocalEnvironmentWithWebUI(Configuration())
>> >>
>> >>         val properties = buildPropertiesFromArgs(args)
>> >>         val stream = pipeline
>> >>             .addSource(readFromKafka("events", properties))
>> >>             .assignTimestampsAndWatermarks(
>> >>                 WatermarkStrategy
>> >>                     .forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...))
>> >>                     .withTimestampAssigner { event: Event, _: Long ->
>> >>                         // Assign the created timestamp as the event timestamp
>> >>                         Instant(event.createdTimestamp).millis
>> >>                     }
>> >>             )
>> >>
>> >>         // There are multiple data sources that each have their own
>> >>         // windows and allowed lateness, so ensure that each source
>> >>         // only handles records for it
>> >>         DataSources.forEach { source ->
>> >>             stream
>> >>                 .filter { event ->
>> >>                     event.source == source.name
>> >>                 }
>> >>                 .keyBy { event ->
>> >>                     //print("Keying record with id ${record.`id$1`} by
>> >>                     //  tenant ${record.`source$1`.tenantName}")
>> >>                     event.tenant
>> >>                 }
>> >>                 .window(
>> >>                     TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
>> >>                 )
>> >>                 .allowedLateness(
>> >>                     Time.minutes(source.allowedLateness)
>> >>                 )
>> >>                 .process(
>> >>                     // This just contains some logic to take the existing
>> >>                     // windows and construct a file using the window range
>> >>                     // and keys (tenant/source), with the values being an
>> >>                     // aggregation of all of the records
>> >>                     WindowedEventProcessFunction(source.name)
>> >>                 )
>> >>                 .map { summary ->
>> >>                     // This would be a sink to write to a file
>> >>                 }
>> >>         }
>> >>         pipeline.execute("event-processor")
>> >>     }
>> >>
>> >>     My overarching question is really - *Can I properly separate the
>> >>     data with custom watermark strategies and ensure that keying (or
>> >>     some other construct) is enough to allow each tenant/source
>> >>     combination to be treated as its own stream with its own
>> >>     watermarking?* I know I could possibly break the single topic up
>> >>     into multiple disparate topics; however, that level of granularity
>> >>     would likely result in several thousand (7000+) topics, so I'm
>> >>     hoping that some of the constructs available within Flink
>> >>     (WatermarkStrategies, etc.) may help with this.
>> >>
>> >>     Any recommendations / advice would be extremely helpful, as I'm
>> >>     quite new to the Flink world, though I have quite a bit of
>> >>     experience with Apache Beam, Kafka Streams, and a smattering of
>> >>     other streaming technologies.
>> >>
>> >>     Thanks much,
>> >>
>> >>     Rion
>> >>
>>
>>
