Re: Get an aggregator's value outside of an iteration

2015-11-30 Thread Stephan Ewen
We wanted to combine the accumulators and aggregators for a while, but have
not gotten to it so far (there is a pending PR which needs some more work).

You can currently work your way around this by using the accumulators
together with the aggregators.
  - Aggregators: Within an iteration across supersteps
  - Accumulators: Across one job, retrievable at the end in the client

You can use the close() method of a function (close() is called after each
superstep) to take the aggregator value and put it into an accumulator.
That accumulator can be retrieved after the job has finished.
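
For illustration, a minimal sketch of that workaround (Flink 0.10 DataSet API; the
aggregator name "my-aggregator", the accumulator name "final-aggregate" and the
long-valued aggregate are assumptions, not taken from this thread):

import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.LongValue;

// applied to a data set inside the iteration, only to bridge aggregator -> accumulator
public class AggregatorToAccumulator extends RichMapFunction<Long, Long> {

    private LongCounter finalAggregate;

    @Override
    public void open(Configuration parameters) {
        // accumulators live for the whole job and are shipped to the client at the end
        finalAggregate = getRuntimeContext().getLongCounter("final-aggregate");
    }

    @Override
    public Long map(Long value) {
        return value; // identity, we only care about open()/close()
    }

    @Override
    public void close() {
        // close() runs after every superstep, so the last call sees the final aggregate
        LongValue agg =
            getIterationRuntimeContext().getPreviousIterationAggregate("my-aggregator");
        if (agg != null) {
            finalAggregate.resetLocal();        // keep only the latest superstep's value
            finalAggregate.add(agg.getValue());
        }
    }
}

// after env.execute(), the client can read it via
// result.getAccumulatorResult("final-aggregate")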

Hope that workaround works for you!

Stephan


On Mon, Nov 30, 2015 at 10:51 AM, Aljoscha Krettek 
wrote:

> Hi,
> I’m afraid there is no way right now to get at the values of aggregators.
> I think implementing this is problematic since the aggregators from the
> different parallel instances of operators are only combined on the
> JobManager (master node).
>
> Cheers,
> Aljoscha
> > On 27 Nov 2015, at 23:04, Truong Duc Kien 
> wrote:
> >
> > Hi,
> >
> > I'm looking for a way get the value of aggregators outside of iteration.
> Specifically, I want the final aggregators' value after the iteration has
> finished. Is there any API for that ?
> >
> > Thanks,
> > Kien Truong
>
>


Re: Watermarks as "process completion" flags

2015-11-30 Thread Anton Polyakov
I think I can turn my problem into a simpler one.

Effectively, what I need is a way to checkpoint certain events in the input
stream and, once this checkpoint reaches the end of the DAG, take some action.
So I need a signal at the sink which can tell me "all events in the source
before the checkpointed event are now processed".

As far as I understand, flagged records don't quite work, since the DAG doesn't
propagate source events one-to-one. Some transformations might create 3 child
events out of 1 source event. If I want to make sure I have fully processed a
source event, I need to wait till all its children are processed.



On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov 
wrote:

> Hi Fabian
>
> Defining a special flag for record seems like a checkpoint barrier. I
> think I will end up re-implementing checkpointing myself. I found the
> discussion in flink-dev:
> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
> 
>  which
> seems to solve my task. Essentially they want to have a mechanism which
> will mark record produced by job as “last” and then wait until it’s fully
> propagated through DAG. Similarly to what I need. Essentially my job which
> produces trades can also thought as being finished once it produced all
> trades, then I just need to wait till latest trade produced by this job is
> processed.
>
> So although windows can probably also be applied, I think propagating
> barrier through DAG and checkpointing at final job is what I need.
>
> Can I possibly utilize internal Flink’s checkpoint barriers (i.e. like
> triggering a custom checkoint or finishing streaming job)?
>
> On 24 Nov 2015, at 21:53, Fabian Hueske  wrote:
>
> Hi Anton,
>
> If I got your requirements right, you are looking for a solution that
> continuously produces updated partial aggregates in a streaming fashion.
> When a  special event (no more trades) is received, you would like to store
> the last update as a final result. Is that correct?
>
> You can compute continuous updates using a reduce() or fold() function.
> These will produce a new update for each incoming event.
> For example:
>
> val s: DataStream[(Int, Long)] = ...
> s.keyBy(_._1)
>   .reduce( (x,y) => (x._1, y._2 + y._2) )
>
> would continuously compute a sum for every key (_._1) and produce an
> update for each incoming record.
>
> You could add a flag to the record and implement a ReduceFunction that
> marks a record as final when the no-more-trades event is received.
> With a filter and a data sink you could emit such final records to a
> persistent data store.
>
> Btw.: You can also define custom trigger policies for windows. A custom
> trigger is called for each element that is added to a window and when
> certain timers expire. For example with a custom trigger, you can evaluate
> a window for every second element that is added. You can also define
> whether the elements in the window should be retained or removed after the
> evaluation.
>
> Best, Fabian
>
>
>
> 2015-11-24 21:32 GMT+01:00 Anton Polyakov :
>
>> Hi Max
>>
>> thanks for reply. From what I understand window works in a way that it
>> buffers records while window is open, then apply transformation once window
>> close is triggered and pass transformed result.
>> In my case then window will be open for few hours, then the whole amount
>> of trades will be processed once window close is triggered. Actually I want
>> to process events as they are produced without buffering them. It is more
>> like a stream with some special mark versus windowing seems more like a
>> batch (if I understand it correctly).
>>
>> In other words - buffering and waiting for window to close, then
>> processing will be equal to simply doing one-off processing when all events
>> are produced. I am looking for a solution when I am processing events as
>> they are produced and when source signals "done" my processing is also
>> nearly done.
>>
>>
>> On Tue, Nov 24, 2015 at 2:41 PM, Maximilian Michels 
>> wrote:
>>
>>> Hi Anton,
>>>
>>> You should be able to model your problem using the Flink Streaming
>>> API. The actions you want to perform on the streamed records
>>> correspond to transformations on Windows. You can indeed use
>>> Watermarks to signal the window that a threshold for an action has
>>> been reached. Otherwise an eviction policy should also do it.
>>>
>>> Without more details about what you want to do I can only refer you to
>>> the streaming API documentation:
>>> Please see
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
>>>
>>> Thanks,
>>> Max
>>>
>>> On Sun, Nov 22, 2015 at 8:53 PM, Anton Polyakov
>>>  wrote:
>>> > Hi
>>> >
>>> > I am very new to Flink and in fact never used it. My task (which I
>>> currently solve using home grown Redis-based 

Re: Watermarks as "process completion" flags

2015-11-30 Thread Stephan Ewen
Hi Anton!

That you can do!

You can look at the interfaces "Checkpointed" and "CheckpointNotifier".
There you will get a call at every checkpoint (and can look at what records
are before that checkpoint). You also get a call once the checkpoint is
complete, which corresponds to the point when everything has flown through
the DAG.

I think it is nice to implement it like that, because it works
non-blocking: the stream continues while the records you wait for flow
through the DAG, and you get an asynchronous notification once they have
flown all the way through.
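
For illustration, a minimal sketch of that pattern with the Flink 0.10 interfaces
(the Event type and its isMarker() flag are made up for the example):

import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MarkerAwareSink extends RichSinkFunction<Event>
        implements Checkpointed<Boolean>, CheckpointNotifier {

    private boolean markerSeen = false;

    @Override
    public void invoke(Event value) throws Exception {
        if (value.isMarker()) {   // hypothetical "record I am waiting for" flag
            markerSeen = true;
        }
        // ... write the record to the external system ...
    }

    @Override
    public Boolean snapshotState(long checkpointId, long checkpointTimestamp) {
        // called when the checkpoint barrier reaches this sink; every record emitted
        // before that barrier has already been passed to invoke()
        return markerSeen;
    }

    @Override
    public void restoreState(Boolean state) {
        markerSeen = state;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // called once the whole checkpoint is complete, i.e. the barrier (and all
        // records before it) has flown all the way through the DAG
        if (markerSeen) {
            // take the follow-up action here
        }
    }
}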

Greetings,
Stephan


On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov 
wrote:

> I think I can turn my problem into a simpler one.
>
> Effectively what I need - I need way to checkpoint certain events in input
> stream and once this checkpoint reaches end of DAG take some action. So I
> need a signal at the sink which can tell "all events in source before
> checkpointed event are now processed".
>
> As far as I understand flagged record don't quite work since DAG doesn't
> propagate source events one-to-one. Some transformations might create 3
> child events out of 1 source. If I want to make sure I fully processed
> source event, I need to wait till all childs are processed.
>
>
>
> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov 
> wrote:
>
>> Hi Fabian
>>
>> Defining a special flag for record seems like a checkpoint barrier. I
>> think I will end up re-implementing checkpointing myself. I found the
>> discussion in flink-dev:
>> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
>> 
>>  which
>> seems to solve my task. Essentially they want to have a mechanism which
>> will mark record produced by job as “last” and then wait until it’s fully
>> propagated through DAG. Similarly to what I need. Essentially my job which
>> produces trades can also thought as being finished once it produced all
>> trades, then I just need to wait till latest trade produced by this job is
>> processed.
>>
>> So although windows can probably also be applied, I think propagating
>> barrier through DAG and checkpointing at final job is what I need.
>>
>> Can I possibly utilize internal Flink’s checkpoint barriers (i.e. like
>> triggering a custom checkoint or finishing streaming job)?
>>
>> On 24 Nov 2015, at 21:53, Fabian Hueske  wrote:
>>
>> Hi Anton,
>>
>> If I got your requirements right, you are looking for a solution that
>> continuously produces updated partial aggregates in a streaming fashion.
>> When a  special event (no more trades) is received, you would like to store
>> the last update as a final result. Is that correct?
>>
>> You can compute continuous updates using a reduce() or fold() function.
>> These will produce a new update for each incoming event.
>> For example:
>>
>> val s: DataStream[(Int, Long)] = ...
>> s.keyBy(_._1)
>>   .reduce( (x,y) => (x._1, y._2 + y._2) )
>>
>> would continuously compute a sum for every key (_._1) and produce an
>> update for each incoming record.
>>
>> You could add a flag to the record and implement a ReduceFunction that
>> marks a record as final when the no-more-trades event is received.
>> With a filter and a data sink you could emit such final records to a
>> persistent data store.
>>
>> Btw.: You can also define custom trigger policies for windows. A custom
>> trigger is called for each element that is added to a window and when
>> certain timers expire. For example with a custom trigger, you can evaluate
>> a window for every second element that is added. You can also define
>> whether the elements in the window should be retained or removed after the
>> evaluation.
>>
>> Best, Fabian
>>
>>
>>
>> 2015-11-24 21:32 GMT+01:00 Anton Polyakov :
>>
>>> Hi Max
>>>
>>> thanks for reply. From what I understand window works in a way that it
>>> buffers records while window is open, then apply transformation once window
>>> close is triggered and pass transformed result.
>>> In my case then window will be open for few hours, then the whole amount
>>> of trades will be processed once window close is triggered. Actually I want
>>> to process events as they are produced without buffering them. It is more
>>> like a stream with some special mark versus windowing seems more like a
>>> batch (if I understand it correctly).
>>>
>>> In other words - buffering and waiting for window to close, then
>>> processing will be equal to simply doing one-off processing when all events
>>> are produced. I am looking for a solution when I am processing events as
>>> they are produced and when source signals "done" my processing is also
>>> nearly done.
>>>
>>>
>>> On Tue, Nov 24, 2015 at 2:41 PM, Maximilian Michels 
>>> wrote:
>>>
 Hi Anton,

 You should be able to model 

Re: Watermarks as "process completion" flags

2015-11-30 Thread Anton Polyakov
Hi Stephan

thanks, that looks super. But the source then needs to emit the checkpoint. At
the source, while reading source events, I can find out "this is the source
event I want to take action after". So if at the source I can then emit a
checkpoint and catch it at the end of the DAG, that would solve my problem
(well, I also need to somehow distinguish my checkpoint from Flink's
auto-generated ones).

Sorry for being too chatty, but this is a topic where I need expert opinion;
I can't find the answer by just googling.


On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen  wrote:

> Hi Anton!
>
> That you can do!
>
> You can look at the interfaces "Checkpointed" and "checkpointNotifier".
> There you will get a call at every checkpoint (and can look at what records
> are before that checkpoint). You also get a call once the checkpoint is
> complete, which corresponds to the point when everything has flown through
> the DAG.
>
> I think it is nice to implement it like that, because it works
> non-blocking: The stream continues while the the records-you-wait-for flow
> through the DAG, and you get an asynchronous notification once they have
> flown all the way through.
>
> Greetings,
> Stephan
>
>
> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov  > wrote:
>
>> I think I can turn my problem into a simpler one.
>>
>> Effectively what I need - I need way to checkpoint certain events in
>> input stream and once this checkpoint reaches end of DAG take some action.
>> So I need a signal at the sink which can tell "all events in source before
>> checkpointed event are now processed".
>>
>> As far as I understand flagged record don't quite work since DAG doesn't
>> propagate source events one-to-one. Some transformations might create 3
>> child events out of 1 source. If I want to make sure I fully processed
>> source event, I need to wait till all childs are processed.
>>
>>
>>
>> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov > > wrote:
>>
>>> Hi Fabian
>>>
>>> Defining a special flag for record seems like a checkpoint barrier. I
>>> think I will end up re-implementing checkpointing myself. I found the
>>> discussion in flink-dev:
>>> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
>>> 
>>>  which
>>> seems to solve my task. Essentially they want to have a mechanism which
>>> will mark record produced by job as “last” and then wait until it’s fully
>>> propagated through DAG. Similarly to what I need. Essentially my job which
>>> produces trades can also thought as being finished once it produced all
>>> trades, then I just need to wait till latest trade produced by this job is
>>> processed.
>>>
>>> So although windows can probably also be applied, I think propagating
>>> barrier through DAG and checkpointing at final job is what I need.
>>>
>>> Can I possibly utilize internal Flink’s checkpoint barriers (i.e. like
>>> triggering a custom checkoint or finishing streaming job)?
>>>
>>> On 24 Nov 2015, at 21:53, Fabian Hueske  wrote:
>>>
>>> Hi Anton,
>>>
>>> If I got your requirements right, you are looking for a solution that
>>> continuously produces updated partial aggregates in a streaming fashion.
>>> When a  special event (no more trades) is received, you would like to store
>>> the last update as a final result. Is that correct?
>>>
>>> You can compute continuous updates using a reduce() or fold() function.
>>> These will produce a new update for each incoming event.
>>> For example:
>>>
>>> val s: DataStream[(Int, Long)] = ...
>>> s.keyBy(_._1)
>>>   .reduce( (x,y) => (x._1, y._2 + y._2) )
>>>
>>> would continuously compute a sum for every key (_._1) and produce an
>>> update for each incoming record.
>>>
>>> You could add a flag to the record and implement a ReduceFunction that
>>> marks a record as final when the no-more-trades event is received.
>>> With a filter and a data sink you could emit such final records to a
>>> persistent data store.
>>>
>>> Btw.: You can also define custom trigger policies for windows. A custom
>>> trigger is called for each element that is added to a window and when
>>> certain timers expire. For example with a custom trigger, you can evaluate
>>> a window for every second element that is added. You can also define
>>> whether the elements in the window should be retained or removed after the
>>> evaluation.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> 2015-11-24 21:32 GMT+01:00 Anton Polyakov :
>>>
 Hi Max

 thanks for reply. From what I understand window works in a way that it
 buffers records while window is open, then apply transformation once window
 close is triggered and pass transformed result.
 In my case then window will be open for few hours, then the whole
 amount of trades will be processed 

Re: Interpretation of Trigger and Eviction on a window

2015-11-30 Thread Aljoscha Krettek
Hi,
the function is in fact applied to the remaining elements (at least I hope it 
is). So the first sentence should be the correct one.

Cheers,
Aljoscha
> On 28 Nov 2015, at 03:14, Nirmalya Sengupta  
> wrote:
> 
> Hello Fabian,
> 
> From your reply to this thread: 
> ' it is correct that the evictor is called BEFORE the window function is 
> applied because this is required to support certain types of sliding windows. 
> '
> 
> This is clear to me now. However, my point was about the way it is described 
> in the User-guide. The guide says this:
> ' After the trigger fires, and before the function (e.g., sum, count) is 
> applied to the window contents, an optional Evictor removes some elements 
> from the beginning of the window before the remaining elements are passed on 
> to the function '
> 
> As I read it again, I see where the problem lies. It says some elements are 
> removed before the **rest** are passed to the function. This is not what 
> happens, I think. Evictor removes elements and the function sees this set of 
> removed elements, not the remaining elements. Remaining elements remain in 
> the window and are perhaps picked up by the Evictor next time.
> 
> Carrying on from your elaboration, I think guide's statement can be better 
> rearranged as:
> 
> ' After the trigger fires, the function (e.g., sum, count) is applied to the 
> entire contents of the window. However, an optionally provided Evictor, 
> removes some elements from the beginning of the window, according to the 
> criteria of eviction. The function is then applied to this set of __removed__ 
> elements. '
>  
> Let me know if I am way off the mark here.
> 
> -- Nirmalya
> 
> -- 
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is 
> where they should be.
> Now put the foundation under them."



Re: key

2015-11-30 Thread Fabian Hueske
Hi Radu,

with the corrected setters/getters, Flink accepts your data type as a POJO
type, which can automatically be used as a key (in contrast to the GenericType
it was treated as before).
There is no need to implement the Key interface.
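
For reference, a minimal sketch of such a POJO key type (the field names are made up;
the thread only says MyEvent holds two longs and an int or string):

public class MyEvent {
    private long id;
    private long timestamp;
    private String name;

    public MyEvent() {}                          // POJO rule: public no-argument constructor

    public long getId() { return id; }           // getter/setter names must match the fields
    public void setId(long id) { this.id = id; }

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

// keying a stream by two of the fields, no Key interface needed:
stream.keyBy("id", "name");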

Best, Fabian

2015-11-30 17:46 GMT+01:00 Radu Tudoran :

> Thank you both for the information.
>
> Meanwhile I realized I had a mistmatch between the setters/getters names
> and fields, after correcting this, it worked. It seems that it works also
> without implementing the Key interface.
>
> My new question is, should it still implement the key interface? Is it
> expected to go wrong somewhere else without having it?
>
>
>
> @Marton – the answer was that I need 2 of these fields to be used to do
> the keyBy (long, string)
>
>
>
>
>
> Dr. Radu Tudoran
>
> Research Engineer
>
> IT R Division
>
>
>
> [image: cid:image007.jpg@01CD52EB.AD060EE0]
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> European Research Center
>
> Riesstrasse 25, 80992 München
>
>
>
> E-mail: *radu.tudo...@huawei.com *
>
> Mobile: +49 15209084330
>
> Telephone: +49 891588344173
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
>
> *From:* Márton Balassi [mailto:balassi.mar...@gmail.com]
> *Sent:* Monday, November 30, 2015 5:42 PM
> *To:* user@flink.apache.org
> *Subject:* Re: key
>
>
>
> Hey Radu,
>
> To add to Till's comment: do you need the whole Event type to be the key
> are would you like to group the records based on the value of one of your
> attributes (the 2 longs, int or string as mentioned)? If the latter is true
> Flink comes with utilities to use standard types as keys. In the former
> case Till's comment holds.
>
> Best,
>
> Marton
>
>
>
> On Mon, Nov 30, 2015 at 5:38 PM, Till Rohrmann 
> wrote:
>
> Hi Radu,
>
> if you want to use custom types as keys, then these custom types have to
> implement the Key interface.
>
> Cheers,
> Till
>
> ​
>
>
>
> On Mon, Nov 30, 2015 at 5:28 PM, Radu Tudoran 
> wrote:
>
> Hi,
>
>
>
> I want to apply a “keyBy operator on a stream”.
>
> The string is of type MyEvent. This is a simple type that contains 2 longs
> and and int or string
>
>
>
> However, when applying this I get
>
>
>
> Exception in thread "main"
> *org.apache.flink.api.common.InvalidProgramException*: This type
> (GenericType) cannot be used as key.
>
>
>
> Can you give me a hint about a solution to this?
>
>
>
> Thanks
>
>
>
> Dr. Radu Tudoran
>
> Research Engineer
>
> IT R Division
>
>
>
> [image: cid:image007.jpg@01CD52EB.AD060EE0]
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> European Research Center
>
> Riesstrasse 25, 80992 München
>
>
>
> E-mail: *radu.tudo...@huawei.com *
>
> Mobile: +49 15209084330
>
> Telephone: +49 891588344173
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
>
>
>
>
>


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-30 Thread Aljoscha Krettek
Maybe. In the Kafka case we just need to ensure that parallel instances of the 
source that know that they don’t have any partitions assigned to them emit 
Long.MAX_VALUE as a watermark.
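
(For illustration only, a sketch of that idea in a custom source - not the actual
FlinkKafkaConsumer082 code; the assignedPartitions field is made up:)

@Override
public void run(SourceContext<MyEvent> ctx) throws Exception {
    if (assignedPartitions.isEmpty()) {
        // this parallel instance will never emit elements, so it must not hold back
        // the downstream watermark (which is the minimum over all upstream watermarks)
        ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
        return;
    }
    // ... otherwise read the assigned partitions and emit records plus watermarks ...
}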

> On 30 Nov 2015, at 17:50, Gyula Fóra  wrote:
> 
> Hi,
> 
> I think what we will need at some point for this are approximate whatermarks 
> which correlate event and ingest time.
> 
> I think they have similar concepts in Millwheel/Dataflow.
> 
> Cheers,
> Gyula
> On Mon, Nov 30, 2015 at 5:29 PM Aljoscha Krettek  wrote:
> Hi,
> as an addition. I don’t have a solution yet, for the general problem of what 
> happens when a parallel instance of a source never receives elements. This 
> watermark business is very tricky...
> 
> Cheers,
> Aljoscha
> > On 30 Nov 2015, at 17:20, Aljoscha Krettek  wrote:
> >
> > Hi Konstantin,
> > I finally nailed down the problem. :-)
> >
> > The basis of the problem is the fact that there is a mismatch in the 
> > parallelism of the Flink Kafka Consumer and the number of partitions in the 
> > Kafka Stream. I would assume that in your case the Kafka Stream has 1 
> > partition. This means, that only one of the parallel instances of the Flink 
> > Kafka Consumer ever receives element, which in turn means that only one of 
> > the parallel instances of the timestamp extractor ever receives elements. 
> > This means that no watermarks get emitted for the other parallel instances 
> > which in turn means that the watermark does not advance downstream because 
> > the watermark at an operator is the minimum over all upstream watermarks. 
> > This explains why ExampleTimestampExtractor1 only works in the case with 
> > parallelism=1.
> >
> > The reason why ExampleTimestampExtractor2 works in all parallelism settings 
> > is not very obvious. The secret is in this method:
> >
> > @Override
> > public long getCurrentWatermark() {
> >   return lastTimestamp - maxDelay;
> > }
> >
> > In the parallel instances that never receive any element lastTimestamp is 
> > set to Long.MIN_VALUE. So “lastTimestamp - maxDelay” is (Long.MAX_VALUE - 
> > maxDelay (+1)). Now, because the watermark at an operator is always the 
> > minimum over all watermarks from upstream operators the watermark at the 
> > window operator always tracks the watermark of the parallel instance that 
> > receives elements.
> >
> > I hope this helps, but please let me know if I should provide more 
> > explanation. This is a very tricky topic.
> >
> > Cheers,
> > Aljoscha
> >
> >> On 29 Nov 2015, at 21:18, Konstantin Knauf  
> >> wrote:
> >>
> >> Hi Aljoscha,
> >>
> >> I have put together a gist [1] with two classes, a short processing
> >> pipeline, which shows the behavior and a data generator to write records
> >> into Kafka. I hope I remembered everything we discussed correctly.
> >>
> >> So basically in the example it works with "TimestampExtractor1" only for
> >> parallelism 1, with "TimestampExtractor2" it works regardless of the
> >> parallelism. Run from the IDE.
> >>
> >> Let me know if you need anything else.
> >>
> >> Cheers,
> >>
> >> Konstantin
> >>
> >> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
> >>
> >> On 25.11.2015 21:15, Konstantin Knauf wrote:
> >>> Hi Aljoscha,
> >>>
> >>> sure, will do. I have neither found a solution. I won't have time to put
> >>> a minimal example together before the weekend though.
> >>>
> >>> Cheers,
> >>>
> >>> Konstantin
> >>>
> >>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
>  Hi Konstantin,
>  I still didn’t come up with an explanation for the behavior. Could you 
>  maybe send me example code (and example data if it is necessary to 
>  reproduce the problem.)? This would really help me pinpoint the problem.
> 
>  Cheers,
>  Aljoscha
> > On 17 Nov 2015, at 21:42, Konstantin Knauf 
> >  wrote:
> >
> > Hi Aljoscha,
> >
> > Are you sure? I am running the job from my IDE at the moment.
> >
> > If I set
> >
> > StreamExecutionEnvironment.setParallelism(1);
> >
> > I works with the old TimestampExtractor (returning Long.MIN_VALUE from
> > getCurrentWatermark() and emitting a watermark at every record)
> >
> > If I set
> >
> > StreamExecutionEnvironment.setParallelism(5);
> >
> > it does not work.
> >
> > So, if I understood you correctly, it is the opposite of what you were
> > expecting?!
> >
> > Cheers,
> >
> > Konstantin
> >
> >
> > On 17.11.2015 11:32, Aljoscha Krettek wrote:
> >> Hi,
> >> actually, the bug is more subtle. Normally, it is not a problem that 
> >> the TimestampExtractor sometimes emits a watermark that is lower than 
> >> the one before. (This is the result of the bug with Long.MIN_VALUE I 
> >> mentioned before). The stream operators wait for watermarks from all 
> >> 

Watermarks as "process completion" flags

2015-11-30 Thread Anton Polyakov
I think overall it would be a very useful feature to have the ability to track
the processing of source stream events by attaching barriers to them and
reacting to them in processing stages. Working with time windows can't help,
since processing can involve some long-running operations (e.g. DB queries),
and working with markers/event counts can't work either, as during processing
events might spawn child events.

However, without the ability to specify where in the source you put a barrier,
one can't do it.


On Mon, Nov 30, 2015 at 3:35 PM, Stephan Ewen > wrote:

> You cannot force a barrier at one point in time. At what time checkpoints
> are triggered is decided by the master node.
>
> I think in your case you can use the checkpoint and notification calls to
> figure out when data has flown through the DAG, but you cannot force a
> barrier at a specific point.
>
> On Mon, Nov 30, 2015 at 3:33 PM, Anton Polyakov  > wrote:
>
>> Hi Stephan
>>
>> sorry for misunderstanding, but how do I make sure barrier is placed at
>> the proper time? How does my source "force" checkpoint to start happening
>> once it finds that all needed elements are now produced?
>>
>> On Mon, Nov 30, 2015 at 2:13 PM, Stephan Ewen > > wrote:
>>
>>> Hi!
>>>
>>> If you implement the "Checkpointed" interface, you get the function
>>> calls to "snapshotState()" at the point when the checkpoint barrier arrives
>>> at an operator. So, the call to "snapshotState()" in the sink is when the
>>> barrier reaches the sink. The call to "checkpointComplete()" in the sources
>>> comes after all barriers have reached all sinks.
>>>
>>> Have a look here for an illustration about barriers flowing with the
>>> stream:
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/stream_checkpointing.html
>>>
>>> Stephan
>>>
>>>
>>> On Mon, Nov 30, 2015 at 11:51 AM, Anton Polyakov <
>>> polyakov.an...@gmail.com
>>> > wrote:
>>>
 Hi Stephan

 thanks that looks super. But source needs then to emit checkpoint. At
 the source, while reading source events I can find out that - this is the
 source event I want to take actions after. So if at ssource I can then emit
 checkpoint and catch it at the end of the DAG that would solve my problem
 (well, I also need to somehow distinguish my checkpoint from Flink's
 auto-generated ones).

 Sorry for being too chatty, this is the topic where I need expert
 opinion, can't find out the answer by just googling.


 On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen > wrote:

> Hi Anton!
>
> That you can do!
>
> You can look at the interfaces "Checkpointed" and
> "checkpointNotifier". There you will get a call at every checkpoint (and
> can look at what records are before that checkpoint). You also get a call
> once the checkpoint is complete, which corresponds to the point when
> everything has flown through the DAG.
>
> I think it is nice to implement it like that, because it works
> non-blocking: The stream continues while the the records-you-wait-for flow
> through the DAG, and you get an asynchronous notification once they have
> flown all the way through.
>
> Greetings,
> Stephan
>
>
> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <
> polyakov.an...@gmail.com
> > wrote:
>
>> I think I can turn my problem into a simpler one.
>>
>> Effectively what I need - I need way to checkpoint certain events in
>> input stream and once this checkpoint reaches end of DAG take some 
>> action.
>> So I need a signal at the sink which can tell "all events in source 
>> before
>> checkpointed event are now processed".
>>
>> As far as I understand flagged record don't quite work since DAG
>> doesn't propagate source events one-to-one. Some transformations might
>> create 3 child events out of 1 source. If I want to make sure I fully
>> processed source event, I need to wait till all childs are processed.
>>
>>
>>
>> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <
>> polyakov.an...@gmail.com
>> > wrote:
>>
>>> Hi Fabian
>>>
>>> Defining a special flag for record seems like a checkpoint barrier.
>>> I think I will end up re-implementing checkpointing myself. I found the
>>> discussion in flink-dev:
>>> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
>>> 

Re: key

2015-11-30 Thread Till Rohrmann
Hi Radu,

if you want to use custom types as keys, then these custom types have to
implement the Key interface.

Cheers,
Till
​

On Mon, Nov 30, 2015 at 5:28 PM, Radu Tudoran 
wrote:

> Hi,
>
>
>
> I want to apply a “keyBy operator on a stream”.
>
> The string is of type MyEvent. This is a simple type that contains 2 longs
> and and int or string
>
>
>
> However, when applying this I get
>
>
>
> Exception in thread "main"
> *org.apache.flink.api.common.InvalidProgramException*: This type
> (GenericType) cannot be used as key.
>
>
>
> Can you give me a hint about a solution to this?
>
>
>
> Thanks
>
>
>
> Dr. Radu Tudoran
>
> Research Engineer
>
> IT R Division
>
>
>
> [image: cid:image007.jpg@01CD52EB.AD060EE0]
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> European Research Center
>
> Riesstrasse 25, 80992 München
>
>
>
> E-mail: *radu.tudo...@huawei.com *
>
> Mobile: +49 15209084330
>
> Telephone: +49 891588344173
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
>


Re: Custom TimestampExtractor and FlinkKafkaConsumer082

2015-11-30 Thread Aljoscha Krettek
Hi,
as an addition: I don’t have a solution yet for the general problem of what
happens when a parallel instance of a source never receives elements. This
watermark business is very tricky...

Cheers,
Aljoscha
> On 30 Nov 2015, at 17:20, Aljoscha Krettek  wrote:
> 
> Hi Konstantin,
> I finally nailed down the problem. :-)
> 
> The basis of the problem is the fact that there is a mismatch in the 
> parallelism of the Flink Kafka Consumer and the number of partitions in the 
> Kafka Stream. I would assume that in your case the Kafka Stream has 1 
> partition. This means, that only one of the parallel instances of the Flink 
> Kafka Consumer ever receives element, which in turn means that only one of 
> the parallel instances of the timestamp extractor ever receives elements. 
> This means that no watermarks get emitted for the other parallel instances 
> which in turn means that the watermark does not advance downstream because 
> the watermark at an operator is the minimum over all upstream watermarks. 
> This explains why ExampleTimestampExtractor1 only works in the case with 
> parallelism=1. 
> 
> The reason why ExampleTimestampExtractor2 works in all parallelism settings 
> is not very obvious. The secret is in this method:
> 
> @Override
> public long getCurrentWatermark() {
>   return lastTimestamp - maxDelay;
> }
> 
> In the parallel instances that never receive any element lastTimestamp is set 
> to Long.MIN_VALUE. So “lastTimestamp - maxDelay” is (Long.MAX_VALUE - 
> maxDelay (+1)). Now, because the watermark at an operator is always the 
> minimum over all watermarks from upstream operators the watermark at the 
> window operator always tracks the watermark of the parallel instance that 
> receives elements. 
> 
> I hope this helps, but please let me know if I should provide more 
> explanation. This is a very tricky topic.
> 
> Cheers,
> Aljoscha
> 
>> On 29 Nov 2015, at 21:18, Konstantin Knauf  
>> wrote:
>> 
>> Hi Aljoscha,
>> 
>> I have put together a gist [1] with two classes, a short processing
>> pipeline, which shows the behavior and a data generator to write records
>> into Kafka. I hope I remembered everything we discussed correctly.
>> 
>> So basically in the example it works with "TimestampExtractor1" only for
>> parallelism 1, with "TimestampExtractor2" it works regardless of the
>> parallelism. Run from the IDE.
>> 
>> Let me know if you need anything else.
>> 
>> Cheers,
>> 
>> Konstantin
>> 
>> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
>> 
>> On 25.11.2015 21:15, Konstantin Knauf wrote:
>>> Hi Aljoscha,
>>> 
>>> sure, will do. I have neither found a solution. I won't have time to put
>>> a minimal example together before the weekend though.
>>> 
>>> Cheers,
>>> 
>>> Konstantin
>>> 
>>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
 Hi Konstantin,
 I still didn’t come up with an explanation for the behavior. Could you 
 maybe send me example code (and example data if it is necessary to 
 reproduce the problem.)? This would really help me pinpoint the problem.
 
 Cheers,
 Aljoscha
> On 17 Nov 2015, at 21:42, Konstantin Knauf  
> wrote:
> 
> Hi Aljoscha,
> 
> Are you sure? I am running the job from my IDE at the moment.
> 
> If I set
> 
> StreamExecutionEnvironment.setParallelism(1);
> 
> I works with the old TimestampExtractor (returning Long.MIN_VALUE from
> getCurrentWatermark() and emitting a watermark at every record)
> 
> If I set
> 
> StreamExecutionEnvironment.setParallelism(5);
> 
> it does not work.
> 
> So, if I understood you correctly, it is the opposite of what you were
> expecting?!
> 
> Cheers,
> 
> Konstantin
> 
> 
> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>> Hi,
>> actually, the bug is more subtle. Normally, it is not a problem that the 
>> TimestampExtractor sometimes emits a watermark that is lower than the 
>> one before. (This is the result of the bug with Long.MIN_VALUE I 
>> mentioned before). The stream operators wait for watermarks from all 
>> upstream operators and only advance the watermark monotonically in 
>> lockstep with them. This way, the watermark cannot decrease at an 
>> operator.
>> 
>> In your case, you have a topology with parallelism 1, I assume. In that 
>> case the operators are chained. (There is no separate operators but 
>> basically only one operator and element transmission happens in function 
>> calls). In this setting the watermarks are directly forwarded to 
>> operators without going through the logic I mentioned above.
>> 
>> Cheers,
>> Aljoscha
>>> On 16 Nov 2015, at 18:13, Konstantin Knauf 
>>>  wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> I changed the Timestamp Extraktor 

Re: Iterative queries on Flink

2015-11-30 Thread Fabian Hueske
The basic building blocks are there but I am not aware of any efforts to
implement caching and add it to the API.

2015-11-30 16:55 GMT+01:00 Flavio Pompermaier :

> Is there any effort in this direction? maybe I could achieve something
> like that using Tachyon in some way...?
>
> On Mon, Nov 30, 2015 at 4:52 PM, Fabian Hueske  wrote:
>
>> Hi Flavio,
>>
>> Flink does not support caching of data sets in memory yet.
>>
>> Best, Fabian
>>
>> 2015-11-30 16:45 GMT+01:00 Flavio Pompermaier :
>>
>>> Hi to all,
>>> I was wondering if Flink could fit a use case where a user load a
>>> dataset in memory and then he/she wants to explore it interactively. Let's
>>> say I want to load a csv, then filter out the rows where the column value
>>> match some criteria, then apply another criteria after seeing the results
>>> of the first filter.
>>> Is there a way to keep the dataset in memory and modify it interactively
>>> without re-reading all the dataset every time I want to chain another
>>> operation to my dataset?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>
>


Re: Triggering events

2015-11-30 Thread Niels Basjes
Thanks.
That works great.
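
(For reference, a minimal sketch of the fix Aljoscha describes in the quoted message
below, assuming Flink 0.10:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// tell the system that event-time timestamps and watermarks will flow through it
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// inside the source function: attach the event time and advance the watermark
ctx.collectWithTimestamp(event, event.timestamp);
ctx.emitWatermark(new Watermark(event.timestamp));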

Niels

On Mon, Nov 30, 2015 at 3:32 PM, Aljoscha Krettek 
wrote:

> Hi,
> the problem here is that the system needs to be aware that Watermarks will
> be flowing through the system. You can either do this via:
>
> env.setStreamTimeCharacteristic(EventTime);
>
> or:
>
> env.getConfig().enableTimestamps();
>
> I know, not very intuitive.
>
> Cheers,
> Aljoscha
>
> > On 30 Nov 2015, at 14:47, Niels Basjes  wrote:
> >
> > Hi,
> >
> > I'm experimenting with a custom Windowing setup over clickstream data.
> > I want the timestamps of this clickstream data to be the timestamps
> 'when the event occurred' and in the Windows I need to trigger on these
> times.
> >
> > For testing I created a source roughly like this:
> > public class ManualTimeEventSource extends
> RichEventTimeSourceFunction {
> > ctx.collectWithTimestamp(event, event.timestamp);
> >
> > But none of the triggers were called so I started digging through the
> code.
> > Then I figured I apparently needed to add the watermarks myself, so I
> added a line:
> > ctx.emitWatermark(new Watermark(event.timestamp));
> >
> > But now I get:
> >
> > Caused by: java.lang.ClassCastException:
> org.apache.flink.streaming.api.watermark.Watermark cannot be cast to
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> >   at
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41)
> >   at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> >   at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
> >   at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
> >   at
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93)
> >   at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88)
> >   ... 9 more
> >
> > This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug
> in Flink or in my code?
> >
> > What is the right way to trigger the events in my Windowing setup?
> >
> >
> >
> > P.S. I'm binding my Java application against Flink version 0.10.1
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Iterative queries on Flink

2015-11-30 Thread Fabian Hueske
Hi Flavio,

Flink does not support caching of data sets in memory yet.

Best, Fabian

2015-11-30 16:45 GMT+01:00 Flavio Pompermaier :

> Hi to all,
> I was wondering if Flink could fit a use case where a user load a dataset
> in memory and then he/she wants to explore it interactively. Let's say I
> want to load a csv, then filter out the rows where the column value match
> some criteria, then apply another criteria after seeing the results of the
> first filter.
> Is there a way to keep the dataset in memory and modify it interactively
> without re-reading all the dataset every time I want to chain another
> operation to my dataset?
>
> Best,
> Flavio
>


Re: Iterative queries on Flink

2015-11-30 Thread Flavio Pompermaier
Is there any effort in this direction? Maybe I could achieve something like
that using Tachyon in some way...?

On Mon, Nov 30, 2015 at 4:52 PM, Fabian Hueske  wrote:

> Hi Flavio,
>
> Flink does not support caching of data sets in memory yet.
>
> Best, Fabian
>
> 2015-11-30 16:45 GMT+01:00 Flavio Pompermaier :
>
>> Hi to all,
>> I was wondering if Flink could fit a use case where a user load a dataset
>> in memory and then he/she wants to explore it interactively. Let's say I
>> want to load a csv, then filter out the rows where the column value match
>> some criteria, then apply another criteria after seeing the results of the
>> first filter.
>> Is there a way to keep the dataset in memory and modify it interactively
>> without re-reading all the dataset every time I want to chain another
>> operation to my dataset?
>>
>> Best,
>> Flavio
>>
>
>


Re: Interpretation of Trigger and Eviction on a window

2015-11-30 Thread Nirmalya Sengupta
Hello Aljoscha,

Many thanks for taking the time to explain the behaviour of the Evictor. The
essence of my original post - about how the guide explains an Evictor - was
exactly this. I think the guide should make this (counterintuitive) explanation
of the parameter to the Evictor clearer. It may help others, yet uninitiated in
the world of Flink! :-)

Because you have offered to clarify further, given the following code
snippet:

.
.trigger(CountTrigger.of(5))
  .evictor(CountEvictor.of(4))
 .maxBy(1)

my understanding (after reading your mail) is that if I am not careful
about the parameters I pass to CountTrigger and CountEvictor, my function
may not work correctly. In this case, when the window is filled with 5
events, Evictor removes the first event and leaves 4. Thus, the function
never sees the 1st event.

Have I understood correctly? Will be happy to hear from you.

-- Nirmalya




-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Re: Interpretation of Trigger and Eviction on a window

2015-11-30 Thread Fabian Hueske
Yes, that is correct. The first element will be lost.

In fact, you need neither a trigger nor an evictor if you want to get
the max element for each group of 5 elements (see the sketch below).
See my reply to your other mail.
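
A minimal sketch of that suggestion (assuming the countWindow shortcut of the 0.10
DataStream API; the stream and field positions are placeholders):

stream
    .keyBy(0)
    .countWindow(5)    // evaluates and purges the window after every 5 elements per key
    .maxBy(1);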

Cheers,
Fabian

2015-11-30 18:47 GMT+01:00 Nirmalya Sengupta :

> Hello Aljoscha ,
>
> Many thanks for taking time to explain the behaviour of Evictor. The
> essence of my original post - about how the guide explains an Evictor - was
> this. I think the guide should make this (counterintuitive) explanation of
> the parameter to Evictor clearer. May help others, yet uninitiated in the
> world of Flink! :-)
>
> Because you have offered to clarify further, given the following code
> snippet:
>
> .
> .trigger(CountTrigger.of(5))
>   .evictor(CountEvictor.of(4))
>  .maxBy(1)
>
> my understanding (after reading your mail) is that if I am not careful
> about the parameters I pass to CountTrigger and CountEvictor, my function
> may not work correctly. In this case, when the window is filled with 5
> events, Evictor removes the first event and leaves 4. Thus, the function
> never sees the 1st event.
>
> Have I understood correctly? Will be happy to hear from you.
>
> -- Nirmalya
>
>
>
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>


Re: Continuing from the stackoverflow post

2015-11-30 Thread Nirmalya Sengupta
Hello Fabian,

Thanks for going through my long mail, and for your concise responses. I am
just happy that I was not way off the mark in my understanding.

It seems to me that I would rather wait for your blog before asking more
questions. I am not sure I will be left with enough drive to write my (planned)
blogs once yours is out. :-)

Yes, your solution works and, as luck would have it, I figured out the same
during the weekend, after going through your earlier responses. Thanks.

One question though:

You mentioned - ' It depends on the TriggerResult, what happens with the
four elements after the user function was invoked. A CountTrigger keeps the
elements in the window. '

Could you elaborate this point a bit? If CountTrigger _keeps_ the elements
in the window, who _removes_ them? Are the elements removed by the
Trigger's FIRE_AND_PURGE directive *or* by Flink runtime, when the current
pane is destroyed by the runtime before a new pane is created?

-- Nirmalya


-
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Working with protobuf wrappers

2015-11-30 Thread Krzysztof Zarzycki
Hi!
I'm trying to use generated Protobuf wrappers compiled with protoc and pass
them as objects between functions of Flink. I'm using Flink 0.10.0.
Unfortunately, I get an exception at runtime:

[...]
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException
Serialization trace:
enrichments_ (com.company$MyObject)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:162)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:313)
... 11 more
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 15 more


I believed that protobuf objects are now serializable with the default Flink
configuration after this issue was fixed in 0.9/0.8.1:
https://issues.apache.org/jira/browse/FLINK-1392

Maybe that is the case, but Flink just requires some extra configuration?
I'd be grateful for your help with this issue.
Cheers,
Krzysztof
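
(One possible fix, sketched here as a suggestion and not a confirmed answer from this
thread: registering a protobuf-aware Kryo serializer, for example the one shipped in
the chill-protobuf dependency (com.twitter:chill-protobuf), for the generated class:)

import com.twitter.chill.protobuf.ProtobufSerializer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// MyObject stands for the generated protobuf class from the stack trace above
env.getConfig().registerTypeWithKryoSerializer(MyObject.class, ProtobufSerializer.class);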


Re: Cleanup of OperatorStates?

2015-11-30 Thread Stephan Ewen
Hi Niels!

Nice use case that you have!
I think you can solve this super nicely with Flink, such that "replay" and
"realtime" are literally the same program - they differ only in whether the
source reads the Kafka queue from the beginning or attaches to its end.

Event time is, like you said, the key thing for "replay". Event time
depends on the progress in the timestamps of the data, so it can progress
at different speeds, depending on what the rate of your stream is.
With the appropriate data source, it will progress very fast in "replay
mode", so that you replay in "fast forward speed", and it progresses at the
same speed as processing time when you attach to the end of the Kafka queue.

When you define the time intervals in your program to react to event time
progress, then you will compute the right sessionization in both replay and
real time settings.
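
(A minimal sketch of that replay-vs-realtime idea; the topic name, EventSchema, and
whether FlinkKafkaConsumer082 honors auto.offset.reset when no offsets are committed
yet are assumptions:)

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("zookeeper.connect", "zookeeper:2181");
props.setProperty("group.id", "visit-sessionizer");
// replay mode starts at the earliest retained offset, realtime attaches to the end
props.setProperty("auto.offset.reset", replay ? "smallest" : "largest");

DataStream<ClickEvent> clicks = env.addSource(
        new FlinkKafkaConsumer082<>("clicks", new EventSchema(), props));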

I am writing a little example code to share. The type of ID-assignment
sessionization you want to do needs an undocumented API right now, so I'll
prepare something there for you...

Greetings,
Stephan



On Sun, Nov 29, 2015 at 4:04 PM, Niels Basjes  wrote:

> Hi,
>
> The sessionid is present in the measurements. It can also be seen as a
> form of 'browser id'.
> Most websites use either a 'long lived random value in a cookie' or a
> 'application session id' for this.
>
> So with the id of the browser in hand I have the need to group all events
> into "periods of activity" which I call a visit.
> Such a visit is a bounded subset of all events from a single browser.
>
> What I need is to add a (sort of) random visit id to the events that
> becomes 'inactive' after more than X minutes of inactivity.
> I then want to add this visitid to each event and
> 1) stream them out in realtime
> 2) Wait till the visit ends and store the complete visit on disk (I am
> going for either AVRO or Parquet).
>
> I want to create diskfiles with all visits that ended in a specific time
> period. So essentially
> "Group by round(, 15 minutes)"
>
>
> Because of the need to be able to 'repair' things I came with the
> following question:
> In the Flink API I see the 'process time' (i.e. the actual time of the
> server) and the 'event time' (i.e. the time when and event was recorded).
>
> Now in my case all events are in Kafka (for say 2 weeks).
> When something goes wrong I want to be able to 'reprocess' everything from
> the start of the queue.
> Here the matter of 'event time' becomes a big question for me; In those
> 'replay' situations the event time will progress at a much higher speed
> than the normal 1sec/sec.
>
> How does this work in Apache Flink?
>
>
> Niels Basjes
>
>
> On Fri, Nov 27, 2015 at 3:28 PM, Stephan Ewen  wrote:
>
>> Hey Niels!
>>
>> You may be able to implement this in windows anyways, depending on your
>> setup. You can definitely implement state with timeout yourself (using the
>> more low-level state interface), or you may be able to use custom windows
>> for that (they can trigger on every element and return elements
>> immediately, thereby giving you low latency).
>>
>> Can you tell me where exactly the session ID comes from? Is that
>> something that the function with state generates itself?
>> Depending on that answer, I can outline either the window, or the custom
>> state way...
>>
>> Greetings,
>> Stephan
>>
>>
>>
>>
>>
>> On Fri, Nov 27, 2015 at 2:19 PM, Niels Basjes  wrote:
>>
>>> Hi,
>>>
>>> Thanks for the explanation.
>>> I have clickstream data arriving in realtime and I need to assign the
>>> visitId and stream it out again (with the visitId now begin part of the
>>> record) into Kafka with the lowest possible latency.
>>> Although the Window feature allows me to group and close the visit on a
>>> timeout/expire (as shown to me by Aljoscha in a separate email) it does
>>> make a 'window'.
>>>
>>> So (as requested) I created a ticket for such a feature:
>>> https://issues.apache.org/jira/browse/FLINK-3089
>>>
>>> Niels
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen  wrote:
>>>
 Hi Niels!

 Currently, state is released by setting the value for the key to null.
 If you are tracking web sessions, you can try and send a "end of session"
 element that sets the value to null.

 To be on the safe side, you probably want state that is automatically
 purged after a while. I would look into using Windows for that. The
 triggers there are flexible so you can schedule both actions on elements
 plus cleanup after a certain time delay (clock time or event time).

 The question about "state expiry" has come a few times. People seem to
 like working on state directly, but it should clean up automatically.

 Can you see if your use case fits onto windows, otherwise open a ticket
 for state expiry?

 Greetings,
 Stephan


 On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes  wrote:

> Hi,
>
> I'm working on a streaming 

Re: Watermarks as "process completion" flags

2015-11-30 Thread Anton Polyakov
Hi Stephan

thanks that looks super. But source needs then to emit checkpoint. At the
source, while reading source events I can find out that - this is the
source event I want to take actions after. So if at ssource I can then emit
checkpoint and catch it at the end of the DAG that would solve my problem
(well, I also need to somehow distinguish my checkpoint from Flink's
auto-generated ones).

Sorry for being too chatty, this is the topic where I need expert opinion,
can't find out the answer by just googling.


On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen  wrote:

> Hi Anton!
>
> That you can do!
>
> You can look at the interfaces "Checkpointed" and "checkpointNotifier".
> There you will get a call at every checkpoint (and can look at what records
> are before that checkpoint). You also get a call once the checkpoint is
> complete, which corresponds to the point when everything has flown through
> the DAG.
>
> I think it is nice to implement it like that, because it works
> non-blocking: The stream continues while the the records-you-wait-for flow
> through the DAG, and you get an asynchronous notification once they have
> flown all the way through.
>
> Greetings,
> Stephan
>
>
> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov  > wrote:
>
>> I think I can turn my problem into a simpler one.
>>
>> Effectively what I need - I need way to checkpoint certain events in
>> input stream and once this checkpoint reaches end of DAG take some action.
>> So I need a signal at the sink which can tell "all events in source before
>> checkpointed event are now processed".
>>
>> As far as I understand flagged record don't quite work since DAG doesn't
>> propagate source events one-to-one. Some transformations might create 3
>> child events out of 1 source. If I want to make sure I fully processed
>> source event, I need to wait till all childs are processed.
>>
>>
>>
>> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov > > wrote:
>>
>>> Hi Fabian
>>>
>>> Defining a special flag for record seems like a checkpoint barrier. I
>>> think I will end up re-implementing checkpointing myself. I found the
>>> discussion in flink-dev:
>>> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
>>> 
>>>  which
>>> seems to solve my task. Essentially they want to have a mechanism which
>>> will mark record produced by job as “last” and then wait until it’s fully
>>> propagated through DAG. Similarly to what I need. Essentially my job which
>>> produces trades can also thought as being finished once it produced all
>>> trades, then I just need to wait till latest trade produced by this job is
>>> processed.
>>>
>>> So although windows can probably also be applied, I think propagating
>>> barrier through DAG and checkpointing at final job is what I need.
>>>
>>> Can I possibly utilize internal Flink’s checkpoint barriers (i.e. like
>>> triggering a custom checkoint or finishing streaming job)?
>>>
>>> On 24 Nov 2015, at 21:53, Fabian Hueske  wrote:
>>>
>>> Hi Anton,
>>>
>>> If I got your requirements right, you are looking for a solution that
>>> continuously produces updated partial aggregates in a streaming fashion.
>>> When a  special event (no more trades) is received, you would like to store
>>> the last update as a final result. Is that correct?
>>>
>>> You can compute continuous updates using a reduce() or fold() function.
>>> These will produce a new update for each incoming event.
>>> For example:
>>>
>>> val s: DataStream[(Int, Long)] = ...
>>> s.keyBy(_._1)
>>>   .reduce( (x,y) => (x._1, x._2 + y._2) )
>>>
>>> would continuously compute a sum for every key (_._1) and produce an
>>> update for each incoming record.
>>>
>>> You could add a flag to the record and implement a ReduceFunction that
>>> marks a record as final when the no-more-trades event is received.
>>> With a filter and a data sink you could emit such final records to a
>>> persistent data store.
>>>
>>> Btw.: You can also define custom trigger policies for windows. A custom
>>> trigger is called for each element that is added to a window and when
>>> certain timers expire. For example with a custom trigger, you can evaluate
>>> a window for every second element that is added. You can also define
>>> whether the elements in the window should be retained or removed after the
>>> evaluation.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> 2015-11-24 21:32 GMT+01:00 Anton Polyakov :
>>>
 Hi Max

 thanks for reply. From what I understand window works in a way that it
 buffers records while window is open, then apply transformation once window
 close is triggered and pass transformed result.
 In my case then window will be open for few hours, then the whole
 amount of trades will be processed 

Re: Interpretation of Trigger and Eviction on a window

2015-11-30 Thread Nirmalya Sengupta
Hi Aljoscha ,

Thanks for taking interest in my post and question.

 If the Evictor removes elements _before_ the function is applied, then
what happens the first time the Evictor acts? That's what I am
failing to understand. At the beginning of the operation on the Stream, the
Trigger finds 5 elements, the Evictor removes 2 of them (let's say), and
Sum sees an incorrect number of elements. Isn't this possible, or am I again
demonstrating my loose grasp of the matter?

Please correct me.

-- N


-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Re: Working with State example /flink streaming

2015-11-30 Thread Anton Polyakov
Javier

sorry for jumping in, but I think your case is very similar to what I am
trying to achieve in the thread just next to yours (called "Watermarks as
"process completion" flags"). I also need to process a stream which is
produced for some time, but then take an action after a certain event. A
window doesn't work for me either, because in my case the stream produces
data for 4-5 hours and I need to evaluate it continuously but then finalize
upon receiving a certain "last event".

I am thinking that the existing checkpointing would be very helpful, as it
solves exactly this task, just internally. If you were able to emit a
"special" checkpoint in the source and then react to it at the end of the
processing chain, do you think that would solve your task?

On Fri, Nov 27, 2015 at 4:29 PM, Lopez, Javier 
wrote:

> Hi,
>
> Thanks for the example. We have done it with windows before and it works.
> We are using state because the data comes with a gap of several days and we
> can't handle a window size of several days. That's why we decided to use
> the state.
>
> On 27 November 2015 at 11:09, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> I’ll try to go into a bit more detail about the windows here. What you
>> can do is this:
>>
>> DataStream<Tuple3<Long, Double, Long>> input = … // fields are (id, sum, count),
>> // where count is initialized to 1, similar to word count
>>
>> DataStream<Tuple3<Long, Double, Long>> counts = input
>>   .keyBy(0)
>>   .timeWindow(Time.minutes(10))
>>   .reduce(new MyCountingReducer())
>>
>> DataStream<Tuple2<Long, Double>> result = counts.map( <a function that divides sum by count> )
>>
>> Does this help? Here, you don’t even have to deal with state, the
>> windowing system will keep the state (i.e. the reduced) value in internal
>> state in a fault tolerant fashion.
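>>
>> MyCountingReducer is not spelled out above; a minimal sketch (assuming the
>> Tuple3 layout (id, sum, count) from the comment above) could look like this:
>>
>> public static class MyCountingReducer
>>         implements ReduceFunction<Tuple3<Long, Double, Long>> {
>>
>>     @Override
>>     public Tuple3<Long, Double, Long> reduce(
>>             Tuple3<Long, Double, Long> a, Tuple3<Long, Double, Long> b) {
>>         // keep the id, add up the partial sums and the counts
>>         return new Tuple3<Long, Double, Long>(a.f0, a.f1 + b.f1, a.f2 + b.f2);
>>     }
>> }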
>>
>> Cheers,
>> Aljoscha
>> > On 26 Nov 2015, at 17:14, Stephan Ewen  wrote:
>> >
>> > Hi!
>> >
>> > In streaming, there is no "end" of the stream when you would emit the
>> final sum. That's why there are windows.
>> >
>> > If you do not want the partial sums, but only the final sum, you need
>> to define what window in which the sum is computed. At the end of that
>> window, that value is emitted. The window can be based on time, counts, or
>> other measures.
>> >
>> > Greetings,
>> > Stephan
>> >
>> >
>> > On Thu, Nov 26, 2015 at 4:07 PM, Lopez, Javier 
>> wrote:
>> > Hi, thanks for the answer. It worked but not in the way we expected. We
>> expect to have only one sum per ID and we are getting all the consecutive
>> sums, for example:
>> >
>> > We expect this: (11,6) but we get this (11,1) (11,3) (11,6) (the
>> initial values are ID -> 11, values -> 1,2,3). Here is the code we are
>> using for our test:
>> >
>> > DataStream<Tuple2<Long, Double>> stream = ...;
>> >
>> >
>> > DataStream<Tuple4<Long, Double, Long, Double>> result =
>> >     stream.keyBy(0).map(new RollingSum());
>> >
>> >
>> >
>> > public static class RollingSum extends
>> >     RichMapFunction<Tuple2<Long, Double>, Tuple4<Long, Double, Long, Double>> {
>> >
>> >     // persistent counter
>> >     private OperatorState<Double> sum;
>> >     private OperatorState<Long> count;
>> >
>> >
>> >     @Override
>> >     public Tuple4<Long, Double, Long, Double> map(Tuple2<Long, Double> value1) {
>> >         try {
>> >             Double newSum = sum.value() + value1.f1;
>> >
>> >             sum.update(newSum);
>> >             count.update(count.value() + 1);
>> >             return new Tuple4<Long, Double, Long, Double>(
>> >                 value1.f0, sum.value(), count.value(), sum.value() / count.value());
>> >         } catch (IOException e) {
>> >             // TODO Auto-generated catch block
>> >             e.printStackTrace();
>> >         }
>> >
>> >         return null;
>> >     }
>> >
>> >     @Override
>> >     public void open(Configuration config) {
>> >         sum = getRuntimeContext().getKeyValueState("mySum", Double.class, 0D);
>> >         count = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
>> >     }
>> > }
>> >
>> >
>> > We are using a Tuple4 because we want to calculate the sum and the
>> average (So our Tuple is ID, SUM, Count, AVG). Do we need to add another
>> step to get a single value out of it? or is this the expected behavior.
>> >
>> > Thanks again for your help.
>> >
>> > On 25 November 2015 at 17:19, Stephan Ewen  wrote:
>> > Hi Javier!
>> >
>> > You can solve this both using windows, or using manual state.
>> >
>> > What is better depends a bit on when you want to have the result (the
>> sum). Do you want a result emitted after each update (or do some other
>> operation with that value) or do you want only the final sum after a
>> certain time?
>> >
>> > For the 

Triggering events

2015-11-30 Thread Niels Basjes
Hi,

I'm experimenting with a custom Windowing setup over clickstream data.
I want the timestamps of this clickstream data to be the timestamps 'when
the event occurred' and in the Windows I need to trigger on these times.

For testing I created a source roughly like this:

public class ManualTimeEventSource extends
RichEventTimeSourceFunction {

ctx.collectWithTimestamp(event, event.timestamp);


But none of the triggers were called so I started digging through the code.

Then I figured I apparently needed to add the watermarks myself, so I
added a line:

ctx.emitWatermark(new Watermark(event.timestamp));


But now I get:


Caused by: java.lang.ClassCastException:
org.apache.flink.streaming.api.watermark.Watermark cannot be cast to
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
  at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41)
  at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
  at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
  at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
  at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93)
  at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88)
  ... 9 more


This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug in
Flink or in my code?

What is the right way to trigger the events in my Windowing setup?



P.S. I'm binding my Java application against Flink version 0.10.1

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Interpretation of Trigger and Eviction on a window

2015-11-30 Thread Aljoscha Krettek
Hi,
the Evictor is very tricky to understand, I’m afraid. What happens when a 
Trigger fires is the following:
 1. Trigger fires
 2. Evictor can remove elements from the window buffer
 3. Window function processes the elements that remain in the window buffer

The tricky thing here is that the Evictor should really be called “Keeper”. 
What it does in fact is specify how many elements should be kept in the buffer. 
For example, an Evictor “CountEvictor(5)” means keep 5 elements, and a 
“TimeEvictor(5 minutes)” keeps the elements that are younger than 5 minutes. I 
admit this is somewhat counterintuitive but this is how the policies work in 
IBM Infosphere Streams and they influenced the early design of the Flink 
Trigger/Eviction policies. (See here for a description of the semantics of IBM 
Infosphere Streams: 
http://www.cs.bilkent.edu.tr/~bgedik/homepage/lib/exe/fetch.php/wiki:pubs:windowing.pdf)
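
To make this concrete, here is a rough Java sketch of a pipeline like yours
(the field types are assumptions, not taken from your original code): the
trigger fires once the 5th element has arrived, the evictor then keeps only
the 4 most recent elements, and maxBy sees exactly those 4.

DataStream<Tuple2<String, Double>> readings = ...; // (sensorUUID, ambientTemperature)

readings
    .timeWindowAll(Time.of(5, TimeUnit.MILLISECONDS))
    .trigger(CountTrigger.of(5))   // fire when the 5th element arrives
    .evictor(CountEvictor.of(4))   // "keep" the 4 youngest elements in the buffer
    .maxBy(1);                     // the max is computed over those 4 remaining elements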

If it is still unclear, could you please put together a working example where
the behavior is unclear to you, so that we can have a look at it.

Cheers,
Aljoscha
> On 30 Nov 2015, at 12:08, Nirmalya Sengupta  
> wrote:
> 
> Hi Aljoscha ,
> 
> Thanks for taking interest in my post and question.
> 
>  If the Evictor removes elements _before_ the function is applied, then what
> happens the first time the Evictor acts? That's what I am failing to
> understand. At the beginning of the operation on the Stream, the Trigger finds
> 5 elements, the Evictor removes 2 of them (let's say), and Sum sees an incorrect
> number of elements. Isn't this possible, or am I again demonstrating my loose
> grasp of the matter?
> 
> Please correct me.
> 
> -- N
> 
> 
> -- 
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is 
> where they should be.
> Now put the foundation under them."



Re: Watermarks as "process completion" flags

2015-11-30 Thread Stephan Ewen
Hi!

If you implement the "Checkpointed" interface, you get the function calls
to "snapshotState()" at the point when the checkpoint barrier arrives at an
operator. So, the call to "snapshotState()" in the sink is when the barrier
reaches the sink. The call to "checkpointComplete()" in the sources comes
after all barriers have reached all sinks.

Have a look here for an illustration about barriers flowing with the
stream:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/stream_checkpointing.html
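
As a rough sketch of the sink side (Java; Trade, TradeSink, and getId() are
made-up names for illustration, not from your job):

public class TradeSink extends RichSinkFunction<Trade>
        implements Checkpointed<Long>, CheckpointNotifier {

    private long lastSeenTradeId;

    @Override
    public void invoke(Trade trade) throws Exception {
        lastSeenTradeId = trade.getId();
        // ... write the trade to the external system ...
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        // called when the barrier of this checkpoint reaches the sink
        return lastSeenTradeId;
    }

    @Override
    public void restoreState(Long state) {
        lastSeenTradeId = state;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // called once the checkpoint is complete, i.e. its barrier has
        // passed through the whole DAG
    }
}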

Stephan


On Mon, Nov 30, 2015 at 11:51 AM, Anton Polyakov 
wrote:

> Hi Stephan
>
> thanks that looks super. But source needs then to emit checkpoint. At the
> source, while reading source events I can find out that - this is the
> source event I want to take actions after. So if at the source I can then emit
> checkpoint and catch it at the end of the DAG that would solve my problem
> (well, I also need to somehow distinguish my checkpoint from Flink's
> auto-generated ones).
>
> Sorry for being too chatty, this is the topic where I need expert opinion,
> can't find out the answer by just googling.
>
>
> On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen  wrote:
>
>> Hi Anton!
>>
>> That you can do!
>>
>> You can look at the interfaces "Checkpointed" and "checkpointNotifier".
>> There you will get a call at every checkpoint (and can look at what records
>> are before that checkpoint). You also get a call once the checkpoint is
>> complete, which corresponds to the point when everything has flown through
>> the DAG.
>>
>> I think it is nice to implement it like that, because it works
>> non-blocking: The stream continues while the records-you-wait-for flow
>> through the DAG, and you get an asynchronous notification once they have
>> flown all the way through.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <
>> polyakov.an...@gmail.com> wrote:
>>
>>> I think I can turn my problem into a simpler one.
>>>
>>> Effectively what I need - I need way to checkpoint certain events in
>>> input stream and once this checkpoint reaches end of DAG take some action.
>>> So I need a signal at the sink which can tell "all events in source before
>>> checkpointed event are now processed".
>>>
>>> As far as I understand flagged record don't quite work since DAG doesn't
>>> propagate source events one-to-one. Some transformations might create 3
>>> child events out of 1 source. If I want to make sure I fully processed
>>> source event, I need to wait till all childs are processed.
>>>
>>>
>>>
>>> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <
>>> polyakov.an...@gmail.com> wrote:
>>>
 Hi Fabian

 Defining a special flag for record seems like a checkpoint barrier. I
 think I will end up re-implementing checkpointing myself. I found the
 discussion in flink-dev:
 mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
 
  which
 seems to solve my task. Essentially they want to have a mechanism which
 will mark record produced by job as “last” and then wait until it’s fully
 propagated through DAG. Similarly to what I need. Essentially my job which
 produces trades can also thought as being finished once it produced all
 trades, then I just need to wait till latest trade produced by this job is
 processed.

 So although windows can probably also be applied, I think propagating
 barrier through DAG and checkpointing at final job is what I need.

 Can I possibly utilize internal Flink’s checkpoint barriers (i.e. like
 triggering a custom checkoint or finishing streaming job)?

 On 24 Nov 2015, at 21:53, Fabian Hueske  wrote:

 Hi Anton,

 If I got your requirements right, you are looking for a solution that
 continuously produces updated partial aggregates in a streaming fashion.
 When a  special event (no more trades) is received, you would like to store
 the last update as a final result. Is that correct?

 You can compute continuous updates using a reduce() or fold() function.
 These will produce a new update for each incoming event.
 For example:

 val s: DataStream[(Int, Long)] = ...
 s.keyBy(_._1)
  .reduce( (x,y) => (x._1, x._2 + y._2) )

 would continuously compute a sum for every key (_._1) and produce an
 update for each incoming record.

 You could add a flag to the record and implement a ReduceFunction that
 marks a record as final when the no-more-trades event is received.
 With a filter and a data sink you could emit such final records to a
 persistent data store.

 Btw.: You can also define custom trigger policies for windows. A custom
 trigger is called for each element 

Re: Triggering events

2015-11-30 Thread Aljoscha Krettek
Hi,
the problem here is that the system needs to be aware that Watermarks will be 
flowing through the system. You can either do this via:

env.setStreamTimeCharacteristic(EventTime);

or:

env.getConfig().enableTimestamps();

I know, not very intuitive.
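
A minimal Java sketch of that setup (the event type ClickEvent is a
placeholder; the source is the one from your mail):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// make the system aware that event-time timestamps and watermarks will be used
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// (or: env.getConfig().enableTimestamps();)

DataStream<ClickEvent> clicks = env.addSource(new ManualTimeEventSource());

// collectWithTimestamp()/emitWatermark() in the source now drive the
// event-time triggers of any windows defined on this stream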

Cheers,
Aljoscha

> On 30 Nov 2015, at 14:47, Niels Basjes  wrote:
> 
> Hi,
> 
> I'm experimenting with a custom Windowing setup over clickstream data.
> I want the timestamps of this clickstream data to be the timestamps 'when the 
> event occurred' and in the Windows I need to trigger on these times.
> 
> For testing I created a source roughly like this:
> public class ManualTimeEventSource extends 
> RichEventTimeSourceFunction {
> ctx.collectWithTimestamp(event, event.timestamp);
> 
> But none of the triggers were called so I started digging through the code.
> Then I figured I apparently needed to add the watermarks myself, so I added a 
> line:
> ctx.emitWatermark(new Watermark(event.timestamp));
> 
> But now I get:
> 
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.streaming.api.watermark.Watermark cannot be cast to 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
>   at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88)
>   ... 9 more
> 
> This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug in 
> Flink or in my code?
> 
> What is the right way to trigger the events in my Windowing setup?
> 
> 
> 
> P.S. I'm binding my Java application against Flink version 0.10.1
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes



Re: Watermarks as "process completion" flags

2015-11-30 Thread Anton Polyakov
Hi Stephan

sorry for the misunderstanding, but how do I make sure the barrier is placed at
the proper time? How does my source "force" a checkpoint to start happening once
it finds that all needed elements have been produced?

On Mon, Nov 30, 2015 at 2:13 PM, Stephan Ewen  wrote:

> Hi!
>
> If you implement the "Checkpointed" interface, you get the function calls
> to "snapshotState()" at the point when the checkpoint barrier arrives at an
> operator. So, the call to "snapshotState()" in the sink is when the barrier
> reaches the sink. The call to "checkpointComplete()" in the sources comes
> after all barriers have reached all sinks.
>
> Have a look here for an illustration about barriers flowing with the
> stream:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/stream_checkpointing.html
>
> Stephan
>
>
> On Mon, Nov 30, 2015 at 11:51 AM, Anton Polyakov  > wrote:
>
>> Hi Stephan
>>
>> thanks that looks super. But source needs then to emit checkpoint. At the
>> source, while reading source events I can find out that - this is the
>> source event I want to take actions after. So if at the source I can then emit
>> checkpoint and catch it at the end of the DAG that would solve my problem
>> (well, I also need to somehow distinguish my checkpoint from Flink's
>> auto-generated ones).
>>
>> Sorry for being too chatty, this is the topic where I need expert
>> opinion, can't find out the answer by just googling.
>>
>>
>> On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen  wrote:
>>
>>> Hi Anton!
>>>
>>> That you can do!
>>>
>>> You can look at the interfaces "Checkpointed" and "checkpointNotifier".
>>> There you will get a call at every checkpoint (and can look at what records
>>> are before that checkpoint). You also get a call once the checkpoint is
>>> complete, which corresponds to the point when everything has flown through
>>> the DAG.
>>>
>>> I think it is nice to implement it like that, because it works
>>> non-blocking: The stream continues while the records-you-wait-for flow
>>> through the DAG, and you get an asynchronous notification once they have
>>> flown all the way through.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <
>>> polyakov.an...@gmail.com> wrote:
>>>
 I think I can turn my problem into a simpler one.

 Effectively what I need - I need way to checkpoint certain events in
 input stream and once this checkpoint reaches end of DAG take some action.
 So I need a signal at the sink which can tell "all events in source before
 checkpointed event are now processed".

 As far as I understand flagged record don't quite work since DAG
 doesn't propagate source events one-to-one. Some transformations might
 create 3 child events out of 1 source. If I want to make sure I fully
 processed source event, I need to wait till all childs are processed.



 On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <
 polyakov.an...@gmail.com> wrote:

> Hi Fabian
>
> Defining a special flag for record seems like a checkpoint barrier. I
> think I will end up re-implementing checkpointing myself. I found the
> discussion in flink-dev:
> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
> 
>  which
> seems to solve my task. Essentially they want to have a mechanism which
> will mark record produced by job as “last” and then wait until it’s fully
> propagated through DAG. Similarly to what I need. Essentially my job which
> produces trades can also thought as being finished once it produced all
> trades, then I just need to wait till latest trade produced by this job is
> processed.
>
> So although windows can probably also be applied, I think propagating
> barrier through DAG and checkpointing at final job is what I need.
>
> Can I possibly utilize internal Flink’s checkpoint barriers (i.e. like
> triggering a custom checkoint or finishing streaming job)?
>
> On 24 Nov 2015, at 21:53, Fabian Hueske  wrote:
>
> Hi Anton,
>
> If I got your requirements right, you are looking for a solution that
> continuously produces updated partial aggregates in a streaming fashion.
> When a  special event (no more trades) is received, you would like to 
> store
> the last update as a final result. Is that correct?
>
> You can compute continuous updates using a reduce() or fold()
> function. These will produce a new update for each incoming event.
> For example:
>
> val s: DataStream[(Int, Long)] = ...
> s.keyBy(_._1)
>   .reduce( (x,y) => (x._1, x._2 + y._2) )
>
> would continuously compute a sum for every key (_._1) and produce 

Re: Watermarks as "process completion" flags

2015-11-30 Thread Stephan Ewen
You cannot force a barrier at one point in time. At what time checkpoints
are triggered is decided by the master node.

I think in your case you can use the checkpoint and notification calls to
figure out when data has flown through the DAG, but you cannot force a
barrier at a specific point.
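
As a sketch of how the two calls can be combined for this (Java; Trade,
readNextTrade() and isLastTrade() are made-up placeholders for your source
logic):

public abstract class TradeSource extends RichSourceFunction<Trade>
        implements Checkpointed<Boolean>, CheckpointNotifier {

    private volatile boolean running = true;
    private volatile boolean emittedLastTrade = false;
    private volatile long firstCheckpointAfterLastTrade = -1;

    @Override
    public void run(SourceContext<Trade> ctx) throws Exception {
        while (running) {
            Trade trade = readNextTrade();
            // emit and update the flag under the checkpoint lock, so the
            // barrier position stays consistent with emittedLastTrade
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(trade);
                if (isLastTrade(trade)) {
                    emittedLastTrade = true;
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public Boolean snapshotState(long checkpointId, long checkpointTimestamp) {
        // the barrier of this checkpoint follows everything collected so far
        if (emittedLastTrade && firstCheckpointAfterLastTrade < 0) {
            firstCheckpointAfterLastTrade = checkpointId;
        }
        return emittedLastTrade;
    }

    @Override
    public void restoreState(Boolean state) {
        emittedLastTrade = state;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (firstCheckpointAfterLastTrade >= 0
                && checkpointId >= firstCheckpointAfterLastTrade) {
            // this checkpoint's barrier was emitted after the last trade, so
            // everything up to the last trade has flown through the whole DAG
        }
    }

    // to be provided by the concrete source
    protected abstract Trade readNextTrade();
    protected abstract boolean isLastTrade(Trade trade);
}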

On Mon, Nov 30, 2015 at 3:33 PM, Anton Polyakov 
wrote:

> Hi Stephan
>
> sorry for misunderstanding, but how do I make sure barrier is placed at
> the proper time? How does my source "force" checkpoint to start happening
> once it finds that all needed elements are now produced?
>
> On Mon, Nov 30, 2015 at 2:13 PM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> If you implement the "Checkpointed" interface, you get the function calls
>> to "snapshotState()" at the point when the checkpoint barrier arrives at an
>> operator. So, the call to "snapshotState()" in the sink is when the barrier
>> reaches the sink. The call to "checkpointComplete()" in the sources comes
>> after all barriers have reached all sinks.
>>
>> Have a look here for an illustration about barriers flowing with the
>> stream:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/stream_checkpointing.html
>>
>> Stephan
>>
>>
>> On Mon, Nov 30, 2015 at 11:51 AM, Anton Polyakov <
>> polyakov.an...@gmail.com> wrote:
>>
>>> Hi Stephan
>>>
>>> thanks that looks super. But source needs then to emit checkpoint. At
>>> the source, while reading source events I can find out that - this is the
>>> source event I want to take actions after. So if at the source I can then emit
>>> checkpoint and catch it at the end of the DAG that would solve my problem
>>> (well, I also need to somehow distinguish my checkpoint from Flink's
>>> auto-generated ones).
>>>
>>> Sorry for being too chatty, this is the topic where I need expert
>>> opinion, can't find out the answer by just googling.
>>>
>>>
>>> On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen  wrote:
>>>
 Hi Anton!

 That you can do!

 You can look at the interfaces "Checkpointed" and "checkpointNotifier".
 There you will get a call at every checkpoint (and can look at what records
 are before that checkpoint). You also get a call once the checkpoint is
 complete, which corresponds to the point when everything has flown through
 the DAG.

 I think it is nice to implement it like that, because it works
 non-blocking: The stream continues while the records-you-wait-for flow
 through the DAG, and you get an asynchronous notification once they have
 flown all the way through.

 Greetings,
 Stephan


 On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <
 polyakov.an...@gmail.com> wrote:

> I think I can turn my problem into a simpler one.
>
> Effectively what I need - I need way to checkpoint certain events in
> input stream and once this checkpoint reaches end of DAG take some action.
> So I need a signal at the sink which can tell "all events in source before
> checkpointed event are now processed".
>
> As far as I understand flagged record don't quite work since DAG
> doesn't propagate source events one-to-one. Some transformations might
> create 3 child events out of 1 source. If I want to make sure I fully
> processed source event, I need to wait till all childs are processed.
>
>
>
> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <
> polyakov.an...@gmail.com> wrote:
>
>> Hi Fabian
>>
>> Defining a special flag for record seems like a checkpoint barrier. I
>> think I will end up re-implementing checkpointing myself. I found the
>> discussion in flink-dev:
>> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
>> 
>>  which
>> seems to solve my task. Essentially they want to have a mechanism which
>> will mark record produced by job as “last” and then wait until it’s fully
>> propagated through DAG. Similarly to what I need. Essentially my job 
>> which
>> produces trades can also thought as being finished once it produced all
>> trades, then I just need to wait till latest trade produced by this job 
>> is
>> processed.
>>
>> So although windows can probably also be applied, I think propagating
>> barrier through DAG and checkpointing at final job is what I need.
>>
>> Can I possibly utilize internal Flink’s checkpoint barriers (i.e.
>> like triggering a custom checkoint or finishing streaming job)?
>>
>> On 24 Nov 2015, at 21:53, Fabian Hueske  wrote:
>>
>> Hi Anton,
>>
>> If I got your requirements right, you are looking for a solution that
>> continuously produces updated partial aggregates in a streaming fashion.

Re: Continuing from the stackoverflow post

2015-11-30 Thread Fabian Hueske
Sorry, I have to correct myself. The windowing semantics are not easy ;-)


2015-11-30 15:34 GMT+01:00 Fabian Hueske :

> Hi Nirmalya,
>
> thanks for the detailed description of your understanding of Flink's
> window semantics.
> Most of it is correct, but a few things need a bit of correction ;-)
>
> Please see my comments inline.
>
> 2015-11-28 4:36 GMT+01:00 Nirmalya Sengupta :
>
>> Hello Fabian,
>>
>>
>> A little long mail; please have some patience.
>>
>> From your response: ' Let's start by telling me what you actually want
>> to do ;-) '
>>
>> At a broad level, I want to write a tutorial (perhaps a series) on
>> Flink, where these concepts are brought out by a mix of definition,
>> elaboration, illustration and, of course, code snippets. If that helps the
>> community, I will be very happy. At the least, I will understand the
>> principles and their application much better. So, I am a bit selfish here
>> perhaps. You also mention that you are preparing some such material. If I
>> can complement your effort, I will be delighted.
>>
>>
> That sounds great! We are almost done with the blog post and will publish
> it soon. Looking forward to your feedback :-)
>
>
>> One never knows: going further, I may become a trainer / evangelist of
>> Apache Flink, if I show enough grasp of the subject! :-)
>>
>> Now to this particular question (from SOF):
>>
>> When I began, my intention was to find maximum temperature, *every 5
>> successive records* (to answer your question).
>>
>> As I have said before, I am learning and hence, trying with various
>> operator combinations on the same set of data to see what happens and then,
>> trying to explain why that happens.
>>
>> Let's refer to the code again:
>>
>> val readings =
>>   readIncomingReadings(env,"./sampleIOTTiny.csv")
>>   .map(e => (e.sensorUUID,e.ambientTemperature))
>>   .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
>>   .trigger(CountTrigger.of(5))
>>   .evictor(CountEvictor.of(4))
>>   .maxBy(1)
>>
>>
>> So, what I understand is this:
>>
>> timeWindowAll defines a pane of 5 msecs. When this time expires, the
>> timeWindowAll fires a kind of *onExpirationOf* trigger (I have
>> fabricated the name, quite likely it doesn't exist). This perhaps does
>> nothing other than passing to the function (here, *maxBy() *) the
>> contents of the window (whatever number of elements have been collected in
>> last 5 msecs) and clearing the pane, readying it for the next 5 msecs (not
>> exactly, but more of it later).
>>
>>
> This is correct. timeWindowAll(5 msecs) (without additional trigger
> definitions) will create a new window every 5 msec, trigger after 5 msecs
> (call the user function), purge the window, and create a new window.
> Independent of the trigger, when a window expires, it is removed (including
> all elements it contains) and a new window is created.
>

THIS IS NOT CORRECT --> "Independent of the trigger, when a window expires,
it is removed (including all elements it contains) and a new window is
created."
In fact, a window is only removed if a trigger returns FIRE_AND_PURGE or
PURGE. The default time windows (without additional Trigger) purge their
content at their end time. If you apply a trigger that does *not* purge the
content of the window after it expires, it will consume memory forever.


>
>
>> However, I provide a CountTrigger (5 elements). According to the rules
>> of Flink, this trigger replaces the aforementioned default onExpirationOf
>> trigger. Therefore, when timeWindowAll is ready after 5 msecs have
>> passed, what it finds available to fire is this CountTrigger. However, a
>> CountTrigger can fire only if its count-related (here, 5) criterion is
>> satisfied. So, after 5 msecs have passed, if the number of elements
>> collected in timeWindowAll pane is >= 5, *only* then CountTrigger will
>> fire; otherwise, CountTrigger will not stir and timeWindowAll will shrug
>> its shoulders and go back to wait for the next 5 msecs period.
>>
>>
> If you define a CountTrigger(5), it will trigger exactly once when
> exactly 5 elements are in the window, even if there are still 2 msecs left
> for the window. This will also replace the current trigger that would fire
> at 5 msecs, i.e., the window is only evaluated once, after the 5th element
> was inserted. It depends on the trigger what happens with the elements in
> the pane after the function has been called. If you look at the Trigger
> interface, you'll find that TriggerResult might be FIRE or FIRE_AND_PURGE
> (among others). FIRE will call the user function and leave the elements in
> the window. FIRE_AND_PURGE will call the user function, purge (delete) the
> window, and create a new window within the same time bounds.
>
>
>> Going further, I provide a CountEvictor. According to the rules of
>>  Flink, an Evictor is allowed to act only when its associated trigger
>> (here, CountTrigger) is fired. Because of its