In a nutshell, the Over operator works as follows:
- When a row arrives, it is put into a MapState keyed on its timestamp, and a
timer is registered to process it when the watermark passes that timestamp.
- All the heavy computation is done in the onTimer() method. For each
unique timestamp, the Over operator iterates once over all records in the
MapState to retract and purge expired rows from the aggregation result and
to accumulate the new rows into it.
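
The two steps above can be modeled in a few lines of plain Python. This is only a toy sketch and not Flink's code: the class `OverOperatorModel`, its fields, and the `state_scans` counter are made up for illustration. It assumes a COUNT aggregate, a single key, and no late rows.

```python
from collections import defaultdict


class OverOperatorModel:
    """Toy model of a row-time RANGE OVER count (hypothetical, not Flink code)."""

    def __init__(self, window_size):
        self.window_size = window_size
        self.rows_by_ts = defaultdict(list)  # stands in for the MapState
        self.timers = set()                  # registered event-time timers
        self.count = 0                       # running COUNT aggregate
        self.state_scans = 0                 # number of full passes over the state

    def process_element(self, ts, row):
        # Step 1: buffer the row under its timestamp and register a timer.
        self.rows_by_ts[ts].append(row)
        self.timers.add(ts)

    def advance_watermark(self, watermark):
        # Fire every timer whose timestamp the watermark has passed, in order.
        out = []
        for ts in sorted(t for t in self.timers if t <= watermark):
            self.timers.discard(ts)
            out.extend(self.on_timer(ts))
        return out

    def on_timer(self, ts):
        # Step 2: one pass over all buffered timestamps per fired timer.
        self.state_scans += 1
        limit = ts - self.window_size
        expired = [t for t in self.rows_by_ts if t < limit]
        for t in expired:
            # Retract and purge rows that fell out of the window.
            self.count -= len(self.rows_by_ts.pop(t))
        # Accumulate the rows that carry the current timestamp.
        self.count += len(self.rows_by_ts[ts])
        return [(ts, row, self.count) for row in self.rows_by_ts[ts]]


if __name__ == "__main__":
    op = OverOperatorModel(window_size=10)
    for ts in [1, 2, 2, 3]:
        op.process_element(ts, f"row@{ts}")
    print(op.advance_watermark(0))  # nothing is emitted before the watermark passes
    print(op.advance_watermark(3))  # counts 1, 3, 3, 4 after three state scans
```

Rows only leave the operator inside `advance_watermark`, which matches the buffering behavior described in this thread.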

In Gregory's use case, many rows are added before the watermark advances.
Once the watermark does advance, the operator becomes very busy and iterates
many times over the state to retract and accumulate rows. During that time,
the input stream cannot be consumed, hence checkpoints stall.
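
A rough way to see the cost of such a backlog is to count how many state entries are touched when all buffered timers fire at once. The function below is a back-of-the-envelope sketch under simplifying assumptions (one key, one timer per distinct timestamp, a full scan of the remaining state per timer); `entries_visited` is a hypothetical helper, and real numbers depend on the state backend and key distribution.

```python
def entries_visited(distinct_timestamps, window_size):
    """Count state entries touched when every buffered timer fires in one go.

    Assumes timestamps 0..n-1 are all buffered before the watermark advances
    and that each fired timer scans the whole remaining state once.
    """
    state = list(range(distinct_timestamps))
    visited = 0
    for ts in range(distinct_timestamps):
        visited += len(state)                     # full scan for this timer
        limit = ts - window_size
        state = [t for t in state if t >= limit]  # purge expired timestamps
    return visited


if __name__ == "__main__":
    # With a window longer than the backlog nothing expires, so n timers
    # each scan n entries: 1000 timestamps give 1,000,000 visits.
    print(entries_visited(1000, 10**6))
```

In this model the work grows quadratically with the number of distinct timestamps buffered, which is consistent with the operator staying busy for a long time after a full day of data is released at once.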

The solution that seems to work is to increase the watermark interval.
However, we could also think about improving the implementation to reduce
the number of state iterations. A time-sorted state primitive would make
that much easier.
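
As a sketch of what a time-sorted state could buy: if the timestamps are kept in sorted order, the expired ones form a prefix and can be found with a binary search instead of a full scan. The class below is hypothetical and written in plain Python, since no such primitive is described in this thread; `SortedOverModel` and its fields are illustrative names only, and it again assumes a COUNT aggregate on a single key.

```python
import bisect


class SortedOverModel:
    """Toy OVER count on top of a time-sorted key list (hypothetical)."""

    def __init__(self, window_size):
        self.window_size = window_size
        self.sorted_ts = []       # timestamps kept in ascending order
        self.rows_by_ts = {}      # rows buffered per timestamp
        self.count = 0            # running COUNT aggregate
        self.entries_visited = 0  # expired entries touched so far

    def process_element(self, ts, row):
        if ts not in self.rows_by_ts:
            bisect.insort(self.sorted_ts, ts)  # keep keys ordered on insert
            self.rows_by_ts[ts] = []
        self.rows_by_ts[ts].append(row)

    def on_timer(self, ts):
        limit = ts - self.window_size
        # Index of the first timestamp that is still inside the window.
        cut = bisect.bisect_left(self.sorted_ts, limit)
        for t in self.sorted_ts[:cut]:
            self.count -= len(self.rows_by_ts.pop(t))  # retract expired rows
        self.entries_visited += cut
        del self.sorted_ts[:cut]
        self.count += len(self.rows_by_ts[ts])         # accumulate new rows
        return self.count


if __name__ == "__main__":
    m = SortedOverModel(window_size=2)
    for ts in range(10):
        m.process_element(ts, "row")
    print([m.on_timer(ts) for ts in range(10)])
    print(m.entries_visited)
```

Here each timer only touches the entries it actually expires, rather than every entry in the state.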

Best, Fabian

2018-06-28 6:41 GMT+02:00 Hequn Cheng <chenghe...@gmail.com>:

> Hi Gregory,
>
> What was the cause of your problem? It would be great if you could share your
> experience, which I think will definitely help others.
>
>
> On Thu, Jun 28, 2018 at 11:30 AM, Gregory Fee <g...@lyft.com> wrote:
>
>> Yep, it was definitely a watermarking issue. I have that sorted out now.
>> Thanks!
>>
>> On Wed, Jun 27, 2018 at 6:49 PM, Hequn Cheng <chenghe...@gmail.com>
>> wrote:
>>
>>> Hi Gregory,
>>>
>>> Since you are using a row-time OVER window, it is probably a watermark
>>> problem. The window only emits output when the watermarks make progress. You
>>> can use processing time (instead of row time) to verify the assumption.
>>> Also, make sure there is data in each of your source partitions; the
>>> watermarks make no progress if one of the source partitions has no data. An
>>> operator’s current event time is the minimum of its input streams’ event
>>> times [1].
>>>
>>> Best, Hequn
>>>
>>> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html
>>>
>>> On Thu, Jun 28, 2018 at 1:58 AM, Gregory Fee <g...@lyft.com> wrote:
>>>
>>>> Thanks for your answers! Yes, it was based on watermarks.
>>>>
>>>> Fabian, the state does indeed grow quite a bit in my scenario. I've
>>>> observed it in the range of 5 GB. That doesn't seem to be an issue in itself.
>>>> However, in my scenario I'm loading a lot of data from a historic store
>>>> that is only partitioned by day. As such, a full day's worth of data is
>>>> loaded into the system before the watermark advances. At that point the
>>>> checkpoints stall indefinitely with a couple of the tasks in the 'over'
>>>> operator never acknowledging. Any thoughts on what would cause that? Or how
>>>> to address it?
>>>>
>>>> On Wed, Jun 27, 2018 at 2:20 AM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> The OVER window operator can only emit results when the watermark
>>>>> advances, due to SQL semantics which define that all records with the same
>>>>> timestamp need to be processed together.
>>>>> Can you check if the watermarks make sufficient progress?
>>>>>
>>>>> Btw. did you observe state size or IO issues? The OVER window operator
>>>>> also needs to store the whole window interval in state, i.e., 14 days in
>>>>> your case, in order to be able to retract the data from the aggregates
>>>>> after 14 days.
>>>>> Every time the watermark moves, the operator iterates over all
>>>>> timestamps (per key) to check which records need to be removed.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2018-06-27 5:38 GMT+02:00 Rong Rong <walter...@gmail.com>:
>>>>>
>>>>>> Hi Greg.
>>>>>>
>>>>>> Based on a quick test, I cannot reproduce the issue; it is emitting
>>>>>> messages correctly in the ITCase environment.
>>>>>> Can you share more information? Does the same problem happen if you
>>>>>> use proctime?
>>>>>> I am guessing this could be highly correlated with how you set the
>>>>>> watermark strategy of your input streams "user_things" and
>>>>>> "user_stuff".
>>>>>>
>>>>>> --
>>>>>> Rong
>>>>>>
>>>>>> On Tue, Jun 26, 2018 at 6:37 PM Gregory Fee <g...@lyft.com> wrote:
>>>>>>
>>>>>>> Hello User Community!
>>>>>>>
>>>>>>> I am running some streaming SQL that involves a union all into an
>>>>>>> over window similar to the below:
>>>>>>>
>>>>>>> SELECT user_id,
>>>>>>>        count(action) OVER (PARTITION BY user_id ORDER BY rowtime
>>>>>>>                            RANGE INTERVAL '14' DAY PRECEDING) action_count,
>>>>>>>        rowtime
>>>>>>> FROM
>>>>>>>     (SELECT rowtime, user_id, thing AS action FROM user_things
>>>>>>>      UNION ALL
>>>>>>>      SELECT rowtime, user_id, stuff AS action FROM user_stuff)
>>>>>>>
>>>>>>> The SQL generates three operators. There are two operators that
>>>>>>> process the 'from' part of the clause that feed into an 'over' operator. I
>>>>>>> notice that messages flow into the 'over' operator and just buffer there
>>>>>>> for a long time (hours in some cases). Eventually something happens and the
>>>>>>> data starts to flush through to the downstream operators. Can anyone help
>>>>>>> me understand what is causing that behavior? I want the data to flow
>>>>>>> through more consistently.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> *Gregory Fee*
>>>>>>> Engineer
>>>>>>> 425.830.4734 <+14258304734>
>>>>>>> [image: Lyft] <http://www.lyft.com>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>>
>
>
