Hi Kant,

If you only want to emit output once per second, you probably want to use a
ProcessFunction with timers [1].

Basically, this function holds the state and manages the updates to it.
The updates should also be recorded in a local (non-state) variable *changes*.
Whenever the timer fires, you would output *changes* (possibly to a side
output) and reset it.
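
To make the idea concrete, here is a minimal, library-free sketch of the
pattern in plain Java. It is only a model of the logic, not Flink code: the
class name PeriodicChangeEmitter and its methods are made up for
illustration, and time is passed in explicitly instead of coming from a
timer service. In a real job, onElement would roughly correspond to
processElement of a KeyedProcessFunction, the timer would be registered via
ctx.timerService().registerProcessingTimeTimer(...), and the flush would
happen in onTimer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of "state + local changes buffer + periodic timer". */
class PeriodicChangeEmitter {
    // Stands in for Flink managed state (e.g. kept in RocksDB); never cleared.
    private final Set<Long> state = new LinkedHashSet<>();
    // Plain local buffer of updates since the last flush; NOT part of the state.
    private final Set<Long> changes = new LinkedHashSet<>();
    private final long intervalMs;
    // Time at which the pending timer fires; -1 means no timer is registered.
    private long nextTimer = -1;

    PeriodicChangeEmitter(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Updates the state, records the change, and arms a timer if none is pending. */
    void onElement(long value, long nowMs) {
        if (state.add(value)) {          // only genuinely new values count as changes
            changes.add(value);
            if (nextTimer < 0) {
                nextTimer = nowMs + intervalMs;
            }
        }
    }

    /** Returns the buffered changes and resets the buffer if the timer is due. */
    List<Long> onTimer(long nowMs) {
        if (nextTimer < 0 || nowMs < nextTimer) {
            return new ArrayList<>();    // no timer pending, or not due yet
        }
        List<Long> out = new ArrayList<>(changes);
        changes.clear();
        nextTimer = -1;
        return out;
    }

    /** Read-only view of everything aggregated so far. */
    Set<Long> currentState() {
        return Collections.unmodifiableSet(state);
    }
}
```

With an interval of 1000 ms, feeding 1, 2 and 1 again before the first
second has passed and then firing the timer yields the changes 1 and 2 once,
while the full set stays in the state. Note that a buffer like *changes* is
not checkpointed, so anything not yet emitted would be lost on a restore; if
that matters, it could be kept in state as well.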

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timers

On Fri, Mar 6, 2020 at 4:39 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hey,
>
> I don't think you need to use a window operator for this use case. A
> reduce (or fold) operation should be enough:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
>
>
> On Fri, Mar 6, 2020 at 11:50 AM kant kodali <kanth...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for this. So how can I emulate an infinite window while outputting
>> every second? Simply put, I want to store the state forever (say, years), and
>> since RocksDB is my state backend I am assuming I can store the state until
>> I run out of disk. However, I want to see all the updates to the state
>> every second. It sounds to me like I need a window of one second, compute
>> for that window, and pass the result on to the next window. Or is there
>> some other way?
>>
>> Thanks
>>
>> On Fri, Mar 6, 2020 at 1:33 AM Congxian Qiu <qcx978132...@gmail.com>
>> wrote:
>>
>>> Hi
>>>
>>> From the description, you are using a window operator with event time,
>>> so you should call `DataStream.assignTimestampsAndWatermarks` to set
>>> the timestamps and watermarks.
>>> A window is triggered when the watermark passes the window's end time.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> On Wed, Mar 4, 2020 at 5:11 AM kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have a custom aggregated state that is represented by a Set<Long>, and I
>>>> have a stream of values coming in from Kafka, which I inspect, compute the
>>>> custom aggregation on, and store in the Set<Long>. Now I am trying to
>>>> figure out how to print the updated value every time this state is updated.
>>>>
>>>> Imagine I have a DataStream<Set<Long>>.
>>>>
>>>> I have tried a few things already but keep running into the following
>>>> exception, and I am not sure why. Do I need to call
>>>> assignTimestampsAndWatermarks? I thought watermarks were not mandatory in
>>>> Flink, especially when I want to keep this aggregated state forever. Any
>>>> simple code sample showing how to print the streaming aggregated state
>>>> represented by DataStream<Set<Long>> would be great! You can imagine my
>>>> Set<Long> has a toString() method that takes care of printing, and I just
>>>> want to see those values in stdout.
>>>>
>>>> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
>>>> timestamp (= no timestamp marker). Is the time characteristic set to
>>>> 'ProcessingTime', or did you forget to call
>>>> 'DataStream.assignTimestampsAndWatermarks(...)'?
>>>>
>>>
