Hello,
I'm trying to read a stream of events from Kafka, key it with a proper
key function, and then group records into windows of a given size using
a count trigger, or else buffer the window for a couple of seconds.
Since the system can receive multiple GBs of data in a very short
period of time, being able to cap the window size is essential to avoid
overwhelming the downstream operators.
Since further down the pipeline I'm using the ForSt backend and the
State V2 APIs, I cannot find a way to use the AsyncCountTrigger: the
`window().trigger()` call expects a subclass of `Trigger`.
A simple example in Scala:

// kafkaSource is just a datastream with a wrapper over raw kafka records

kafkaSource.keyBy(new KeySelector[KafkaRecord,String] {
    override def getKey(in: KafkaRecord): String = new String(in.key)
  }).enableAsyncState()
  .window(GlobalWindows.create())
  .trigger(AsyncProcessingTimeTrigger.create())
  .process(new WindowMatcherWithStateV2())

If I use the ProcessingTimeoutTrigger wrapping the regular
CountTrigger, I get the following error:
Trigger is for state V1 APIs, window operator with async state enabled
only accept state V2 APIs.
Any hints on how to achieve behavior similar to what I'd get by
combining ProcessingTimeTrigger and CountTrigger? I don't mind writing
my own trigger, but it seems I cannot pass an async trigger to the
current APIs exposed by WindowedStream.
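To make the semantics I'm after concrete, here is a minimal, Flink-free sketch of the firing logic (fire when the buffer reaches N elements, or when a deadline has passed since the first buffered element, whichever comes first). All names here are illustrative, not Flink APIs; the clock is injected so the behavior is easy to reason about:

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative only: the count-or-timeout firing semantics I want from
// the window trigger, expressed without any Flink dependency.
final class CountOrTimeoutBuffer[T](maxCount: Int, maxDelayMs: Long, now: () => Long) {
  private val buf = ArrayBuffer.empty[T]
  private var firstAt: Long = -1L // timestamp of the first buffered element

  /** Add an element; returns Some(contents) when the window should fire. */
  def offer(elem: T): Option[Seq[T]] = {
    if (buf.isEmpty) firstAt = now()
    buf += elem
    if (buf.size >= maxCount || now() - firstAt >= maxDelayMs) Some(flush())
    else None
  }

  /** Timer callback: fire whatever is buffered once the deadline passes. */
  def onTimer(): Option[Seq[T]] =
    if (buf.nonEmpty && now() - firstAt >= maxDelayMs) Some(flush()) else None

  private def flush(): Seq[T] = {
    val out = buf.toSeq
    buf.clear()
    firstAt = -1L
    out
  }
}
```

In Flink terms, `offer` corresponds to `onElement` and `onTimer` to `onProcessingTime`; the open question is how to express this against the async trigger interface.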

Thanks in advance and let me know if you need more details,

Regards,

Pablo Flores.
