Hello,
I'm trying to read a stream of events from Kafka, key them with a
proper key function, and then group records into windows of a given
size using a count trigger, or buffer the window for a couple of
seconds. Since the system can receive multiple GBs of data in a very
short period of time, being able to cap the window size is essential
to avoid overwhelming the operators.
Since further down the pipeline I'm using the ForSt backend and the
State V2 APIs, I cannot find a way to use `AsyncCountTrigger`: the
`window().trigger()` call expects a subclass of `Trigger`.
A simple example in Scala:
// kafkaSource is just a DataStream with a wrapper over raw Kafka records
kafkaSource
  .keyBy(new KeySelector[KafkaRecord, String] {
    override def getKey(in: KafkaRecord): String = new String(in.key)
  })
  .enableAsyncState()
  .window(GlobalWindows.create())
  .trigger(AsyncProcessingTimeTrigger.create())
  .process(new WindowMatcherWithStateV2())
If I use `ProcessingTimeoutTrigger` with the regular `CountTrigger`, I
get the following error:
Trigger is for state V1 APIs, window operator with async state enabled
only accept state V2 APIs.
Any hints on how to achieve behavior similar to the one obtained by
combining `ProcessingTimeTrigger` and `CountTrigger`? I don't mind
writing my own trigger, but it seems I cannot pass an async trigger to
the current APIs exposed in `WindowedStream`.
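To make the semantics I'm after concrete, here is a plain-Scala sketch
of the firing rule, with no Flink APIs involved (names like `PaneState`
and `shouldFire` are mine, purely illustrative): fire a pane as soon as
it holds `maxCount` records, or once `timeoutMillis` has elapsed since
its first record, whichever comes first.

```scala
// Plain-Scala model of the desired firing rule; `PaneState`, `FireRule`,
// and `shouldFire` are illustrative names, not Flink APIs.
final case class PaneState(count: Long, firstElementTsMillis: Long)

object FireRule {
  // Fire when either the count cap is hit or the timeout since the
  // pane's first element has elapsed, whichever comes first.
  def shouldFire(state: PaneState,
                 nowMillis: Long,
                 maxCount: Long,
                 timeoutMillis: Long): Boolean =
    state.count >= maxCount ||
      (nowMillis - state.firstElementTsMillis) >= timeoutMillis
}
```

A real trigger would keep something like `PaneState` in (async) keyed
state and register a processing-time timer for the timeout, but the
firing decision itself is just this predicate.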
Thanks in advance, and let me know if you need more details.
Regards,
Pablo Flores.