Hi Fanbin,

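Fabian is right: this is most likely a watermark problem. Probably some
parallel tasks of the source do not receive enough data to advance their
watermark. Because a downstream operator's watermark is the minimum over all
of its input tasks, a single lagging task holds back every window.
You can also monitor the watermark of each task in the Flink web interface.
I answered a similar question on Stack Overflow; see [1] for more details.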
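
To illustrate the idle-task effect, here is a minimal plain-Java sketch. It
does not use Flink at all; the class and method names are made up for this
example, and the timestamps are invented. It only mimics the rule that a
downstream operator takes the minimum watermark over its inputs and that a
window fires once the watermark reaches the window end minus one millisecond.

```java
import java.util.Arrays;

public class WatermarkMinSketch {

    // Watermark of one parallel subtask: highest timestamp seen minus the slack.
    // A subtask that has seen no data stays at Long.MIN_VALUE (assumption of this sketch).
    static long subtaskWatermark(long[] timestampsMs, long slackMs) {
        long wm = Long.MIN_VALUE;
        for (long ts : timestampsMs) {
            wm = Math.max(wm, ts - slackMs);
        }
        return wm;
    }

    // A downstream operator's watermark is the minimum over all of its inputs.
    static long operatorWatermark(long[] subtaskWatermarks) {
        return Arrays.stream(subtaskWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // An event-time window [start, end) can fire once the watermark reaches end - 1.
    static boolean windowCanFire(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long slackMs = 10_000L; // 10 s, as in the extractor quoted below
        long busy = subtaskWatermark(new long[] {30_000L, 125_000L}, slackMs);
        long nearlyIdle = subtaskWatermark(new long[] {5_000L}, slackMs);
        long combined = operatorWatermark(new long[] {busy, nearlyIdle});

        System.out.println("busy subtask alone would fire the 60 s window: "
                + windowCanFire(60_000L, busy));
        System.out.println("combined watermark fires the 60 s window: "
                + windowCanFire(60_000L, combined));
    }
}
```

With 32 parallel tasks, one task that rarely sees events is enough to keep
the combined watermark low, which would explain why the job behaves well
locally but lags on the cluster.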

Best, Hequn

[1]
https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger

On Wed, Jul 24, 2019 at 4:38 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> If I use proctime, the groupBy happens without any delay.
>
> On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> not sure whether this is related:
>>
>> public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
>>       AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
>>
>>    // match parallelism to input, otherwise dop=1 sources could lead to some strange
>>    // behaviour: the watermark will creep along very slowly because the elements
>>    // from the source go to each extraction operator round robin.
>>    final int inputParallelism = getTransformation().getParallelism();
>>    final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
>>
>>    TimestampsAndPeriodicWatermarksOperator<T> operator =
>>          new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
>>
>>    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
>>          .setParallelism(inputParallelism);
>> }
>>
>> parallelism is set to 32
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> env.setParallelism(32)
>>
>> and the command for launching the job is
>>
>> flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
>>
>>
>>
>>
>> On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>
>>> Thanks, Fabian, for the prompt reply. I just started using Flink, and this
>>> is a great community.
>>> The watermark setting only accounts for a 10-second delay. Besides that,
>>> the job runs fine locally in IntelliJ without any issues.
>>>
>>> Here is the code:
>>>
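>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
>>> import org.apache.flink.streaming.api.watermark.Watermark
>>>
>>> // T is a placeholder for the event type (not shown in this thread);
>>> // it exposes created_at, presumably in epoch milliseconds.
>>> class EventTimestampExtractor(slack: Long = 0L) extends AssignerWithPeriodicWatermarks[T] {
>>>
>>>   // Highest (timestamp - slack) seen so far by this parallel instance.
>>>   var currentMaxTimestamp: Long = _
>>>
>>>   override def extractTimestamp(e: T, prevElementTimestamp: Long): Long = {
>>>     val elemTs = e.created_at
>>>     currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>>>     elemTs
>>>   }
>>>
>>>   // Called periodically; the watermark trails the max timestamp by `slack`.
>>>   override def getCurrentWatermark(): Watermark = {
>>>     new Watermark(currentMaxTimestamp)
>>>   }
>>> }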
>>>
>>> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
>>>
>>> Are there any other things I should be aware of?
>>>
>>> Thanks again for your kind help!
>>>
>>> Fanbin
>>>
>>>
>>> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Fanbin,
>>>>
>>>> The delay is most likely caused by the watermark delay.
>>>> A window is computed when the watermark passes the end of the window.
>>>> If you configured the watermark to be 10 minutes before the current max
>>>> timestamp (probably to account for out of order data), then the window will
>>>> be computed with approx. 10 minute delay.
>>>>
>>>> Best, Fabian
>>>>
>>>> On Tue, Jul 23, 2019 at 2:00 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>>
>>>>> Hi,
>>>>> I have a Flink SQL streaming job defined by:
>>>>>
>>>>> SELECT
>>>>>   user_id
>>>>>   , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>>>>>   , count(name) as count
>>>>> FROM event
>>>>> WHERE name = 'signin'
>>>>> GROUP BY
>>>>>   user_id
>>>>>   , hop(created_at, interval '30' second, interval '1' minute)
>>>>>
>>>>>
>>>>> There is a noticeable delay in the groupBy operator. For example, I
>>>>> only see a record sent out 10 minutes after it was received; see
>>>>> the attached picture.
>>>>>
>>>>> [image: image.png]
>>>>>
>>>>> I am expecting to see the group-by result after one minute, since the
>>>>> sliding window size is 1 min and the slide is 30 sec.
>>>>>
>>>>> There is no such issue if I run the job locally in IntelliJ. However,
>>>>> I run into the above issue when I run the job on EMR (Flink version 1.7).
>>>>>
>>>>> Can anybody give me a clue as to what could be wrong?
>>>>> Thanks,
>>>>>
>>>>> Fanbin
>>>>>
>>>>
