Hi Gary,
Thanks. I do have some of the events coming in after one device pauses, and I
am able to see the watermark being advanced and the window being triggered.
Rohan
On Mon, Jan 15, 2018 at 5:40 AM, Gary Yao wrote:
> Hi Rohan,
>
> In your example, are you saying that after 5:40 you will not receive any
> events
> at all which could advance the watermark?
>
> I am asking because if you are receiving events for other keys/ids from
> your
> KafkaSource after 5:40, the watermark will still be advanced and fire the
> tumbling window.
>
> Best,
> Gary
>
> On Mon, Jan 15, 2018 at 9:03 AM, Rohan Thimmappa <
> rohan.thimma...@gmail.com> wrote:
>
>> No. My question is slightly different.
>>
>> Say I get reports from 5.10-5.40, then the device goes offline and never
>> comes back, so I will not get any report after 5.40. The 5-6 window then
>> never gets closed, because we only close a window on seeing a report from
>> the next hour, i.e. any report carrying a 6.00 end date. In this case the
>> 5.00-5.40 data is still in Flink memory and will never be sent.
>>
>>
>> So what I would like to do is wait, say, 1 or 2 hours; if I don't get a
>> message for the given id in that time, I would like to close the window and
>> send the result to the destination system (in my case a Kafka topic).
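In Flink this per-key idle timeout is typically built with a ProcessFunction on the keyed stream and event-time timers. As a sketch of the bookkeeping involved, here is a plain-Scala model (the names `IdleFlushBuffer`, `onEvent`, and `onWatermark` are illustrative, not Flink API):

```scala
// Plain-Scala model of the per-key idle-timeout idea: buffer usage per id,
// and flush a key once event time has advanced past lastSeen + timeout.
// In a real job, registerEventTimeTimer plays the role of onWatermark.
case class Event(id: String, usage: Long, endTimeMs: Long)

class IdleFlushBuffer(timeoutMs: Long) {
  private val usageById = scala.collection.mutable.Map.empty[String, Long]
  private val lastSeen  = scala.collection.mutable.Map.empty[String, Long]

  def onEvent(e: Event): Unit = {
    usageById(e.id) = usageById.getOrElse(e.id, 0L) + e.usage
    lastSeen(e.id)  = e.endTimeMs
  }

  // Called whenever the watermark advances; returns the (id, usage)
  // pairs for keys that have been idle longer than the timeout.
  def onWatermark(watermarkMs: Long): List[(String, Long)] = {
    val expired = lastSeen.collect {
      case (id, seen) if watermarkMs >= seen + timeoutMs => id
    }.toList
    expired.map { id =>
      val usage = usageById.remove(id).getOrElse(0L)
      lastSeen.remove(id)
      (id, usage)
    }
  }
}
```

With a 30-minute timeout, a device whose last report ended at 5.40 would be flushed as soon as the watermark passes 6.10, so the buffered 5.00-5.40 usage is not lost.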
>>
>>
>>
>>
>> Rohan
>>
>> On Sun, Jan 14, 2018 at 1:00 PM, Gary Yao wrote:
>>
>>> Hi Rohan,
>>>
>>> I am not sure if I fully understand your problem. For example, if you
>>> receive an
>>> event with a start time of 4:50 and an end time of 5:30, do you want the
>>> "usage"
>>> from 4:50 - 5:00 to be included in the 4:00 - 5:00 window? What if the
>>> event had
>>> an end time of 5:31? Do you then want to ignore the event for the 4:00 -
>>> 5:00
>>> window?
>>>
>>> Best,
>>>
>>> Gary
>>>
>>> On Fri, Jan 12, 2018 at 8:45 PM, Rohan Thimmappa <
>>> rohan.thimma...@gmail.com> wrote:
>>>
Hi Gary,
This is perfect. I am able to get the window working on the message
timestamp rather than the clock window, and to also stream the data that
arrive late.
I also have one edge case:
e.g. I get my last report at 4.57 and never get a 5.00+ hour report
*ever*. I would like to wait for some time. My reporting interval is
30 minutes; if in the next 30 minutes I don't see any record, then I would
like to close the window, construct the 4-5 report, and dispatch it. The
intention is that I don't want to lose the last hour of data just because
the stream ended mid-hour.
Rohan
On Fri, Jan 12, 2018 at 12:00 AM, Gary Yao
wrote:
> Hi Rohan,
>
> Your ReportTimestampExtractor assigns timestamps to the stream records
> correctly
> but uses the wall clock to emit Watermarks (System.currentTimeMillis).
> In Flink
> Watermarks are the mechanism to advance the event time. Hence, you
> should emit
> Watermarks according to the time that you extract from your events.
>
> You can take a look at the already existing timestamp extractors /
> watermark
> emitters [1], such as BoundedOutOfOrdernessTimestampExtractor, to see
> how it can
> be done.
>
> Best,
> Gary
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
> dev/event_timestamp_extractors.html
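Gary's point boils down to: the watermark should trail the highest *event* timestamp seen so far by a fixed out-of-orderness bound, rather than follow the wall clock. Flink's BoundedOutOfOrdernessTimestampExtractor implements exactly this; as a plain-Scala sketch of the underlying logic (not the Flink class itself):

```scala
// Core logic behind a bounded-out-of-orderness watermark: the watermark
// trails the maximum event timestamp seen so far by a fixed bound, so it
// is driven by event time, never by System.currentTimeMillis.
class BoundedOutOfOrderness(maxOutOfOrdernessMs: Long) {
  private var maxTimestamp = Long.MinValue + maxOutOfOrdernessMs

  def extractTimestamp(eventTimeMs: Long): Long = {
    if (eventTimeMs > maxTimestamp) maxTimestamp = eventTimeMs
    eventTimeMs
  }

  def currentWatermark: Long = maxTimestamp - maxOutOfOrdernessMs
}
```

A late event (one behind the current maximum) never moves the watermark backwards; it only gets its own timestamp assigned.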
>
> On Fri, Jan 12, 2018 at 5:30 AM, Rohan Thimmappa <
> rohan.thimma...@gmail.com> wrote:
>
>> Hi All,
>>
>>
>> I have the following requirement:
>>
>> 1. I have an Avro JSON message containing {eventid, usage, starttime,
>> endtime}.
>> 2. I am reading this from a Kafka source.
>> 3. If a record overlaps an hour boundary, split the record by rounding
>> off to hourly boundaries.
>> 4. My objective is to a) read the message and b) aggregate the usage
>> within the hour.
>> 5. Send the aggregated data to another Kafka topic.
>>
>> I don't want to aggregate based on a clock window. If I see the next hour
>> in endtime, then I would like to close the window and send the aggregated
>> usage down to the Kafka sink topic.
>>
>>
>> e.g., input data:
>> 4.55 - 5.00
>> 5.00 - 5.25
>> 5.25 - 5.55
>> 5.55 - 6.25
>>
>> after split:
>> 4.55 - 5.00 - expect a record to go out with this
>> 5.00 - 5.25
>> 5.25 - 5.55
>> 5.55 - 6.00 - expect a record to go out with this
>> 6.00 - 6.25
>>
>>
>>
>>
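The split in step 3 above can be sketched as a pure function that cuts an interval at every hour boundary it crosses; the field layout and proportional usage split are illustrative assumptions, not the actual Avro schema:

```scala
// Split a (startMs, endMs) usage interval at hourly boundaries, giving
// each sub-interval a share of the usage proportional to its length.
// Integer division is used for the shares; a real implementation would
// decide how to handle rounding remainders.
val HourMs = 3600L * 1000

def splitHourly(startMs: Long, endMs: Long, usage: Long): List[(Long, Long, Long)] = {
  require(endMs > startMs, "end must be after start")
  // hour boundaries strictly inside (startMs, endMs)
  val cuts = ((startMs / HourMs + 1) * HourMs) until endMs by HourMs
  val bounds = (startMs +: cuts.toList) :+ endMs
  bounds.zip(bounds.tail).map { case (s, e) =>
    (s, e, usage * (e - s) / (endMs - startMs))
  }
}
```

For the 5.55-6.25 record above, this yields the two sub-records 5.55-6.00 and 6.00-6.25; a record fully inside one hour comes back unchanged.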
>> 1. I have set the event time:
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>> 2. val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String,
>> Report]] = stream
>> .flatMap(new SplitFlatMap()) // checks whether the record overlaps an
>> hour boundary; if yes, creates split records at the hourly boundaries
>> .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
>> .keyBy(0)
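After the split, the aggregation in step 4 is just a sum of usage keyed by (id, hour). In the streaming job Flink maintains this incrementally via keyBy plus an event-time window, but the net result is equivalent to the following batch computation over already-split records (names are illustrative):

```scala
// What keyBy(id) + an hourly event-time window + sum amounts to, written
// as a batch computation over records already split at hour boundaries.
case class SplitRecord(id: String, startMs: Long, endMs: Long, usage: Long)

val HourMs = 3600L * 1000

def aggregateByHour(records: Seq[SplitRecord]): Map[(String, Long), Long] =
  records
    .groupBy(r => (r.id, r.startMs / HourMs)) // hour bucket of the record's start
    .map { case (key, rs) => key -> rs.map(_.usage).sum }
```

Because each record was split at the hour boundary first, bucketing by the start timestamp's hour is safe: no record straddles two buckets.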
>>