I'm trying to use *FlinkKafkaConsumer* with a custom Trigger, as explained
here:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/operators/windows/#fire-and-purge
This is my *window assigner* implementation:
class TumblingEventWindowAssigner(WindowAssigner[str, TimeWindow]):
    """Assign each element to exactly one fixed-size (tumbling) time window.

    The window start is computed by ``TimeWindow.get_window_start_with_offset``
    from the element timestamp, the configured offset and the window size; the
    window end is ``start + size``.
    """

    def __init__(self, size: int, offset: int, is_event_time: bool):
        """Store the window configuration.

        :param size: Length of every window; must be positive. Presumably in
            milliseconds, like the timestamps -- confirm against the caller.
        :param offset: Shift applied when aligning window starts.
        :param is_event_time: Value reported by :meth:`is_event_time`.
        :raises ValueError: If ``size`` is not strictly positive.
        """
        # Fail fast: a zero or negative size would otherwise only show up later
        # as a modulo error or as windows whose end precedes their start.
        if size <= 0:
            raise ValueError(f"Window size must be positive, got {size}")
        self._size = size
        self._offset = offset
        self._is_event_time = is_event_time

    def assign_windows(
        self,
        element: str,
        timestamp: int,
        context: WindowAssigner.WindowAssignerContext,
    ) -> Collection[TimeWindow]:
        """Return the single window that contains ``timestamp``.

        ``element`` and ``context`` are not consulted; only the timestamp
        decides the window.
        """
        start = TimeWindow.get_window_start_with_offset(
            timestamp, self._offset, self._size
        )
        return [TimeWindow(start, start + self._size)]

    def get_default_trigger(self, env) -> Trigger[str, TimeWindow]:
        """Return a fresh ``EventTimeTrigger``; ``env`` is not used."""
        return EventTimeTrigger()

    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
        """Return a ``TimeWindowSerializer`` for the assigned windows."""
        return TimeWindowSerializer()

    def is_event_time(self) -> bool:
        """Return the ``is_event_time`` flag given at construction."""
        return self._is_event_time
And this is my *trigger* implementation:
class EventTimeTrigger(Trigger[str, TimeWindow]):
    """Fire and purge a time window once event time reaches the window's end.

    An event-time timer for ``window.max_timestamp()`` is registered for every
    incoming element, so ``on_event_time`` is invoked for the window without
    relying on any timer set elsewhere.

    NOTE(review): event-time timers fire only as the watermark advances, so
    the input stream presumably needs timestamps and watermarks assigned
    (e.g. through a watermark strategy on the source) -- confirm against the
    job setup.
    """

    def on_element(
        self,
        element: str,
        timestamp: int,
        window: TimeWindow,
        ctx: Trigger.TriggerContext,
    ) -> TriggerResult:
        """Register the end-of-window timer and keep collecting elements."""
        # Without this timer nothing in this trigger asks to be called back
        # when the window's end is reached.
        ctx.register_event_time_timer(window.max_timestamp())
        return TriggerResult.CONTINUE

    def on_processing_time(
        self,
        timestamp: int,
        window: TimeWindow,
        ctx: Trigger.TriggerContext,
    ) -> TriggerResult:
        """Ignore processing-time timers; this trigger is event-time only."""
        return TriggerResult.CONTINUE

    def on_event_time(
        self,
        timestamp: int,
        window: TimeWindow,
        ctx: Trigger.TriggerContext,
    ) -> TriggerResult:
        """Fire and purge when the timer is at or past the window's end.

        :param timestamp: Timestamp of the event-time timer that fired.
        :param window: Window the timer belongs to.
        :param ctx: Trigger context (unused here).
        :return: ``FIRE_AND_PURGE`` if ``timestamp`` is at least
            ``window.max_timestamp()``, otherwise ``CONTINUE``.
        """
        if timestamp >= window.max_timestamp():
            return TriggerResult.FIRE_AND_PURGE
        return TriggerResult.CONTINUE

    def on_merge(
        self,
        window: TimeWindow,
        ctx: Trigger.OnMergeContext,
    ) -> None:
        """Do nothing on window merge."""
        pass

    def clear(
        self,
        window: TimeWindow,
        ctx: Trigger.TriggerContext,
    ) -> None:
        """Delete the end-of-window timer registered in ``on_element``."""
        ctx.delete_event_time_timer(window.max_timestamp())
But the problem is, the Kafka consumer does not read any message unless I
use processing time instead and change the *on_processing_time*
implementation to be the same as *on_event_time*.
Am I doing anything wrong here? How can I use event time properly?
--
This e-mail and any attachments may contain information that is
privileged, confidential, and/or exempt from disclosure under applicable
law. If you are not the intended recipient, you are hereby notified that
any disclosure, copying, distribution or use of any information contained
herein is strictly prohibited. If you have received this transmission in
error, please immediately notify the sender and destroy the original
transmission and any attachments, whether in electronic or hard copy
format, without reading or saving.