Hi All,

I have a custom aggregated state that is represent by Set<Long> and I have
a stream of values coming in from Kafka where I inspect, compute the custom
aggregation and store it in Set<Long>. Now, I am trying to figureout how do
I print the updated value everytime this state is updated?

Imagine I have a Datastream<Set<Long>>

I tried few things already but keep running into the following exception.
Not sure why? Do I need to call assignTimestampsAndWatermark? I thought
watermarks are not mandatory in Flink especially when I want to keep this
aggregated state forever. any simple code sample on how to print the
streaming aggregated state represented by Datastream<Set<Long>> will be
great! You can imagine my Set<Long> has a toString() method that takes
cares of printing..and I just want to see those values in stdout.

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp
(= no timestamp marker). Is the time characteristic set to
'ProcessingTime', or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?

Reply via email to