Re: Aggregation using event timestamp rather than clock window

2018-01-28 Thread Rohan Thimmappa
Hi Gary,


 Thanks. I do have some of the events coming in after the pauses, and I
am able to see the watermark being advanced and the event being triggered.


Rohan

On Mon, Jan 15, 2018 at 5:40 AM, Gary Yao  wrote:

> Hi Rohan,
>
> In your example, are you saying that after 5:40 you will not receive any
> events
> at all which could advance the watermark?
>
> I am asking because if you are receiving events for other keys/ids from
> your
> KafkaSource after 5:40, the watermark will still be advanced and fire the
> tumbling window.
>
> Best,
> Gary
>
> On Mon, Jan 15, 2018 at 9:03 AM, Rohan Thimmappa <
> rohan.thimma...@gmail.com> wrote:
>
>> No. My question is slightly different.
>>
>> Say I get reports from 5.10-5.40, the device goes offline, and it never comes
>> back, so I will not get any report after 5.40. The 5-6 window then never gets
>> closed, because we only close the window on seeing the next hour's window,
>> i.e. any report carrying a 6.00 end date. In this case the 5.00-5.40 data is
>> still in Flink memory and will never get sent.
>>
>>
>> So what I would like to do is wait for, say, 1 or 2 hours; if I don't get a
>> message for the given id in that time, I would like to close the window and
>> send the result to the destination system (in my case a Kafka topic).
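>>
>> Something like the following custom Trigger is what I have in mind, as a rough
>> sketch only (the class name and timeout are made up): it fires when the
>> watermark passes the window end, or after a processing-time idle timeout,
>> whichever comes first.
>>
>> import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>
>> class EventTimeOrIdleTimeoutTrigger[T](timeoutMs: Long) extends Trigger[T, TimeWindow] {
>>
>>   override def onElement(element: T, timestamp: Long, window: TimeWindow,
>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>     // Normal event-time firing when the watermark reaches the window end.
>>     ctx.registerEventTimeTimer(window.maxTimestamp())
>>     // Idle timeout in processing time. Earlier timers are not deleted here,
>>     // so in practice the window fires at most timeoutMs after its first
>>     // element unless the watermark closes it first.
>>     ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + timeoutMs)
>>     TriggerResult.CONTINUE
>>   }
>>
>>   override def onEventTime(time: Long, window: TimeWindow,
>>                            ctx: Trigger.TriggerContext): TriggerResult =
>>     if (time == window.maxTimestamp()) TriggerResult.FIRE_AND_PURGE
>>     else TriggerResult.CONTINUE
>>
>>   override def onProcessingTime(time: Long, window: TimeWindow,
>>                                 ctx: Trigger.TriggerContext): TriggerResult =
>>     TriggerResult.FIRE_AND_PURGE
>>
>>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
>>     ctx.deleteEventTimeTimer(window.maxTimestamp())
>> }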
>>
>>
>>
>>
>> Rohan
>>
>> On Sun, Jan 14, 2018 at 1:00 PM, Gary Yao  wrote:
>>
>>> Hi Rohan,
>>>
>>> I am not sure if I fully understand your problem. For example, if you
>>> receive an
>>> event with a start time of 4:50 and an end time of 5:30, do you want the
>>> "usage"
>>> from 4:50 - 5:00 to be included in the 4:00 - 5:00 window? What if the
>>> event had
>>> an end time of 5:31? Do you then want to ignore the event for the 4:00 -
>>> 5:00
>>> window?
>>>
>>> Best,
>>>
>>> Gary
>>>
>>> On Fri, Jan 12, 2018 at 8:45 PM, Rohan Thimmappa <
>>> rohan.thimma...@gmail.com> wrote:
>>>
 Hi Gary,

 This is perfect. I am able to get the window working on the message
 timestamp rather than the clock window, and also stream the data that arrive late.

 I also have one edge case.


 For example, I get my last report at 4.57 and I never get a 5.00+ hour report
 *ever*. I would like to wait for some time. My reporting interval is 30
 minutes; if I don't see any record in the next 30 minutes, I would like to
 close the window to construct the 4-5 report and dispatch it. The intention is
 that I don't want to lose the last hour of data just because the stream ended
 in the middle of the hour.

 Rohan

 On Fri, Jan 12, 2018 at 12:00 AM, Gary Yao 
 wrote:

> Hi Rohan,
>
> Your ReportTimestampExtractor assigns timestamps to the stream records
> correctly
> but uses the wall clock to emit Watermarks (System.currentTimeMillis).
> In Flink
> Watermarks are the mechanism to advance the event time. Hence, you
> should emit
> Watermarks according to the time that you extract from your events.
>
> You can take a look at the already existing timestamp extractors /
> watermark
> emitters [1], such as BoundedOutOfOrdernessTimestampExtractor, to see
> how it can
> be done.
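>
> For your case, a minimal sketch could look like the following, assuming your
> Report type exposes the end timestamp in epoch milliseconds (the getEndTime
> accessor here is just a placeholder for whatever your Avro class generates):
>
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
> import org.apache.flink.streaming.api.windowing.time.Time
>
> // Emits watermarks that trail the highest seen event time by 1 minute,
> // instead of following the wall clock.
> class ReportTimestampExtractor
>     extends BoundedOutOfOrdernessTimestampExtractor[Report](Time.minutes(1)) {
>   override def extractTimestamp(report: Report): Long = report.getEndTime
> }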
>
> Best,
> Gary
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
>
> On Fri, Jan 12, 2018 at 5:30 AM, Rohan Thimmappa <
> rohan.thimma...@gmail.com> wrote:
>
>> Hi All,
>>
>>
>> I have the following requirements:
>>
>> 1. I have an Avro/JSON message containing {eventid, usage, starttime,
>> endtime}.
>> 2. I am reading this from a Kafka source.
>> 3. If there is an overlapping hour in a record, split the record by
>> rounding off to hourly boundaries.
>> 4. My objective is to a) read the message and b) aggregate the usage
>> within the hour.
>> 5. Send the aggregated data to another Kafka topic.
>>
>> I don't want to aggregate based on the clock window. If I see the next hour
>> in endtime, then I would like to close the window and send the aggregated
>> usage down to the Kafka sink topic.
>>
>>
>> e.g.:
>> input data
>> 4.55 - 5.00
>> 5.00 - 5.25
>> 5.25 - 5.55
>> 5.55 - 6.25
>>
>> after split
>> 4.55 - 5.00 - expect a record to go out with this
>> 5.00 - 5.25
>> 5.25 - 5.55
>> 5.55 - 6.00 - expect a record to go out with this
>> 6.00 - 6.25
>>
>>
>>
>>
>> 1. I have set the event time:
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>> 2. val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] = stream
>>   .flatMap(new SplitFlatMap())  // if a record overlaps an hour boundary, split it into records on hourly boundaries
>>   .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
>>   .keyBy(0)
>>   
>> 
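>> For reference, the SplitFlatMap logic is roughly the following. This is a
>> simplified sketch: a plain case class stands in for my Avro Report type, and
>> usage is prorated by duration in this version.
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction
>> import org.apache.flink.util.Collector
>>
>> // Simplified stand-in for the Avro-generated Report class.
>> case class Report(eventId: String, usage: Double, startTime: Long, endTime: Long)
>>
>> class SplitFlatMap extends FlatMapFunction[Report, Report] {
>>   private val HourMs = 60 * 60 * 1000L
>>
>>   override def flatMap(r: Report, out: Collector[Report]): Unit = {
>>     var sliceStart = r.startTime
>>     while (sliceStart < r.endTime) {
>>       // End the slice at the next hour boundary, or at the record end.
>>       val hourEnd = (sliceStart / HourMs + 1) * HourMs
>>       val sliceEnd = math.min(hourEnd, r.endTime)
>>       // Prorate usage by the fraction of the record this slice covers.
>>       val fraction = (sliceEnd - sliceStart).toDouble / (r.endTime - r.startTime)
>>       out.collect(Report(r.eventId, r.usage * fraction, sliceStart, sliceEnd))
>>       sliceStart = sliceEnd
>>     }
>>   }
>> }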

End-to-end exactly once from kafka source to S3 sink

2018-01-28 Thread chris snow
I’m working with a Kafka environment where I’m limited to 100 partitions at
1GB log.retention.bytes each. I’m looking to implement exactly-once
processing from this Kafka source to an S3 sink.

If I have understood correctly, Flink will only commit the kafka offsets
when the data has been saved to S3.

Have I understood correctly that, for Flink checkpoints and exactly-once to
work, the assumption is that the number and size of partitions
(log.retention.bytes) in Kafka are sufficient that, should a checkpoint need
to be rolled back, the data still exists in Kafka (i.e. it hasn’t been
overwritten by new data)?

If the above is true, and I am using a DateTimeBucketer, the bucket sizes
will directly influence how big the partitions should be in Kafka, because
larger buckets will result in less frequent commits of the offsets?
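
For context, the setup I have in mind looks roughly like this. It is only a
sketch: the topic, bucket path, intervals, and sizes are placeholders, and I am
assuming the Flink 1.4 BucketingSink with a DateTimeBucketer.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object KafkaToS3 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Offsets are committed back to Kafka only when a checkpoint completes.
    env.enableCheckpointing(60 * 1000)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka:9092")
    props.setProperty("group.id", "s3-sink-job")

    val source = env.addSource(
      new FlinkKafkaConsumer011[String]("events", new SimpleStringSchema(), props))

    // One bucket per hour; larger buckets mean part files are finalized less often.
    val sink = new BucketingSink[String]("s3://my-bucket/events")
      .setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH"))
      .setBatchSize(128L * 1024 * 1024) // roll a part file after ~128 MB

    source.addSink(sink)
    env.execute("kafka-to-s3")
  }
}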

Many thanks,

Chris


RE: S3 for state backend in Flink 1.4.0

2018-01-28 Thread Marchant, Hayden
I see that we can still use the other implementation, but we were hoping that
we'd benefit from the bug fix done in Flink 1.4.0 around 'repeated' loading of
configuration. I'll check with the old implementation and see if it still
works.

We have also seen discussions of a more native connector called Stocator, which
interfaces directly with IBM Object Storage, can be configured through
hdfs-site.xml, and might speed things up.

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Thursday, January 25, 2018 6:30 PM
To: Marchant, Hayden [ICG-IT] 
Cc: user@flink.apache.org
Subject: Re: S3 for state backend in Flink 1.4.0

Hi,

Did you try overriding that config and it didn't work? That dependency is in 
fact still using the Hadoop S3 FS implementation but is shading everything to 
our own namespace so that there can't be any version conflicts. If that doesn't 
work then we need to look into this further.

The way you usually use this is by putting the flink-s3-fs-hadoop jar from the 
opt/ folder to the lib/ folder. I'm not sure including it as a dependency will 
work but it might. You also don't have to use flink-s3-fs-hadoop dependency if 
using the regular Hadoop S3 support worked for you before. It's only an 
additional option.
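
If you do fall back to the regular Hadoop S3 support, the endpoint override
would typically go into your Hadoop configuration rather than jets3t.properties.
A rough sketch, assuming the s3a filesystem and a placeholder endpoint value:

<!-- core-site.xml / hdfs-site.xml; the value is a placeholder for your
     IBM Object Storage endpoint -->
<property>
  <name>fs.s3a.endpoint</name>
  <value>my-objectstorage.example.com</value>
</property>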

Best,
Aljoscha

> On 24. Jan 2018, at 16:33, Marchant, Hayden  wrote:
> 
> Hi,
> 
> We have a Flink Streaming application that uses S3 for storing checkpoints. 
> We are not using 'regular' S3, but rather IBM Object Storage, which has an 
> S3-compatible connector. We had quite some challenges in overriding the 
> endpoint from the default s3.amazonaws.com to our internal IBM Object 
> Storage endpoint. In 1.3.2, we managed to get this working by providing our 
> own jets3t.properties file that overrode s3service.s3-endpoint 
> (https://jets3t.s3.amazonaws.com/toolkit/configuration.html).
> 
> When upgrading to 1.4.0, we added a dependency on the flink-s3-fs-hadoop 
> artifact. It seems that our override via jets3t.properties is no longer 
> relevant, since it does not use the Hadoop implementation anymore. 
> 
> Is there a way to override this default endpoint, or can we do this with the 
> Presto-based filesystem? Please note that if we provide the endpoint in the 
> URL for the state backend, it simply appends s3.amazonaws.com to the URL, for 
> example s3://myobjectstorageendpoint.s3.amazonaws.com.
> 
> Are there any other solutions, such as 'rolling back' to the Hadoop 
> implementation of S3?
> 
> Thanks,
> Hayden