Not sure whether this is related:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
      AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

   // match parallelism to input, otherwise dop=1 sources could lead to some strange
   // behaviour: the watermark will creep along very slowly because the elements
   // from the source go to each extraction operator round robin.
   final int inputParallelism = getTransformation().getParallelism();
   final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

   TimestampsAndPeriodicWatermarksOperator<T> operator =
         new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

   return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
         .setParallelism(inputParallelism);
}
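
If it is related: the extraction operator simply inherits the input parallelism. A minimal sketch (Scala API, not tested, reusing the events stream and EventTimestampExtractor from the quoted code further down) of pinning that parallelism explicitly instead of relying on inheritance:

// Sketch only: set the watermark operator's parallelism by hand so it
// matches the job parallelism rather than whatever the input reports.
val withTimestamps = events
  .assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
  .setParallelism(32)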

The job parallelism is set to 32:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(32)

and the command for launching the job is:

flink run -m yarn-cluster -ys 8 -yn 4 -ytm 4096 -yjm 4096 $JAR $ARGS
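
(With -yn 4 TaskManagers and -ys 8 slots each, that gives 32 task slots, which matches the parallelism of 32.)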




On Tue, Jul 23, 2019 at 9:59 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> Thanks Fabian for the prompt reply. I just started using Flink and this is
> a great community.
> The watermark setting only accounts for a 10 sec delay. Besides that, the
> local job in IntelliJ runs fine without issues.
>
> Here is the code:
>
> class EventTimestampExtractor(slack: Long = 0L) extends AssignerWithPeriodicWatermarks[T] {
>
>   var currentMaxTimestamp: Long = _
>
>   override def extractTimestamp(e: T, prevElementTimestamp: Long) = {
>     val elemTs = e.created_at
>     currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
>     elemTs
>   }
>
>   override def getCurrentWatermark(): Watermark = {
>       new Watermark(currentMaxTimestamp)
>   }
> }
>
> events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
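
Side note, not part of the quoted mail: Flink also ships a built-in periodic assigner for exactly this pattern. A minimal sketch, assuming a hypothetical Event case class with a created_at field in epoch millis:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type; the real one presumably carries more fields.
case class Event(created_at: Long)

// Emits watermarks that trail the highest seen timestamp by 10 seconds,
// the same idea as the hand-written extractor above.
class EventTsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
  override def extractTimestamp(e: Event): Long = e.created_at
}

events.assignTimestampsAndWatermarks(new EventTsExtractor)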
>
> Are there any other things I should be aware of?
>
> Thanks again for your kind help!
>
> Fanbin
>
>
> On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Fanbin,
>>
>> The delay is most likely caused by the watermark delay.
>> A window is computed when the watermark passes the end of the window. If
>> you configured the watermark to be 10 minutes before the current max
>> timestamp (probably to account for out-of-order data), then the window will
>> be computed with an approx. 10 minute delay.
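
(To make that concrete: with a window of [12:00:00, 12:01:00) and a watermark defined as the maximum event timestamp minus 10 minutes, the window only fires once an event with a timestamp of at least 12:11:00 has arrived, i.e. roughly 10 minutes after the window ended.)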
>>
>> Best, Fabian
>>
>> On Tue, Jul 23, 2019 at 2:00 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>>
>>> Hi,
>>> I have a Flink sql streaming job defined by:
>>>
>>> SELECT
>>>   user_id
>>>   , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>>>   , count(name) as count
>>> FROM event
>>> WHERE name = 'signin'
>>> GROUP BY
>>>   user_id
>>>   , hop(created_at, interval '30' second, interval '1' minute)
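
(For reference: hop(created_at, interval '30' second, interval '1' minute) here means a slide of 30 seconds and a window size of 1 minute, so a result row for each window should appear as soon as the watermark passes that window's end.)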
>>>
>>>
>>> There is a noticeable delay in the groupBy operator. For example, I only
>>> see a record sent out about 10 minutes after it was received; see the
>>> attached picture.
>>>
>>> [image: image.png]
>>>
>>> I'm expecting to see the group-by result after one minute, since the
>>> sliding window size is 1 min and the slide is 30 sec.
>>>
>>> There is no such issue if I run the job locally in IntelliJ. However, I
>>> run into the above issue when I run the job on EMR (Flink version 1.7).
>>>
>>> Can anybody give a clue as to what could be wrong?
>>> Thanks,
>>>
>>> Fanbin
>>>
>>
