Thanks Fabian for the prompt reply. I just started using Flink and this is
a great community.
The watermark only accounts for a 10-second delay. Besides that, the job
runs fine locally in IntelliJ without issues.

Here is the code:

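import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// T stands for my event type, which has a created_at field (epoch millis).
class EventTimestampExtractor(slack: Long = 0L)
    extends AssignerWithPeriodicWatermarks[T] {

  // Highest (timestamp - slack) seen so far; this is what the watermark reports.
  var currentMaxTimestamp: Long = _

  override def extractTimestamp(e: T, prevElementTimestamp: Long): Long = {
    val elemTs = e.created_at
    currentMaxTimestamp = Math.max(elemTs - slack, currentMaxTimestamp)
    elemTs
  }

  override def getCurrentWatermark(): Watermark =
    new Watermark(currentMaxTimestamp)
}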

events.assignTimestampsAndWatermarks(new EventTimestampExtractor(10000))
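
To check my understanding of how this slack should affect the window output, here is a minimal sketch in plain Scala with no Flink dependency. The names (WatermarkSketch, SlackWatermark, hopWindowEnds, canFire) are made up for illustration and are not Flink APIs; the sketch assumes non-negative millisecond timestamps and that an event-time window fires once the watermark reaches the window end minus 1 ms.

```scala
// Plain-Scala simulation; no Flink dependency. All names here are illustrative.
object WatermarkSketch {

  // Mirrors the assigner above: the watermark is the max seen timestamp minus slack.
  final class SlackWatermark(slackMs: Long) {
    private var currentMax: Long = 0L
    def observe(eventTs: Long): Unit =
      currentMax = math.max(eventTs - slackMs, currentMax)
    def watermark: Long = currentMax
  }

  // End timestamps (exclusive) of the hopping windows that contain eventTs,
  // for windows of sizeMs that slide every slideMs. Assumes eventTs >= 0.
  def hopWindowEnds(eventTs: Long, sizeMs: Long, slideMs: Long): Seq[Long] = {
    val lastStart = eventTs - (eventTs % slideMs)
    (lastStart until (eventTs - sizeMs) by -slideMs).map(_ + sizeMs)
  }

  // Assumption: a window may fire once the watermark reaches its last
  // included timestamp, i.e. windowEnd - 1.
  def canFire(windowEnd: Long, watermark: Long): Boolean =
    watermark >= windowEnd - 1
}
```

With slack = 10000, an event at 65000 ms falls into the windows ending at 90000 and 120000, and the watermark after it is 55000. The window ending at 90000 can only fire after an event with timestamp >= 99999 has been seen, so I would expect roughly 10 seconds of extra delay in event time, not 10 minutes. It also shows that this watermark only advances when newer events arrive.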

Are there any other things I should be aware of?

Thanks again for your kind help!

Fanbin


On Tue, Jul 23, 2019 at 2:49 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Fanbin,
>
> The delay is most likely caused by the watermark delay.
> A window is computed when the watermark passes the end of the window. If
> you configured the watermark to be 10 minutes before the current max
> timestamp (probably to account for out of order data), then the window will
> be computed with approx. 10 minute delay.
>
> Best, Fabian
>
> On Tue, Jul 23, 2019 at 02:00, Fanbin Bu <
> fanbin...@coinbase.com> wrote:
>
>> Hi,
>> I have a Flink sql streaming job defined by:
>>
>> SELECT
>>   user_id
>>   , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
>>   , count(name) as count
>> FROM event
>> WHERE name = 'signin'
>> GROUP BY
>>   user_id
>>   , hop(created_at, interval '30' second, interval '1' minute)
>>
>>
>> There is a noticeable delay in the GROUP BY operator. For example, I only
>> see a record emitted 10 minutes after it was received; see the
>> attached picture.
>>
>> [image: image.png]
>>
>> I'm expecting to see the GROUP BY result after about one minute, since the
>> sliding window size is 1 min and the slide is 30 sec.
>>
>> There is no such issue when I run the job locally in IntelliJ. However, I
>> run into the above issue when I run the job on EMR (Flink version 1.7).
>>
>> Can anybody give me a clue as to what could be wrong?
>> Thanks,
>>
>> Fanbin
>>
>
