Hi Rohan,

In your example, are you saying that after 5:40 you will not receive any
events at all which could advance the watermark?

I am asking because if you are receiving events for other keys/ids from your
KafkaSource after 5:40, the watermark will still be advanced and fire the
tumbling window.

Best,
Gary
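To illustrate Gary's point: a Flink operator instance tracks one event-time watermark for all keys, not one watermark per key. The sketch below is a plain-Scala model, not Flink code (`SharedWatermarkDemo`, `firedWindows` and `hm` are hypothetical names). It assumes hourly tumbling windows per key, a bounded-out-of-orderness watermark recomputed after every event, and Flink's rule that an event-time window fires once the watermark reaches the window's last millisecond.

```scala
import scala.collection.mutable

// Plain-Scala model, not Flink code: per-key hourly tumbling windows driven
// by ONE watermark that is shared by all keys.
object SharedWatermarkDemo {
  val HourMs: Long = 3600000L

  // Hypothetical helper: hh:mm on some day, as milliseconds.
  def hm(h: Int, m: Int): Long = (h * 60L + m) * 60000L

  // Returns the (key, windowStart) pairs in the order their windows fire.
  def firedWindows(events: Seq[(String, Long)], maxOutOfOrdernessMs: Long): Seq[(String, Long)] = {
    var maxTs = Long.MinValue + maxOutOfOrdernessMs
    val open = mutable.LinkedHashSet.empty[(String, Long)]
    val fired = mutable.ArrayBuffer.empty[(String, Long)]
    for ((key, ts) <- events) {
      open += ((key, ts - ts % HourMs))           // window [start, start + 1h) for this key
      maxTs = math.max(maxTs, ts)
      val watermark = maxTs - maxOutOfOrdernessMs // the same value for every key
      // A window [start, start + 1h) fires once watermark >= start + 1h - 1.
      val ready = open.filter { case (_, start) => watermark >= start + HourMs - 1 }
      fired ++= ready
      open --= ready
    }
    fired.toList
  }
}
```

For example, if key `a` sends events at 5:10 and 5:40 and key `b` sends events at 5:50, 6:20 and 7:05, then with a 10-minute bound `firedWindows` returns `List(("a", hm(5, 0)), ("b", hm(5, 0)))`: `a`'s 5:00-6:00 window fires because `b`'s 6:20 event moves the shared watermark to 6:10. With only `a`'s two events it returns an empty list. In a parallel job an operator's watermark is the minimum over its inputs, so an input that receives no data at all can still hold it back.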

On Mon, Jan 15, 2018 at 9:03 AM, Rohan Thimmappa <rohan.thimma...@gmail.com>
wrote:

> No. My question is slightly different.
>
> Say I get reports from 5:10 to 5:40. Then the device goes offline and
> never comes back, so I will not get any report after 5:40. The 5:00-6:00
> window never gets closed, because we close a window only when we see the
> next hour, i.e. a report carrying a 6:00 end time. In this case the
> 5:00-5:40 data stays in Flink's memory and is never sent.
>
> So what I would like to do is wait for, say, 1 or 2 hours. If I don't get
> a message for the given id in that time, I would like to close the window
> and send the result to the destination system (in my case a Kafka topic).
>
> Rohan
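One way to handle this case (not proposed in the thread; a sketch under assumptions) is a per-id inactivity timeout measured in processing time: every record for an id pushes its deadline forward, and once the deadline passes, that id's open hourly sums are emitted without waiting for the watermark. In Flink 1.4 this kind of logic would typically live in a custom `Trigger` that calls `ctx.registerProcessingTimeTimer`, or in a `ProcessFunction` on the keyed stream that uses `ctx.timerService().registerProcessingTimeTimer`. The code below only models the bookkeeping in plain Scala; `IdleFlusher`, `onRecord` and `onTimer` are hypothetical names, not Flink API.

```scala
import scala.collection.mutable

// Plain-Scala model, not Flink API: usage is summed per (id, hourly window);
// if an id sends nothing for `timeoutMs` of processing time, all of its open
// windows are flushed without waiting for the watermark.
final class IdleFlusher(timeoutMs: Long) {
  private val HourMs = 3600000L
  private val sums = mutable.Map.empty[(String, Long), Long] // (id, windowStart) -> usage
  private val lastSeen = mutable.Map.empty[String, Long]     // id -> processing time of last record

  def onRecord(id: String, eventTs: Long, usage: Long, nowMs: Long): Unit = {
    val window = (id, eventTs - eventTs % HourMs)
    sums(window) = sums.getOrElse(window, 0L) + usage
    // In Flink, a processing-time timer at nowMs + timeoutMs would be (re)registered here.
    lastSeen(id) = nowMs
  }

  // Models the timer callback: returns the flushed (id, windowStart, usage)
  // results, e.g. to be written to the Kafka sink, and forgets those ids.
  def onTimer(nowMs: Long): List[(String, Long, Long)] = {
    val idle = lastSeen.collect { case (id, t) if nowMs - t >= timeoutMs => id }.toSet
    val flushed = sums.toList
      .collect { case ((id, start), usage) if idle(id) => (id, start, usage) }
      .sortBy { case (id, start, _) => (id, start) }
    flushed.foreach { case (id, start, _) => sums.remove((id, start)) }
    idle.foreach(id => lastSeen.remove(id))
    flushed
  }
}
```

Since the timeout is measured in processing time, replaying old data from Kafka can flush windows at different points than the live run did, and a flushed result only contains what arrived before the deadline.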
>
> On Sun, Jan 14, 2018 at 1:00 PM, Gary Yao <g...@data-artisans.com> wrote:
>
>> Hi Rohan,
>>
>> I am not sure if I fully understand your problem. For example, if you
>> receive an event with a start time of 4:50 and an end time of 5:30, do
>> you want the "usage" from 4:50 - 5:00 to be included in the 4:00 - 5:00
>> window? What if the event had an end time of 5:31? Do you then want to
>> ignore the event for the 4:00 - 5:00 window?
>>
>> Best,
>>
>> Gary
>>
>> On Fri, Jan 12, 2018 at 8:45 PM, Rohan Thimmappa <
>> rohan.thimma...@gmail.com> wrote:
>>
>>> Hi Gary,
>>>
>>> This is perfect. I now have the window working on the message
>>> timestamp rather than the clock, and it also streams the data that
>>> arrive late.
>>>
>>> I also have one edge case.
>>>
>>> For example, I get my last report at 4:57 and *never* get a report for
>>> the 5:00+ hour. I would like to wait for some time: my reporting interval
>>> is 30 minutes, so if I don't see any record in the next 30 minutes, I
>>> would like to close the window, construct the 4-5 report and dispatch
>>> it. The intention is that I don't want to lose the last hour of data
>>> when the stream ends partway through the hour.
>>>
>>> Rohan
>>>
>>> On Fri, Jan 12, 2018 at 12:00 AM, Gary Yao <g...@data-artisans.com>
>>> wrote:
>>>
>>>> Hi Rohan,
>>>>
>>>> Your ReportTimestampExtractor assigns timestamps to the stream records
>>>> correctly but uses the wall clock to emit Watermarks
>>>> (System.currentTimeMillis). In Flink, Watermarks are the mechanism to
>>>> advance the event time. Hence, you should emit Watermarks according to
>>>> the time that you extract from your events.
>>>>
>>>> You can take a look at the already existing timestamp extractors /
>>>> watermark emitters [1], such as BoundedOutOfOrdernessTimestampExtractor,
>>>> to see how it can be done.
>>>>
>>>> Best,
>>>> Gary
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
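A minimal plain-Scala model of the idea behind BoundedOutOfOrdernessTimestampExtractor, for contrast with a watermark taken from System.currentTimeMillis. It sketches the behaviour, not Flink's source; `BoundedOutOfOrdernessModel` and its methods are hypothetical names. The watermark trails the largest timestamp seen so far by a fixed bound, so it only advances when records with newer timestamps arrive.

```scala
// Plain-Scala model, not the Flink class itself: the watermark trails the
// largest event timestamp seen so far by a fixed bound, so it advances only
// when records with newer timestamps arrive, never with the machine clock.
final class BoundedOutOfOrdernessModel(maxOutOfOrdernessMs: Long) {
  // Start low enough that the first watermark is Long.MinValue (no underflow).
  private var currentMaxTs: Long = Long.MinValue + maxOutOfOrdernessMs
  private var lastEmitted: Long = Long.MinValue

  // Called per record with the timestamp taken from the record (e.g. its end time).
  def onEvent(eventTs: Long): Long = {
    if (eventTs > currentMaxTs) currentMaxTs = eventTs
    eventTs
  }

  // Called periodically; emitted watermarks never go backwards.
  def currentWatermark(): Long = {
    val candidate = currentMaxTs - maxOutOfOrdernessMs
    if (candidate > lastEmitted) lastEmitted = candidate
    lastEmitted
  }
}
```

With Flink 1.4 itself, the corresponding step would presumably be to extend `BoundedOutOfOrdernessTimestampExtractor[Tuple2[String, Report]](Time.hours(1))` and implement only `extractTimestamp(element)`, returning the report's end time; the base class then derives watermarks from the extracted timestamps rather than from the wall clock.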
>>>>
>>>> On Fri, Jan 12, 2018 at 5:30 AM, Rohan Thimmappa <
>>>> rohan.thimma...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have the following requirements:
>>>>>
>>>>> 1. I have an Avro JSON message containing {eventid, usage, starttime,
>>>>> endtime}.
>>>>> 2. I am reading it from a Kafka source.
>>>>> 3. If a record overlaps an hour boundary, split it by rounding off to
>>>>> the hourly boundaries.
>>>>> 4. My objective is to a) read the message and b) aggregate the usage
>>>>> within each hour.
>>>>> 5. Send the aggregated data to another Kafka topic.
>>>>>
>>>>> I don't want to aggregate based on a clock window. If I see the next
>>>>> hour in an endtime, I would like to close the window and send the
>>>>> aggregated usage down to the Kafka sink topic.
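Step 4 amounts to a keyed sum per hour. A plain-Scala model of it (not Flink code; `UsageRecord`, its fields and `HourlyUsage` are hypothetical stand-ins for the Avro message and the `Counter` reducer), assuming records have already been split so that none crosses an hour boundary:

```scala
// Plain-Scala model of step 4, not Flink code: sum usage per (eventid, hour).
// Assumes records were already split so that none crosses an hour boundary.
final case class UsageRecord(eventId: String, startMs: Long, endMs: Long, usage: Long)

object HourlyUsage {
  val HourMs: Long = 3600000L

  // Key = (eventId, start of the hour that contains startMs); value = summed usage.
  def aggregate(records: Seq[UsageRecord]): Map[(String, Long), Long] =
    records
      .groupBy(r => (r.eventId, r.startMs - r.startMs % HourMs))
      .map { case (key, rs) => key -> rs.map(_.usage).sum }
}
```

In the Flink pipeline in this message, `keyBy(0)`, the tumbling event-time window and `reduce(new Counter())` compute this kind of sum incrementally.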
>>>>>
>>>>>
>>>>> For example, input data:
>>>>> 4:55-5:00
>>>>> 5:00-5:25
>>>>> 5:25-5:55
>>>>> 5:55-6:25
>>>>>
>>>>> After the split:
>>>>> 4:55-5:00 - expect a record to go out with this
>>>>> 5:00-5:25
>>>>> 5:25-5:55
>>>>> 5:55-6:00 - expect a record to go out with this
>>>>> 6:00-6:25
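The split rule in this example can be sketched as a pure function. This is a hypothetical model of what `SplitFlatMap` is described as doing, not its actual code, and dividing `usage` in proportion to each piece's duration is an assumption: the thread does not say how usage should be split (Gary's question about a 4:50-5:30 event touches the same point). `Piece`, `HourlySplit`, `split` and `hm` are hypothetical names.

```scala
// Hypothetical model of what SplitFlatMap is described as doing: cut
// [startMs, endMs) at every hour boundary it crosses. Splitting usage in
// proportion to duration is an ASSUMPTION; the thread does not say how.
final case class Piece(startMs: Long, endMs: Long, usage: Double)

object HourlySplit {
  val HourMs: Long = 3600000L

  // Hypothetical helper: hh:mm on some day, as milliseconds.
  def hm(h: Int, m: Int): Long = (h * 60L + m) * 60000L

  def split(startMs: Long, endMs: Long, usage: Double): List[Piece] = {
    require(endMs > startMs, "endMs must be after startMs")
    val total = (endMs - startMs).toDouble
    // First hour boundary strictly after startMs; further boundaries are 1h apart.
    val firstBoundary = startMs - startMs % HourMs + HourMs
    val cuts = (startMs :: (firstBoundary until endMs by HourMs).toList) :+ endMs
    cuts.zip(cuts.tail).map { case (s, e) => Piece(s, e, usage * (e - s) / total) }
  }
}
```

For the 5:55-6:25 record with a usage of 30, `split(hm(5, 55), hm(6, 25), 30.0)` returns the pieces 5:55-6:00 (usage 5.0) and 6:00-6:25 (usage 25.0), matching the expected split.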
>>>>>
>>>>> 1. I have set the time characteristic to event time:
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>
>>>>> 2. val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] = stream
>>>>>   .flatMap(new SplitFlatMap()) // splits a record that overlaps an hour at the hourly boundary
>>>>>   .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
>>>>>   .keyBy(0)
>>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
>>>>>   .reduce(new Counter()) // aggregates the usage collected within the window
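One detail worth checking here (an observation, not raised in the thread, assuming `intervalsecond` is 3600): `ReportTimestampExtractor` uses the report's end time as its timestamp, and Flink's tumbling windows are half-open, `[start, start + size)`. A piece such as 4:55-5:00 therefore gets timestamp 5:00 and lands in the 5:00-6:00 window rather than 4:00-5:00. The plain-Scala model below reproduces the assignment rule for offset 0; `TumblingAssign`, `windowStart` and `hm` are hypothetical names.

```scala
// Plain-Scala model of how a tumbling event-time window (offset 0) is picked
// for a record timestamp. Windows are half-open: [start, start + size).
object TumblingAssign {
  val HourMs: Long = 3600000L

  // Hypothetical helper: hh:mm on some day, as milliseconds.
  def hm(h: Int, m: Int): Long = (h * 60L + m) * 60000L

  // Same result as Flink's TimeWindow.getWindowStartWithOffset(ts, 0, size) for ts >= 0.
  def windowStart(ts: Long, sizeMs: Long = HourMs): Long = ts - ts % sizeMs
}
```

For instance, `windowStart(hm(5, 0))` is `hm(5, 0)`, while `windowStart(hm(4, 55))` and `windowStart(hm(5, 0) - 1)` are both `hm(4, 0)`. Assuming the goal is to count the 4:55-5:00 piece in the 4:00-5:00 window, extracting the start time (or `getEndTime - 1`) as the timestamp would put it there.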
>>>>>
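>>>>> 3. Here is the implementation of the timestamp extractor:
>>>>>
>>>>> import org.apache.flink.api.java.tuple.Tuple2
>>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
>>>>> import org.apache.flink.streaming.api.watermark.Watermark
>>>>>
>>>>> class ReportTimestampExtractor
>>>>>     extends AssignerWithPeriodicWatermarks[Tuple2[String, Report]] with Serializable {
>>>>>   // Event timestamp = the report's end time
>>>>>   override def extractTimestamp(e: Tuple2[String, Report],
>>>>>       prevElementTimestamp: Long): Long = {
>>>>>     e.f1.getEndTime
>>>>>   }
>>>>>
>>>>>   override def getCurrentWatermark(): Watermark = {
>>>>>     // Watermark derived from the wall clock, 1 hour (3,600,000 ms) behind it
>>>>>     new Watermark(System.currentTimeMillis - 3600000L)
>>>>>   }
>>>>> }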
>>>>>
>>>>> I see that the aggregation is emitted only on the clock window,
>>>>> rather than when the window sees the next hour in a record.
>>>>>
>>>>> Is there anything I am missing? By definition, if I set event time,
>>>>> Flink should respect the message time rather than the clock.
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Rohan
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Thanks
>>> Rohan
>>>
>>
>>
>
>
> --
> Thanks
> Rohan
>
