Hi David,
Thanks for your prompt reply, it was very helpful and the PseudoWindow
example is excellent. I believe it closely aligns with an approach that
I was tinkering with but seemed to be missing a few key pieces. In my
case, I'm essentially going to want to be aggregating the messages that
are coming into the window (a simple string-concatenation aggregation
would work). Would I need another form of state to hold that, as looking
through this example with naive eyes, it seems that this function is
currently storing multiple windows in state via the MapState provided:
// Keyed, managed state, with an entry for each window, keyed by the
window's end time.
// There is a separate MapState object for each driver.
private transient MapState<Long, Float> sumOfTips;
If I wanted to perform an aggregation for each key/tenant, would a
MapState be appropriate? Such as a MapState<Long, String> if I was doing
a string aggregation, so that within my processElement function I could
use something similar for building these aggregations and ultimately
triggering them:
// Keep track of a tenant/source specific watermark
private lateinit var currentWatermark: ValueState<Long>
// Keep track of the contents of each of the windows where the key
represents the close
// of the window and the contents represents an accumulation of the
records for that window
private lateinit var windowContents: MapState<Long, String>
If that's the case, this is what I've thrown together thus far and I
feel like it's moving in the right direction:
class MagicWindow(private val duration: Long, private val lateness: Long):
KeyedProcessFunction<String, Event, FileOutput>(){
// Keep track of a tenant/source specific watermark
private lateinit var currentWatermark: ValueState<Long>
// Keep track of the contents of each of the windows where the key
represents the close
// of the window and the contents represents an accumulation of the
records for that window
private lateinit var windowContents: MapState<Long, String>
override fun open(config: Configuration) {
currentWatermark = runtimeContext.getState(watermark)
currentWatermark.update(Long.MIN_VALUE)
}
override fun processElement(element: Event, context: Context, out:
Collector<FileOutput>) {
// Resolve the event time
val eventTime: Long = getEventTime(element)
// Update watermark (if applicable)
if (currentWatermark.value() < eventTime){
currentWatermark.update(eventTime)
}
// Define a timer for this window
val timerService = context.timerService()
if (eventTime <= timerService.currentWatermark()) {
// This event is late; its window has already been triggered.
} else {
// Determine the "actual" window closure and start a timer
for it
// (eventTime + window
val endOfWindow= eventTime - (eventTime % duration) +
duration - 1
// Schedule a callback for when the window has been completed.
timerService.registerEventTimeTimer(endOfWindow)
// Add this element to the corresponding aggregation for
this window
windowContents.put(endOfWindow, windowContents[endOfWindow]
+ "$element")
}
}
override fun onTimer(timestamp: Long, context: OnTimerContext, out:
Collector<FileOutput>) {
val key = context.currentKey
val currentAggregation: String = windowContents.get(timestamp)
// Output things here and clear the current aggregation for this
// tenant/source combination in this window
}
companion object {
private val watermark = ValueStateDescriptor(
"watermark",
Long::class.java
)
private val windowContents = MapStateDescriptor(
"window-contents",
Long::class.java,
String::class.java
)
fun getEventTime(element: Event): Long {
return Instant(element.`source$1`.createdTimestamp).millis
}
}
}
Is something glaringly off with this? I’ll need to do some additionally
reading on the timers, but any additional clarification would be greatly
appreciated.
Thanks so much for your initial response again!
Rion
On Feb 25, 2021, at 3:27 PM, David Anderson <dander...@apache.org> wrote:
Rion,
What you want isn't really achievable with the APIs you are using.
Without some sort of per-key (per-tenant) watermarking -- which Flink
doesn't offer -- the watermarks and windows for one tenant can be held
up by the failure of another tenant's events to arrive in a timely manner.
However, your pipeline is pretty straightforward, and it shouldn't be
particularly difficult to accomplish what you want. What you can do is
to ignore the built-in watermarking and windowing APIs, and build
equivalent functionality in the form of a KeyedProcessFunction.
The Flink docs include an example [1] showing how to implement your
own tumbling event time windows with a process function. That
implementation assumes you can rely on watermarks for triggering the
windows; you'll have to do that differently.
What you can do instead is to track, in ValueState, the largest
timestamp you've seen so far (for each key/tenant). Whenever that
advances, you can subtract the bounded-out-of-orderness duration from
that timestamp, and then check to see if the resulting value is now
large enough to trigger any of the windows for that key/tenant.
Handling allowed lateness should be pretty straightforward.
Hope this helps,
David
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example>
On Thu, Feb 25, 2021 at 9:05 PM Rion Williams <rionmons...@gmail.com
<mailto:rionmons...@gmail.com>> wrote:
Hey folks, I have a somewhat high-level/advice question regarding
Flink and if it has the mechanisms in place to accomplish what I’m
trying to do. I’ve spent a good bit of time using Apache Beam, but
recently pivoted over to native Flink simply because some of the
connectors weren’t as mature or didn’t support some of the
functionality that I needed.
Basically - I have a single Kafka topic with 10 partitions that
I’m consuming from. This is a multi-tenant topic containing data
that comes in at various times from various tenants and is not at
all guaranteed to be in order, at least with regards to “event
time”, which is what I care about.
What I’m trying to accomplish is this: *Given a multi-tenant topic
with records eventually distributed across partitions, is it
possible to consume and window each of these records independently
of one another without one tenant potentially influencing another
and write out to separate files per tenant/source (i.e. some other
defined property on the records)?”
*
My pipeline currently looks something like this:
@JvmStatic
fun main(args: Array<String>) {
val pipeline = StreamExecutionEnvironment
.getExecutionEnvironment()
//.createLocalEnvironmentWithWebUI(Configuration())
val properties = buildPropertiesFromArgs(args)
val stream = pipeline
.addSource(readFromKafka("events", properties))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness<Event>(Duration.ofSeconds(...))
.withTimestampAssigner { event: Event, _: Long ->
// Assign the created timestamp as the event
timestamp
Instant(event.createdTimestamp).millis
}
)
// There are multiple data sources that each have their own
windows and allowed lateness
// so ensure that each source only handles records for it
DataSources.forEach { source ->
stream
.filter { event ->
event.source == source.name <http://source.name>
}
.keyBy { event ->
//print("Keying record with id ${record.`id$1`} by
tenant ${record.`source$1`.tenantName}")
event.tenant
}
.window(
TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
)
.allowedLateness(
Time.minutes(source.allowedLateness)
)
.process(
// This just contains some logic to take the
existing windows and construct a file
// using the window range and keys (tenant/source)
with the values being
// an aggregation of all of the records
WindowedEventProcessFunction(source.name
<http://source.name>)
)
.map { summary ->
// This would be a sink to write to a file
}
}
pipeline.execute("event-processor")
}
My overarching question is really - *Can I properly separate the
data with custom watermark strategies and ensure that keying (or
some other construct) is enough to allow each tenant/source
combination to be treated as it’s own stream with it’s own
watermarking? *I know I could possibly break the single topic up
into multiple disparate topics, however that level of granularity
would likely result in several thousand (7000+) topics so I'm
hoping that some of the constructs available within Flink may help
with this (WatermarkStrategies, etc.)
Any recommendations / advice would be extremely helpful as I'm
quite new to the Flink world, however I have quite a bit of
experience in Apache Beam, Kafka Streams, and a smattering of
other streaming technologies.
Thanks much,
Rion