If I use proctime, the groupBy happens without any delay.
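
One possible explanation, offered as a guess rather than a diagnosis: in event time, an operator with several input channels advances its clock to the minimum of the watermarks it has received on those channels. With parallelism 32, a single extractor subtask that sees little or no data could therefore hold back the window, while processing time does not depend on watermarks at all. Below is a minimal plain-Java sketch of that rule. It does not use the Flink API, and the class and method names are hypothetical.

```java
import java.util.Arrays;

class WatermarkMinSketch {

    // Event-time clock of a downstream operator: the smallest watermark
    // among all of its input channels (hypothetical helper, not Flink API).
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Assumption for illustration: 32 parallel extractor subtasks.
        long[] channels = new long[32];
        long now = 1_563_900_000_000L; // arbitrary epoch millis
        Arrays.fill(channels, now);

        // One subtask lags 10 minutes behind the others.
        channels[7] = now - 600_000L;

        long combined = combinedWatermark(channels);
        System.out.println("lag of combined watermark (ms): " + (now - combined));
    }
}
```

If this is what happens on the cluster, the local run would look fine whenever every subtask receives recent data, and the delay on EMR would track the slowest subtask rather than the 10-second slack.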

On Tue, Jul 23, 2019 at 10:16 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> not sure whether this is related:
>
> public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
>       AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
>
>    // match parallelism to input, otherwise dop=1 sources could lead to some strange
>    // behaviour: the watermark will creep along very slowly because the elements
>    // from the source go to each extraction operator round robin.
>    final int inputParallelism = getTransformation().getParallelism();
>    final AssignerWithPeriodicWatermarks<T> cleanedAssigner =
>          clean(timestampAndWatermarkAssigner);
>
>    TimestampsAndPeriodicWatermarksOperator<T> operator =
>          new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
>
>    return transform("Timestamps/Watermarks",
>          getTransformation().getOutputType(), operator)
>          .setParallelism(inputParallelism);
> }
>
> parallelism is set to 32
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setParallelism(32)
>
> and the command for launching the job is
>
> flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
>
>
>
>
> On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> Thanks Fabian for the prompt reply. I just started using Flink and this
>> is a great community.
>> The watermark only accounts for a 10-second delay. Besides that,
>> the job runs fine locally in IntelliJ.
>>
>> Here is the code:
>>
>> class EventTimestampExtractor(slack: Long = 0L) extends AssignerWithPeriodicWatermarks[T] {
>>
>>   var currentMaxTimestamp: Long = _
>>
>>   override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
>>     val elemTs = e.created_at
>>     currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>>     elemTs
>>   }
>>
>>   override def getCurrentWatermark(): Watermark = {
>>       new Watermark(currentMaxTimestamp)
>>   }
>> }
>>
>> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
>>
>> Are there any other things I should be aware of?
>>
>> Thanks again for your kind help!
>>
>> Fanbin
>>
>>
>> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Fanbin,
>>>
>>> The delay is most likely caused by the watermark delay.
>>> A window is computed when the watermark passes the end of the window. If
>>> you configured the watermark to be 10 minutes before the current max
>>> timestamp (probably to account for out of order data), then the window will
>>> be computed with approx. 10 minute delay.
>>>
>>> Best, Fabian
>>>
>>> On Tue, Jul 23, 2019 at 2:00 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>
>>>> Hi,
>>>> I have a Flink SQL streaming job defined by:
>>>>
>>>> SELECT
>>>>   user_id
>>>>   , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>>>>   , count(name) as count
>>>> FROM event
>>>> WHERE name = 'signin'
>>>> GROUP BY
>>>>   user_id
>>>>   , hop(created_at, interval '30' second, interval '1' minute)
>>>>
>>>>
>>>> There is a noticeable delay in the groupBy operator. For example, I
>>>> only see a record sent out 10 min after it was received. See
>>>> the attached pic.
>>>>
>>>> [image: image.png]
>>>>
>>>> I'm expecting to see the group-by result after one minute, since the
>>>> sliding window size is 1 min and the slide is 30 sec.
>>>>
>>>> There is no such issue if I run the job locally in IntelliJ. However, I
>>>> run into the above issue when I run the job on EMR (Flink version 1.7).
>>>>
>>>> Can anybody give me a clue as to what could be wrong?
>>>> Thanks,
>>>>
>>>> Fanbin
>>>>
>>>
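
Fabian's rule in the quoted reply (a window is computed once the watermark passes the window end) can be sketched in plain Java. The watermark formula mirrors the EventTimestampExtractor posted in this thread (max timestamp seen minus slack). The firing check follows the DataStream API's event-time trigger, which compares the watermark against the window end minus 1 ms; that the SQL hop window behaves the same way is an assumption here. All class and method names are hypothetical.

```java
class WindowFiringSketch {

    // Watermark as computed by the extractor in this thread:
    // highest event timestamp seen so far minus the configured slack.
    static long watermark(long maxEventTsMs, long slackMs) {
        return maxEventTsMs - slackMs;
    }

    // A window [start, end) fires once the watermark reaches end - 1.
    static boolean windowFires(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L; // window covering the first minute
        long slack = 10_000L;     // 10 s, as passed to the extractor

        // Feed increasing event timestamps and watch when the window fires.
        for (long ts = 55_000L; ts <= 75_000L; ts += 5_000L) {
            long wm = watermark(ts, slack);
            System.out.println("max event ts=" + ts
                    + " watermark=" + wm
                    + " fires=" + windowFires(windowEnd, wm));
        }
    }
}
```

Under these assumptions, a 10-second slack delays the result by roughly 10 seconds of event time, not 10 minutes. The watermark only moves when new events arrive, so a sparse or idle input can stretch the wall-clock delay well beyond the slack.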
