Flink Streaming Core 0.10 in maven repos

2015-11-23 Thread LINZ, Arnaud
Hello,

Small question: I can't find the Streaming Core component in version 0.10 in
the Maven repo:
http://mvnrepository.com/artifact/org.apache.flink/flink-streaming-core

Thus, in my pom file, this artifact is the only one of my Flink dependencies
still at version 0.10-SNAPSHOT.
Is there something wrong with that component's publication in version 0.10?

Greetings,
Arnaud





The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.


Re: Flink Streaming Core 0.10 in maven repos

2015-11-23 Thread Stephan Ewen
Hi Arnaud!

In 0.10, we renamed the dependency to "flink-streaming-java" (and
"flink-streaming-scala") to be more in line with the structure of the
dependencies on the batch side.

Just replace "flink-streaming-core" with "flink-streaming-java".
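
For example, the dependency entry in the pom would then look roughly like this
(just a sketch; adjust the version to the exact 0.10 release you are using):

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>0.10.0</version>
  </dependency>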

Greetings,
Stephan


On Mon, Nov 23, 2015 at 9:07 AM, LINZ, Arnaud 
wrote:

> Hello,
>
> Small question: I can't find the Streaming Core component in 0.10 version
> in the maven repo :
> http://mvnrepository.com/artifact/org.apache.flink/flink-streaming-core
>
> Thus in my pom file this artifact is the only part of my Flink's
> dependencies to stay in 0.10-SNAPSHOT version.
> Is there something wrong with that component's publication in 0.10 version
> ?
>
> Greetings,
> Arnaud


Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Konstantin Knauf
Thanks!

@Fabian: Yepp, but this still results in multiple outputs per window,
because the maximum is emitted for every key.

@Gyula: Yepp, that's the second bullet point from my question ;) The way
I implemented it, it basically doubles the latency, because the
timeWindowAll has to wait for the next timeWindow before it can close
the previous one. So if the first timeWindow is 10s, it takes 20s until
you have a result, although it can't change after 10s. You know what I mean?

Cheers,

Konstantin

On 23.11.2015 11:32, Gyula Fóra wrote:
> Hi,
> 
> Alright it seems there are multiple ways of doing this.
> 
> I would do something like:
> 
> ds.keyBy(key)
> .timeWindow(w)
> .reduce(...)
> .timeWindowAll(w)
> .reduce(...)
> 
> Maybe Aljoscha could jump in here :D
> 
> Cheers,
> Gyula
> 
> Fabian Hueske > ezt írta
> (időpont: 2015. nov. 23., H, 11:21):
> 
> If you set the key to the time attribute, the "old" key is no longer
> valid.
> The streams are organized by time and only one aggregate for each
> window-time should be computed.
> 
> This should do what you are looking for:
> 
> DataStream
>   .keyBy(_._1) // key by orginal key
>   .timeWindow(..) 
>   .apply(...)  // extract window end time: (origKey, time, agg)
>   .keyBy(_._2) // key by time field
>   .maxBy(_._3) // value with max agg field
> 
> Best, Fabian
> 
> 2015-11-23 11:00 GMT+01:00 Konstantin Knauf
> >:
> 
> Hi Fabian,
> 
> thanks for your answer. Yes, that's what I want.
> 
> The solution you suggest is what I am doing right now (see last
> of the
> bullet point in my question).
> 
> But given your example. I would expect the following output:
> 
> (key: 1, w-time: 10, agg: 17)
> (key: 2, w-time: 10, agg: 20)
> (key: 1, w-time: 20, agg: 30)
> (key: 1, w-time: 20, agg: 30)
> (key: 1, w-time: 20, agg: 30)
> 
> Because the reduce function is evaluated for every incoming
> event (i.e.
> each key), right?
> 
> Cheers,
> 
> Konstantin
> 
> On 23.11.2015 10:47, Fabian Hueske wrote:
> > Hi Konstantin,
> >
> > let me first summarize to make sure I understood what you are 
> looking for.
> > You computed an aggregate over a keyed event-time window and you are
> > looking for the maximum aggregate for each group of windows over the
> > same period of time.
> > So if you have
> > (key: 1, w-time: 10, agg: 17)
> > (key: 2, w-time: 10, agg: 20)
> > (key: 1, w-time: 20, agg: 30)
> > (key: 2, w-time: 20, agg: 28)
> > (key: 3, w-time: 20, agg: 5)
> >
> > you would like to get:
> > (key: 2, w-time: 10, agg: 20)
> > (key: 1, w-time: 20, agg: 30)
> >
> > If this is correct, you can do this as follows.
> > You can extract the window start and end time from the TimeWindow
> > parameter of the WindowFunction and key the stream either by start 
> or
> > end time and apply a ReduceFunction on the keyed stream.
> >
> > Best, Fabian
> >
> > 2015-11-23 8:41 GMT+01:00 Konstantin Knauf 
> 
> >  >>:
> >
> > Hi everyone,
> >
> > me again :) Let's say you have a stream, and for every
> window and key
> > you compute some aggregate value, like this:
> >
> > DataStream.keyBy(..)
> >   .timeWindow(..)
> >   .apply(...)
> >
> >
> > Now I want to get the maximum aggregate value for every
> window over the
> > keys. This feels like a pretty natural use case. How can I
> achieve this
> > with Flink in the most compact way?
> >
> > The options I thought of so far are:
> >
> > * Use an allTimeWindow, obviously. Drawback is, that the
> WindowFunction
> > would not be distributed by keys anymore.
> >
> > * use a windowAll after the WindowFunction to create
> windows of the
> > aggregates, which originated from the same timeWindow.
> This could be
> > done either with a TimeWindow or with a GlobalWindow with
> DeltaTrigger.
> > Drawback: Seems unnecessarily complicated and doubles the
> latency (at
> > least in my naive implementation ;)).
> >
> > * Of course, you could also just keyBy the start time of
> the window
> > after the WindowFunction, but then you 

Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Gyula Fóra
Yes, you are right, I think we should have some nice abstractions for doing
this.

Before the rewrite of the windowing runtime to support out-of-order events,
we had abstractions for supporting this, but that code was not feasible from
a performance perspective. (The result of a keyed window reduce used to be a
window containing all the aggregates, and one could then just aggregate again
on the result without specifying the window again.)

Maybe we could implement similar abstractions on the new window runtime, I
think that would be really awesome.
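
For reference, a minimal sketch of that two-window pattern on the current API
could look like this (Scala; MyEvent, its fields and the window size are only
illustrative placeholders, and it assumes the 0.10 Scala DataStream API):

  import java.util.concurrent.TimeUnit
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.windowing.time.Time

  // Hypothetical event type, for illustration only.
  case class MyEvent(key: String, value: Long)

  def maxPerWindow(ds: DataStream[MyEvent]): DataStream[MyEvent] =
    ds.keyBy(_.key)
      .timeWindow(Time.of(10, TimeUnit.SECONDS))
      .reduce((a, b) => MyEvent(a.key, a.value + b.value)) // per-key aggregate per window
      .timeWindowAll(Time.of(10, TimeUnit.SECONDS))
      .reduce((a, b) => if (a.value >= b.value) a else b)  // overall max per window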

Gyula

Konstantin Knauf  wrote (on Monday, 23 Nov 2015, 11:40):

> Thanks!
>
> @Fabian: Yepp, but this still results in multiple outputs per window,
> because the maximum is emitted for every key.
>
> @Gyula: Yepp, that's the second bullet point from my question ;) The way
> I implemented it, it basically doubles the latency, because the
> timeWindowAll has to wait for the next timeWindow before it can close
> the previous one. So if the first timeWindow is 10s, it takes 20s until
> you have a result, although it cant change after 10s. You know what I mean?
>
> Cheers,
>
> Konstantin
>
> On 23.11.2015 11:32, Gyula Fóra wrote:
> > Hi,
> >
> > Alright it seems there are multiple ways of doing this.
> >
> > I would do something like:
> >
> > ds.keyBy(key)
> > .timeWindow(w)
> > .reduce(...)
> > .timeWindowAll(w)
> > .reduce(...)
> >
> > Maybe Aljoscha could jump in here :D
> >
> > Cheers,
> > Gyula
> >
> > Fabian Hueske > ezt írta
> > (időpont: 2015. nov. 23., H, 11:21):
> >
> > If you set the key to the time attribute, the "old" key is no longer
> > valid.
> > The streams are organized by time and only one aggregate for each
> > window-time should be computed.
> >
> > This should do what you are looking for:
> >
> > DataStream
> >   .keyBy(_._1) // key by orginal key
> >   .timeWindow(..)
> >   .apply(...)  // extract window end time: (origKey, time, agg)
> >   .keyBy(_._2) // key by time field
> >   .maxBy(_._3) // value with max agg field
> >
> > Best, Fabian
> >
> > 2015-11-23 11:00 GMT+01:00 Konstantin Knauf
> >  >>:
> >
> > Hi Fabian,
> >
> > thanks for your answer. Yes, that's what I want.
> >
> > The solution you suggest is what I am doing right now (see last
> > of the
> > bullet point in my question).
> >
> > But given your example. I would expect the following output:
> >
> > (key: 1, w-time: 10, agg: 17)
> > (key: 2, w-time: 10, agg: 20)
> > (key: 1, w-time: 20, agg: 30)
> > (key: 1, w-time: 20, agg: 30)
> > (key: 1, w-time: 20, agg: 30)
> >
> > Because the reduce function is evaluated for every incoming
> > event (i.e.
> > each key), right?
> >
> > Cheers,
> >
> > Konstantin
> >
> > On 23.11.2015 10:47, Fabian Hueske wrote:
> > > Hi Konstantin,
> > >
> > > let me first summarize to make sure I understood what you are
> looking for.
> > > You computed an aggregate over a keyed event-time window and
> you are
> > > looking for the maximum aggregate for each group of windows
> over the
> > > same period of time.
> > > So if you have
> > > (key: 1, w-time: 10, agg: 17)
> > > (key: 2, w-time: 10, agg: 20)
> > > (key: 1, w-time: 20, agg: 30)
> > > (key: 2, w-time: 20, agg: 28)
> > > (key: 3, w-time: 20, agg: 5)
> > >
> > > you would like to get:
> > > (key: 2, w-time: 10, agg: 20)
> > > (key: 1, w-time: 20, agg: 30)
> > >
> > > If this is correct, you can do this as follows.
> > > You can extract the window start and end time from the
> TimeWindow
> > > parameter of the WindowFunction and key the stream either by
> start or
> > > end time and apply a ReduceFunction on the keyed stream.
> > >
> > > Best, Fabian
> > >
> > > 2015-11-23 8:41 GMT+01:00 Konstantin Knauf <
> konstantin.kn...@tngtech.com 
> > >  > >>:
> > >
> > > Hi everyone,
> > >
> > > me again :) Let's say you have a stream, and for every
> > window and key
> > > you compute some aggregate value, like this:
> > >
> > > DataStream.keyBy(..)
> > >   .timeWindow(..)
> > >   .apply(...)
> > >
> > >
> > > Now I want to get the maximum aggregate value for every
> > window over the
> > > keys. This feels like a pretty natural use case. How can I
> > achieve this
> > > 

Re: YARN High Availability

2015-11-23 Thread Ufuk Celebi
Hey Gwenhaël,

the restarting jobs are most likely old job submissions. They are not cleaned 
up when you shut down the cluster, but only when they finish (either regular 
finish or after cancelling).

The workaround is to use the command line frontend:

bin/flink cancel JOBID

for each RESTARTING job. Sorry about the inconvenience!
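
If there are many of them, a small loop can save some typing. This is only a
hypothetical sketch: it assumes that "bin/flink list -r" prints one line per
job with the job ID in the fourth column, so please double-check the output
format of your version first:

  for job in $(bin/flink list -r | grep RESTARTING | awk '{print $4}'); do
    bin/flink cancel "$job"
  done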

We are in an active discussion about addressing this. The future behaviour
will be that the startup or shutdown of a cluster cleans up everything, with
an option to skip this step.

The reasoning for the initial solution (not removing anything) was to make sure 
that no jobs are deleted by accident. But it looks like this is more confusing 
than helpful.

– Ufuk

> On 23 Nov 2015, at 11:45, Gwenhael Pasquiers 
>  wrote:
> 
> Hi again !
> 
> On the same topic I'm still trying to start my streaming job with HA.
> The HA part seems to be more or less OK (I killed the JobManager and it came 
> back), however I have an issue with the TaskManagers.
> I configured my job to have only one TaskManager and 1 slot that does 
> [source=>map=>sink].
> The issue I'm encountering is that other instances of my job appear and are 
> in the RESTARTING status since there is only one task slot.
> 
> Do you know of this, or have an idea of where to look in order to understand 
> what's happening ?
> 
> B.R.
> 
> Gwenhaël PASQUIERS
> 
> -Original Message-
> From: Maximilian Michels [mailto:m...@apache.org] 
> Sent: jeudi 19 novembre 2015 13:36
> To: user@flink.apache.org
> Subject: Re: YARN High Availability
> 
> The docs have been updated.
> 
> On Thu, Nov 19, 2015 at 12:36 PM, Ufuk Celebi  wrote:
>> I’ve added a note about this to the docs and asked Max to trigger a new 
>> build of them.
>> 
>> Regarding Aljoscha’s idea: I like it. It is essentially a shortcut for 
>> configuring the root path.
>> 
>> In any case, it is orthogonal to Till’s proposals. That one we need to 
>> address as well (see FLINK-2929). The motivation for the current behaviour 
>> was to be rather defensive when removing state in order to not loose data 
>> accidentally. But it can be confusing, indeed.
>> 
>> – Ufuk
>> 
>>> On 19 Nov 2015, at 12:08, Till Rohrmann  wrote:
>>> 
>>> You mean an additional start-up parameter for the `start-cluster.sh` script 
>>> for the HA case? That could work.
>>> 
>>> On Thu, Nov 19, 2015 at 11:54 AM, Aljoscha Krettek  
>>> wrote:
>>> Maybe we could add a user parameter to specify a cluster name that is used 
>>> to make the paths unique.
>>> 
>>> 
>>> On Thu, Nov 19, 2015, 11:24 Till Rohrmann  wrote:
>>> I agree that this would make the configuration easier. However, it entails 
>>> also that the user has to retrieve the randomized path from the logs if he 
>>> wants to restart jobs after the cluster has crashed or intentionally 
>>> restarted. Furthermore, the system won't be able to clean up old checkpoint 
>>> and job handles in case that the cluster stop was intentional.
>>> 
>>> Thus, the question is how do we define the behaviour in order to retrieve 
>>> handles and to clean up old handles so that ZooKeeper won't be cluttered 
>>> with old handles?
>>> 
>>> There are basically two modes:
>>> 
>>> 1. Keep state handles when shutting down the cluster. Provide a mean to 
>>> define a fixed path when starting the cluster and also a mean to purge old 
>>> state handles. Furthermore, add a shutdown mode where the handles under the 
>>> current path are directly removed. This mode would guarantee to always have 
>>> the state handles available if not explicitly told differently. However, 
>>> the downside is that ZooKeeper will be cluttered most certainly.
>>> 
>>> 2. Remove the state handles when shutting down the cluster. Provide a 
>>> shutdown mode where we keep the state handles. This will keep ZooKeeper 
>>> clean but will give you also the possibility to keep a checkpoint around if 
>>> necessary. However, the user is more likely to lose his state when shutting 
>>> down the cluster.
>>> 
>>> On Thu, Nov 19, 2015 at 10:55 AM, Robert Metzger  
>>> wrote:
>>> I agree with Aljoscha. Many companies install Flink (and its config) in a 
>>> central directory and users share that installation.
>>> 
>>> On Thu, Nov 19, 2015 at 10:45 AM, Aljoscha Krettek  
>>> wrote:
>>> I think we should find a way to randomize the paths where the HA stuff 
>>> stores data. If users don’t realize that they store data in the same paths 
>>> this could lead to problems.
>>> 
 On 19 Nov 2015, at 08:50, Till Rohrmann  wrote:
 
 Hi Gwenhaël,
 
 good to hear that you could resolve the problem.
 
 When you run multiple HA flink jobs in the same cluster, then you don’t 
 have to adjust the configuration of Flink. It should work out of the box.
 
 However, if you run multiple HA Flink 

Re: Flink Streaming Core 0.10 in maven repos

2015-11-23 Thread Ufuk Celebi
There is also this guide:
https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.9.x+to+0.10.x

On Monday, 23 November 2015, Stephan Ewen  wrote:

> Hi Arnaud!
>
> In 0.10 , we renamed the dependency to "flink-streaming-java" (and
> flink-streaming-scala"), to be more in line with the structure of the
> dependencies on the batch side.
>
> Just replace "flink-streaming-core" with "flink-streaming-java"...
>
> Greetings,
> Stephan
>
>
> On Mon, Nov 23, 2015 at 9:07 AM, LINZ, Arnaud  > wrote:
>
>> Hello,
>>
>> Small question: I can't find the Streaming Core component in 0.10 version
>> in the maven repo :
>> http://mvnrepository.com/artifact/org.apache.flink/flink-streaming-core
>>
>> Thus in my pom file this artifact is the only part of my Flink's
>> dependencies to stay in 0.10-SNAPSHOT version.
>> Is there something wrong with that component's publication in 0.10
>> version ?
>>
>> Greetings,
>> Arnaud
>
>


Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Matthias J. Sax
Hi,

Can't you use a second keyed window (with the same size) and apply
.max(...)?

-Matthias

On 11/23/2015 11:00 AM, Konstantin Knauf wrote:
> Hi Fabian,
> 
> thanks for your answer. Yes, that's what I want.
> 
> The solution you suggest is what I am doing right now (see last of the
> bullet point in my question).
> 
> But given your example. I would expect the following output:
> 
> (key: 1, w-time: 10, agg: 17)
> (key: 2, w-time: 10, agg: 20)
> (key: 1, w-time: 20, agg: 30)
> (key: 1, w-time: 20, agg: 30)
> (key: 1, w-time: 20, agg: 30)
> 
> Because the reduce function is evaluated for every incoming event (i.e.
> each key), right?
> 
> Cheers,
> 
> Konstantin
> 
> On 23.11.2015 10:47, Fabian Hueske wrote:
>> Hi Konstantin,
>>
>> let me first summarize to make sure I understood what you are looking for.
>> You computed an aggregate over a keyed event-time window and you are
>> looking for the maximum aggregate for each group of windows over the
>> same period of time.
>> So if you have
>> (key: 1, w-time: 10, agg: 17)
>> (key: 2, w-time: 10, agg: 20)
>> (key: 1, w-time: 20, agg: 30)
>> (key: 2, w-time: 20, agg: 28)
>> (key: 3, w-time: 20, agg: 5)
>>
>> you would like to get:
>> (key: 2, w-time: 10, agg: 20)
>> (key: 1, w-time: 20, agg: 30)
>>
>> If this is correct, you can do this as follows.
>> You can extract the window start and end time from the TimeWindow
>> parameter of the WindowFunction and key the stream either by start or
>> end time and apply a ReduceFunction on the keyed stream.
>>
>> Best, Fabian
>>
>> 2015-11-23 8:41 GMT+01:00 Konstantin Knauf > >:
>>
>> Hi everyone,
>>
>> me again :) Let's say you have a stream, and for every window and key
>> you compute some aggregate value, like this:
>>
>> DataStream.keyBy(..)
>>   .timeWindow(..)
>>   .apply(...)
>>
>>
>> Now I want to get the maximum aggregate value for every window over the
>> keys. This feels like a pretty natural use case. How can I achieve this
>> with Flink in the most compact way?
>>
>> The options I thought of so far are:
>>
>> * Use an allTimeWindow, obviously. Drawback is, that the WindowFunction
>> would not be distributed by keys anymore.
>>
>> * use a windowAll after the WindowFunction to create windows of the
>> aggregates, which originated from the same timeWindow. This could be
>> done either with a TimeWindow or with a GlobalWindow with DeltaTrigger.
>> Drawback: Seems unnecessarily complicated and doubles the latency (at
>> least in my naive implementation ;)).
>>
>> * Of course, you could also just keyBy the start time of the window
>> after the WindowFunction, but then you get more than one event for each
>> window.
>>
>> Is there some easy way I am missing? If not, is there a technical
>> reasons, why such an "reduceByKeyAndWindow"-operator is not available in
>> Flink?
>>
>> Cheers,
>>
>> Konstantin
>>
>>
> 





Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Stephan Ewen
One addition: You can set the system to use "ingestion time", which gives
you event time with auto-generated timestamps and watermarks, based on the
time that the events are seen in the sources.

That way you have the same simplicity as processing time, and you get the
window alignment that Aljoscha described (second total max window has the
same elements as initial max-per-key window).
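
For reference, that is a one-line setting on the execution environment. A
small Scala sketch (assuming the 0.10 API):

  import org.apache.flink.streaming.api.TimeCharacteristic
  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Timestamps and watermarks are then generated automatically from the time
  // at which events enter the sources.
  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)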

On Mon, Nov 23, 2015 at 12:49 PM, Aljoscha Krettek 
wrote:

> Hi,
> @Konstantin: are you using event-time or processing-time windows. If you
> are using processing time, then you can only do it the way Fabian
> suggested. The problem here is, however, that the .keyBy().reduce()
> combination would emit a new maximum for every element that arrives there
> and you never know when you saw the final element, i.e. the maximum.
>
> If you are using event-time, then you are indeed lucky because then you
> can use what Gyula suggested and you won’t have latency, if I’m correct.
> The reason is that the watermark that flushes out the windows in the first
> (keyed window) will also flush out the elements in the all-window. So the
> keyed window will do computations, send along the elements and then after
> it is done it will forward the watermark. This watermark will immediately
> trigger computation of the all-window for the same time period.
>
> Cheers,
> Aljoscha
> > On 23 Nov 2015, at 11:51, Gyula Fóra  wrote:
> >
> > Yes, you are right I think we should have some nice abstractions for
> doing this.
> >
> > Before the rewrite of the windowing runtime to support out-of-order
> events,  we had abstractions for supporting this but that code was not
> feasible from performance perspective.  (The result of a keyed window
> reduce used to be a window containing all the aggregates and one could then
> just aggregate again on the result without specifying the window again)
> >
> > Maybe we could implement similar abstractions on the new window runtime,
> I think that would be really awesome.
> >
> > Gyula
> >
> > Konstantin Knauf  ezt írta (időpont:
> 2015. nov. 23., H, 11:40):
> > Thanks!
> >
> > @Fabian: Yepp, but this still results in multiple outputs per window,
> > because the maximum is emitted for every key.
> >
> > @Gyula: Yepp, that's the second bullet point from my question ;) The way
> > I implemented it, it basically doubles the latency, because the
> > timeWindowAll has to wait for the next timeWindow before it can close
> > the previous one. So if the first timeWindow is 10s, it takes 20s until
> > you have a result, although it cant change after 10s. You know what I
> mean?
> >
> > Cheers,
> >
> > Konstantin
> >
> > On 23.11.2015 11:32, Gyula Fóra wrote:
> > > Hi,
> > >
> > > Alright it seems there are multiple ways of doing this.
> > >
> > > I would do something like:
> > >
> > > ds.keyBy(key)
> > > .timeWindow(w)
> > > .reduce(...)
> > > .timeWindowAll(w)
> > > .reduce(...)
> > >
> > > Maybe Aljoscha could jump in here :D
> > >
> > > Cheers,
> > > Gyula
> > >
> > > Fabian Hueske > ezt írta
> > > (időpont: 2015. nov. 23., H, 11:21):
> > >
> > > If you set the key to the time attribute, the "old" key is no
> longer
> > > valid.
> > > The streams are organized by time and only one aggregate for each
> > > window-time should be computed.
> > >
> > > This should do what you are looking for:
> > >
> > > DataStream
> > >   .keyBy(_._1) // key by orginal key
> > >   .timeWindow(..)
> > >   .apply(...)  // extract window end time: (origKey, time, agg)
> > >   .keyBy(_._2) // key by time field
> > >   .maxBy(_._3) // value with max agg field
> > >
> > > Best, Fabian
> > >
> > > 2015-11-23 11:00 GMT+01:00 Konstantin Knauf
> > >  >>:
> > >
> > > Hi Fabian,
> > >
> > > thanks for your answer. Yes, that's what I want.
> > >
> > > The solution you suggest is what I am doing right now (see last
> > > of the
> > > bullet point in my question).
> > >
> > > But given your example. I would expect the following output:
> > >
> > > (key: 1, w-time: 10, agg: 17)
> > > (key: 2, w-time: 10, agg: 20)
> > > (key: 1, w-time: 20, agg: 30)
> > > (key: 1, w-time: 20, agg: 30)
> > > (key: 1, w-time: 20, agg: 30)
> > >
> > > Because the reduce function is evaluated for every incoming
> > > event (i.e.
> > > each key), right?
> > >
> > > Cheers,
> > >
> > > Konstantin
> > >
> > > On 23.11.2015 10:47, Fabian Hueske wrote:
> > > > Hi Konstantin,
> > > >
> > > > let me first summarize to make sure I understood what you
> are looking for.
> > > > You computed an aggregate over a keyed event-time window and
> you are
> > > > looking for the maximum 

Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Aljoscha Krettek
Hi,
@Konstantin: are you using event-time or processing-time windows? If you are
using processing time, then you can only do it the way Fabian suggested. The 
problem here is, however, that the .keyBy().reduce() combination would emit a 
new maximum for every element that arrives there and you never know when you 
saw the final element, i.e. the maximum.

If you are using event-time, then you are indeed lucky because then you can use 
what Gyula suggested and you won’t have latency, if I’m correct. The reason is 
that the watermark that flushes out the windows in the first (keyed window) 
will also flush out the elements in the all-window. So the keyed window will do 
computations, send along the elements and then after it is done it will forward 
the watermark. This watermark will immediately trigger computation of the 
all-window for the same time period.

Cheers,
Aljoscha
> On 23 Nov 2015, at 11:51, Gyula Fóra  wrote:
> 
> Yes, you are right I think we should have some nice abstractions for doing 
> this. 
> 
> Before the rewrite of the windowing runtime to support out-of-order events,  
> we had abstractions for supporting this but that code was not feasible from 
> performance perspective.  (The result of a keyed window reduce used to be a 
> window containing all the aggregates and one could then just aggregate again 
> on the result without specifying the window again)
> 
> Maybe we could implement similar abstractions on the new window runtime, I 
> think that would be really awesome.
> 
> Gyula
> 
> Konstantin Knauf  ezt írta (időpont: 2015. nov. 
> 23., H, 11:40):
> Thanks!
> 
> @Fabian: Yepp, but this still results in multiple outputs per window,
> because the maximum is emitted for every key.
> 
> @Gyula: Yepp, that's the second bullet point from my question ;) The way
> I implemented it, it basically doubles the latency, because the
> timeWindowAll has to wait for the next timeWindow before it can close
> the previous one. So if the first timeWindow is 10s, it takes 20s until
> you have a result, although it cant change after 10s. You know what I mean?
> 
> Cheers,
> 
> Konstantin
> 
> On 23.11.2015 11:32, Gyula Fóra wrote:
> > Hi,
> >
> > Alright it seems there are multiple ways of doing this.
> >
> > I would do something like:
> >
> > ds.keyBy(key)
> > .timeWindow(w)
> > .reduce(...)
> > .timeWindowAll(w)
> > .reduce(...)
> >
> > Maybe Aljoscha could jump in here :D
> >
> > Cheers,
> > Gyula
> >
> > Fabian Hueske > ezt írta
> > (időpont: 2015. nov. 23., H, 11:21):
> >
> > If you set the key to the time attribute, the "old" key is no longer
> > valid.
> > The streams are organized by time and only one aggregate for each
> > window-time should be computed.
> >
> > This should do what you are looking for:
> >
> > DataStream
> >   .keyBy(_._1) // key by orginal key
> >   .timeWindow(..)
> >   .apply(...)  // extract window end time: (origKey, time, agg)
> >   .keyBy(_._2) // key by time field
> >   .maxBy(_._3) // value with max agg field
> >
> > Best, Fabian
> >
> > 2015-11-23 11:00 GMT+01:00 Konstantin Knauf
> > >:
> >
> > Hi Fabian,
> >
> > thanks for your answer. Yes, that's what I want.
> >
> > The solution you suggest is what I am doing right now (see last
> > of the
> > bullet point in my question).
> >
> > But given your example. I would expect the following output:
> >
> > (key: 1, w-time: 10, agg: 17)
> > (key: 2, w-time: 10, agg: 20)
> > (key: 1, w-time: 20, agg: 30)
> > (key: 1, w-time: 20, agg: 30)
> > (key: 1, w-time: 20, agg: 30)
> >
> > Because the reduce function is evaluated for every incoming
> > event (i.e.
> > each key), right?
> >
> > Cheers,
> >
> > Konstantin
> >
> > On 23.11.2015 10:47, Fabian Hueske wrote:
> > > Hi Konstantin,
> > >
> > > let me first summarize to make sure I understood what you are 
> > looking for.
> > > You computed an aggregate over a keyed event-time window and you 
> > are
> > > looking for the maximum aggregate for each group of windows over 
> > the
> > > same period of time.
> > > So if you have
> > > (key: 1, w-time: 10, agg: 17)
> > > (key: 2, w-time: 10, agg: 20)
> > > (key: 1, w-time: 20, agg: 30)
> > > (key: 2, w-time: 20, agg: 28)
> > > (key: 3, w-time: 20, agg: 5)
> > >
> > > you would like to get:
> > > (key: 2, w-time: 10, agg: 20)
> > > (key: 1, w-time: 20, agg: 30)
> > >
> > > If this is correct, you can do this as follows.
> > > You can extract the window start and end time from the TimeWindow
> > > parameter of the 

[VOTE] Release Apache Flink 0.10.1 (release-0.10.0-rc1)

2015-11-23 Thread Robert Metzger
Hi All,

this is the first bugfix release for the 0.10 series of Flink.
I've CC'ed the user@ list in case users are interested in helping to verify
the release.

It contains fixes for critical issues, in particular:
- FLINK-3021 Fix class loading issue for streaming sources
- FLINK-2974 Add periodic offset committer for Kafka
- FLINK-2977 Using reflection to load HBase Kerberos tokens
- FLINK-3024 Fix TimestampExtractor.getCurrentWatermark() Behaviour
- FLINK-2967 Increase timeout for LOCAL_HOST address detection strategy
- FLINK-3025 [kafka consumer] Bump transitive ZkClient dependency
- FLINK-2989 job cancel button doesn't work on YARN
- FLINK-3032: Flink does not start on Hadoop 2.7.1 (HDP), due to class
conflict
- FLINK-3011, 3019, 3028 Cancel jobs in RESTARTING state

This is the guide on how to verify a release:
https://cwiki.apache.org/confluence/display/FLINK/Releasing
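
The basic signature check boils down to something like the following (a rough
sketch only; the artifact file name is just an example, see the guide for the
full checklist):

  wget http://www.apache.org/dist/flink/KEYS
  gpg --import KEYS
  # verify the detached signature of a downloaded artifact
  gpg --verify flink-0.10.1-src.tgz.asc flink-0.10.1-src.tgz
  # compute the checksum and compare it against the published .md5 file
  md5sum flink-0.10.1-src.tgz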

During the testing, please focus on trying out Flink on different Hadoop
platforms: we changed the way Hadoop's Maven dependencies are packaged,
so there may be issues with different Hadoop distributions.
The Kafka consumer also changed a bit; it would be good to test it on a
cluster.

-

Please vote on releasing the following candidate as Apache Flink version
0.10.1:

The commit to be voted on:
http://git-wip-us.apache.org/repos/asf/flink/commit/2e9b2316

Branch:
release-0.10.1-rc1 (see
https://git1-us-west.apache.org/repos/asf/flink/?p=flink.git)

The release artifacts to be voted on can be found at:
http://people.apache.org/~rmetzger/flink-0.10.1-rc1/

The release artifacts are signed with the key with fingerprint  D9839159:
http://www.apache.org/dist/flink/KEYS

The staging repository for this release can be found at:
https://repository.apache.org/content/repositories/orgapacheflink-1058

-

The vote is open for the next 72 hours and passes if a majority of at least
three +1 PMC votes are cast.

The vote ends on Wednesday, November 25.

[ ] +1 Release this package as Apache Flink 0.10.1
[ ] -1 Do not release this package because ...

===


RE: YARN High Availability

2015-11-23 Thread Gwenhael Pasquiers
OK, I understand.

Maybe we are not really using Flink as you intended. The way we are using it,
one cluster equals one job. That way we are sure to isolate the different jobs
as much as possible, and in case of crashes / bugs / etc. we can completely
kill one cluster without interfering with the other jobs.

That future behavior seems good :-)

Instead of the manual flink commands, is there a way to manually delete those
old jobs before launching my job? They are probably somewhere in HDFS, aren't
they?

B.R.


-Original Message-
From: Ufuk Celebi [mailto:u...@apache.org] 
Sent: lundi 23 novembre 2015 12:12
To: user@flink.apache.org
Subject: Re: YARN High Availability

Hey Gwenhaël,

the restarting jobs are most likely old job submissions. They are not cleaned 
up when you shut down the cluster, but only when they finish (either regular 
finish or after cancelling).

The workaround is to use the command line frontend:

bin/flink cancel JOBID

for each RESTARTING job. Sorry about the inconvenience!

We are in an active discussion about addressing this. The future behaviour will 
be that the startup or shutdown of a cluster cleans up everything and an option 
to skip this step.

The reasoning for the initial solution (not removing anything) was to make sure 
that no jobs are deleted by accident. But it looks like this is more confusing 
than helpful.

– Ufuk

> On 23 Nov 2015, at 11:45, Gwenhael Pasquiers 
>  wrote:
> 
> Hi again !
> 
> On the same topic I'm still trying to start my streaming job with HA.
> The HA part seems to be more or less OK (I killed the JobManager and it came 
> back), however I have an issue with the TaskManagers.
> I configured my job to have only one TaskManager and 1 slot that does 
> [source=>map=>sink].
> The issue I'm encountering is that other instances of my job appear and are 
> in the RESTARTING status since there is only one task slot.
> 
> Do you know of this, or have an idea of where to look in order to understand 
> what's happening ?
> 
> B.R.
> 
> Gwenhaël PASQUIERS
> 
> -Original Message-
> From: Maximilian Michels [mailto:m...@apache.org] 
> Sent: jeudi 19 novembre 2015 13:36
> To: user@flink.apache.org
> Subject: Re: YARN High Availability
> 
> The docs have been updated.
> 
> On Thu, Nov 19, 2015 at 12:36 PM, Ufuk Celebi  wrote:
>> I’ve added a note about this to the docs and asked Max to trigger a new 
>> build of them.
>> 
>> Regarding Aljoscha’s idea: I like it. It is essentially a shortcut for 
>> configuring the root path.
>> 
>> In any case, it is orthogonal to Till’s proposals. That one we need to 
>> address as well (see FLINK-2929). The motivation for the current behaviour 
>> was to be rather defensive when removing state in order to not loose data 
>> accidentally. But it can be confusing, indeed.
>> 
>> – Ufuk
>> 
>>> On 19 Nov 2015, at 12:08, Till Rohrmann  wrote:
>>> 
>>> You mean an additional start-up parameter for the `start-cluster.sh` script 
>>> for the HA case? That could work.
>>> 
>>> On Thu, Nov 19, 2015 at 11:54 AM, Aljoscha Krettek  
>>> wrote:
>>> Maybe we could add a user parameter to specify a cluster name that is used 
>>> to make the paths unique.
>>> 
>>> 
>>> On Thu, Nov 19, 2015, 11:24 Till Rohrmann  wrote:
>>> I agree that this would make the configuration easier. However, it entails 
>>> also that the user has to retrieve the randomized path from the logs if he 
>>> wants to restart jobs after the cluster has crashed or intentionally 
>>> restarted. Furthermore, the system won't be able to clean up old checkpoint 
>>> and job handles in case that the cluster stop was intentional.
>>> 
>>> Thus, the question is how do we define the behaviour in order to retrieve 
>>> handles and to clean up old handles so that ZooKeeper won't be cluttered 
>>> with old handles?
>>> 
>>> There are basically two modes:
>>> 
>>> 1. Keep state handles when shutting down the cluster. Provide a mean to 
>>> define a fixed path when starting the cluster and also a mean to purge old 
>>> state handles. Furthermore, add a shutdown mode where the handles under the 
>>> current path are directly removed. This mode would guarantee to always have 
>>> the state handles available if not explicitly told differently. However, 
>>> the downside is that ZooKeeper will be cluttered most certainly.
>>> 
>>> 2. Remove the state handles when shutting down the cluster. Provide a 
>>> shutdown mode where we keep the state handles. This will keep ZooKeeper 
>>> clean but will give you also the possibility to keep a checkpoint around if 
>>> necessary. However, the user is more likely to lose his state when shutting 
>>> down the cluster.
>>> 
>>> On Thu, Nov 19, 2015 at 10:55 AM, Robert Metzger  
>>> wrote:
>>> I agree with Aljoscha. Many companies install Flink (and its config) in a 
>>> central directory and users share 

Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Fabian Hueske
If you set the key to the time attribute, the "old" key is no longer valid.
The streams are organized by time and only one aggregate for each
window-time should be computed.

This should do what you are looking for:

DataStream
  .keyBy(_._1) // key by original key
  .timeWindow(..)
  .apply(...)  // extract window end time: (origKey, time, agg)
  .keyBy(_._2) // key by time field
  .maxBy(_._3) // value with max agg field
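
In Scala, the last two steps could look roughly like this (a sketch; it
assumes the window operator already emits (origKey, windowEndTime, agg)
tuples, e.g. with the end time taken from the TimeWindow parameter):

  import org.apache.flink.streaming.api.scala._

  // aggregates: one (origKey, windowEndTime, agg) tuple per key and window
  def maxPerWindowTime(
      aggregates: DataStream[(String, Long, Long)]): DataStream[(String, Long, Long)] =
    aggregates
      .keyBy(_._2) // key by the window end time
      .maxBy(2)    // keep the element with the largest aggregate for that window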

Best, Fabian

2015-11-23 11:00 GMT+01:00 Konstantin Knauf :

> Hi Fabian,
>
> thanks for your answer. Yes, that's what I want.
>
> The solution you suggest is what I am doing right now (see last of the
> bullet point in my question).
>
> But given your example. I would expect the following output:
>
> (key: 1, w-time: 10, agg: 17)
> (key: 2, w-time: 10, agg: 20)
> (key: 1, w-time: 20, agg: 30)
> (key: 1, w-time: 20, agg: 30)
> (key: 1, w-time: 20, agg: 30)
>
> Because the reduce function is evaluated for every incoming event (i.e.
> each key), right?
>
> Cheers,
>
> Konstantin
>
> On 23.11.2015 10:47, Fabian Hueske wrote:
> > Hi Konstantin,
> >
> > let me first summarize to make sure I understood what you are looking
> for.
> > You computed an aggregate over a keyed event-time window and you are
> > looking for the maximum aggregate for each group of windows over the
> > same period of time.
> > So if you have
> > (key: 1, w-time: 10, agg: 17)
> > (key: 2, w-time: 10, agg: 20)
> > (key: 1, w-time: 20, agg: 30)
> > (key: 2, w-time: 20, agg: 28)
> > (key: 3, w-time: 20, agg: 5)
> >
> > you would like to get:
> > (key: 2, w-time: 10, agg: 20)
> > (key: 1, w-time: 20, agg: 30)
> >
> > If this is correct, you can do this as follows.
> > You can extract the window start and end time from the TimeWindow
> > parameter of the WindowFunction and key the stream either by start or
> > end time and apply a ReduceFunction on the keyed stream.
> >
> > Best, Fabian
> >
> > 2015-11-23 8:41 GMT+01:00 Konstantin Knauf  > >:
> >
> > Hi everyone,
> >
> > me again :) Let's say you have a stream, and for every window and key
> > you compute some aggregate value, like this:
> >
> > DataStream.keyBy(..)
> >   .timeWindow(..)
> >   .apply(...)
> >
> >
> > Now I want to get the maximum aggregate value for every window over
> the
> > keys. This feels like a pretty natural use case. How can I achieve
> this
> > with Flink in the most compact way?
> >
> > The options I thought of so far are:
> >
> > * Use an allTimeWindow, obviously. Drawback is, that the
> WindowFunction
> > would not be distributed by keys anymore.
> >
> > * use a windowAll after the WindowFunction to create windows of the
> > aggregates, which originated from the same timeWindow. This could be
> > done either with a TimeWindow or with a GlobalWindow with
> DeltaTrigger.
> > Drawback: Seems unnecessarily complicated and doubles the latency (at
> > least in my naive implementation ;)).
> >
> > * Of course, you could also just keyBy the start time of the window
> > after the WindowFunction, but then you get more than one event for
> each
> > window.
> >
> > Is there some easy way I am missing? If not, is there a technical
> > reasons, why such an "reduceByKeyAndWindow"-operator is not
> available in
> > Flink?
> >
> > Cheers,
> >
> > Konstantin
> >
> >
>
> --
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


Re: How to pass hdp.version to flink on yarn

2015-11-23 Thread Robert Metzger
Hi,

In Flink, the configuration parameter for passing custom JVM options is
"env.java.opts". I would recommend putting it into conf/flink-conf.yaml
like this:

env.java.opts: "-Dhdp.version=2.3.0.0-2557"

Please let me know if this works.
Maybe you are the first user running Flink on Pivotal HDP and there are
some things different to other Hadoop distributions.

Regards,
Robert




On Mon, Nov 23, 2015 at 1:15 AM, Jagat Singh  wrote:

> Hi,
>
> I am running example Flink program (Pivotal HDP)
>
> ./bin/flink run -m yarn-cluster -yn 2 ./examples/WordCount.jar
>
> I am getting error below.
>
> How to pass the stack.name and stack.version to the flink program.
>
> This is similar to what we give to Spark as
>
> hdp.version.
>
> Example
>
> spark.driver.extraJavaOptions-Dhdp.version=2.3.0.0-2557
> spark.yarn.am.extraJavaOptions   -Dhdp.version=2.3.0.0-2557
>
> Thanks
>
> Exception message:
> /grid/0/hadoop/yarn/local/usercache/d760770/appcache/application_1447977375774_17024/container_e34_1447977375774_17024_01_01/launch_container.sh:
> line 26: $PWD/*:$HADOOP_CONF_DIR:/usr/${stack.name
> }/current/hadoop-client/*:/usr/${stack.name
> }/current/hadoop-client/lib/*:/usr/${stack.name
> }/current/hadoop-hdfs-client/*:/usr/${stack.name
> }/current/hadoop-hdfs-client/lib/*:/usr/${stack.name
> }/current/hadoop-yarn-client/*:/usr/*${stack.name 
> }*/current/hadoop-yarn-client/lib/*:
> bad substitution
>
> Stack trace: ExitCodeException exitCode=1:
> /grid/0/hadoop/yarn/local/usercache/d760770/appcache/application_1447977375774_17024/container_e34_1447977375774_17024_01_01/launch_container.sh:
> line 26: $PWD/*:$HADOOP_CONF_DIR:/usr/${stack.name
> }/current/hadoop-client/*:/usr/${stack.name
> }/current/hadoop-client/lib/*:/usr/${stack.name
> }/current/hadoop-hdfs-client/*:/usr/${stack.name
> }/current/hadoop-hdfs-client/lib/*:/usr/${stack.name
> }/current/hadoop-yarn-client/*:/usr/*${stack.name 
> }*/current/hadoop-yarn-client/lib/*:
> bad substitution
>
>
>


Re: YARN High Availability

2015-11-23 Thread Till Rohrmann
The problem is the execution graph handle which is stored in ZooKeeper. You
can manually remove it via the ZooKeeper shell by simply deleting
everything below your `recovery.zookeeper.path.root` ZNode. But you should
be sure that the cluster has been stopped before.
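
For example, something like this (a sketch only; host, port and the root
ZNode are placeholders that depend on your configuration, and the cluster
must be stopped first):

  $ bin/zkCli.sh -server localhost:2181
  [zk: localhost:2181(CONNECTED) 0] rmr /flink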

Do you start the different clusters with different
`recovery.zookeeper.path.root` values? If not, you will run into trouble
when running multiple clusters at the same time. The reason is
that then all clusters will think that they belong together.

Cheers,
Till

On Mon, Nov 23, 2015 at 2:15 PM, Gwenhael Pasquiers <
gwenhael.pasqui...@ericsson.com> wrote:

> OK, I understand.
>
> Maybe we are not really using flink as you intended. The way we are using
> it, one cluster equals one job. That way we are sure to isolate the
> different jobs as much as possible and in case of crashes / bugs / (etc)
> can completely kill one cluster without interfering with the other jobs.
>
> That future behavior seems good :-)
>
> Instead of the manual flink commands, is there to manually delete those
> old jobs before launching my job ? They probably are somewhere in hdfs,
> aren't they ?
>
> B.R.
>
>
> -Original Message-
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: lundi 23 novembre 2015 12:12
> To: user@flink.apache.org
> Subject: Re: YARN High Availability
>
> Hey Gwenhaël,
>
> the restarting jobs are most likely old job submissions. They are not
> cleaned up when you shut down the cluster, but only when they finish
> (either regular finish or after cancelling).
>
> The workaround is to use the command line frontend:
>
> bin/flink cancel JOBID
>
> for each RESTARTING job. Sorry about the inconvenience!
>
> We are in an active discussion about addressing this. The future behaviour
> will be that the startup or shutdown of a cluster cleans up everything and
> an option to skip this step.
>
> The reasoning for the initial solution (not removing anything) was to make
> sure that no jobs are deleted by accident. But it looks like this is more
> confusing than helpful.
>
> – Ufuk
>
> > On 23 Nov 2015, at 11:45, Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com> wrote:
> >
> > Hi again !
> >
> > On the same topic I'm still trying to start my streaming job with HA.
> > The HA part seems to be more or less OK (I killed the JobManager and it
> came back), however I have an issue with the TaskManagers.
> > I configured my job to have only one TaskManager and 1 slot that does
> [source=>map=>sink].
> > The issue I'm encountering is that other instances of my job appear and
> are in the RESTARTING status since there is only one task slot.
> >
> > Do you know of this, or have an idea of where to look in order to
> understand what's happening ?
> >
> > B.R.
> >
> > Gwenhaël PASQUIERS
> >
> > -Original Message-
> > From: Maximilian Michels [mailto:m...@apache.org]
> > Sent: jeudi 19 novembre 2015 13:36
> > To: user@flink.apache.org
> > Subject: Re: YARN High Availability
> >
> > The docs have been updated.
> >
> > On Thu, Nov 19, 2015 at 12:36 PM, Ufuk Celebi  wrote:
> >> I’ve added a note about this to the docs and asked Max to trigger a new
> build of them.
> >>
> >> Regarding Aljoscha’s idea: I like it. It is essentially a shortcut for
> configuring the root path.
> >>
> >> In any case, it is orthogonal to Till’s proposals. That one we need to
> address as well (see FLINK-2929). The motivation for the current behaviour
> was to be rather defensive when removing state in order to not loose data
> accidentally. But it can be confusing, indeed.
> >>
> >> – Ufuk
> >>
> >>> On 19 Nov 2015, at 12:08, Till Rohrmann  wrote:
> >>>
> >>> You mean an additional start-up parameter for the `start-cluster.sh`
> script for the HA case? That could work.
> >>>
> >>> On Thu, Nov 19, 2015 at 11:54 AM, Aljoscha Krettek <
> aljos...@apache.org> wrote:
> >>> Maybe we could add a user parameter to specify a cluster name that is
> used to make the paths unique.
> >>>
> >>>
> >>> On Thu, Nov 19, 2015, 11:24 Till Rohrmann 
> wrote:
> >>> I agree that this would make the configuration easier. However, it
> entails also that the user has to retrieve the randomized path from the
> logs if he wants to restart jobs after the cluster has crashed or
> intentionally restarted. Furthermore, the system won't be able to clean up
> old checkpoint and job handles in case that the cluster stop was
> intentional.
> >>>
> >>> Thus, the question is how do we define the behaviour in order to
> retrieve handles and to clean up old handles so that ZooKeeper won't be
> cluttered with old handles?
> >>>
> >>> There are basically two modes:
> >>>
> >>> 1. Keep state handles when shutting down the cluster. Provide a mean
> to define a fixed path when starting the cluster and also a mean to purge
> old state handles. Furthermore, add a shutdown mode where the handles under
> the current path are directly removed. This mode 

Cancel Streaming Job

2015-11-23 Thread Welly Tambunan
Hi All,

Is there any way to stop/cancel a job that's restarting?

I have already stopped the cluster and started it again, but it seems the job
is still restarting in the dashboard.
I also tried to cancel the job via the CLI by running bin/flink cancel 
but it's not working.



Cheers


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Cancel Streaming Job

2015-11-23 Thread Welly Tambunan
Hi All,

Finally I've found the solution for killing the job manager.

https://flink.apache.org/faq.html#i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do


But I do really hope that we get a cancel button for restarting jobs.


Cheers

On Tue, Nov 24, 2015 at 8:30 AM, Welly Tambunan  wrote:

> Hi All,
>
> Is there any way to stop/cancel the job that's restarting ?
>
> I have already stop the cluster and start it again but seems it's still
> restarting in dashboard.
> I also try to cancel the job via CLI by running bin/flink cancel 
> but it's not working.
>
>
>
> Cheers
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>



-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Running Flink in Cloudfoundry Environment

2015-11-23 Thread Madhukar Thota
Hi

Is it possible to run Flink in a Cloudfoundry environment? If yes, how can we
achieve this?

Any help is appreciated. Thanks in Advance.

Thanks,
Madhu


Re: Cancel Streaming Job

2015-11-23 Thread Gyula Fóra
Hi!

This issue has been fixed very recently and the fix will go into the
upcoming bugfix release (0.10.1).

Should be out in the next few days :)

Cheers
Gyula
On Tue, Nov 24, 2015 at 4:49 AM Welly Tambunan  wrote:

> Hi All,
>
> Finally i've found the solution for killing the job manager.
>
>
> https://flink.apache.org/faq.html#i-cant-stop-flink-with-the-provided-stop-scripts-what-can-i-do
>
>
> But i do really hope that we have that cancel button for restarting job.
>
>
> Cheers
>
> On Tue, Nov 24, 2015 at 8:30 AM, Welly Tambunan  wrote:
>
>> Hi All,
>>
>> Is there any way to stop/cancel the job that's restarting ?
>>
>> I have already stop the cluster and start it again but seems it's still
>> restarting in dashboard.
>> I also try to cancel the job via CLI by running bin/flink cancel 
>> but it's not working.
>>
>>
>>
>> Cheers
>>
>>
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


RE: YARN High Availability

2015-11-23 Thread Gwenhael Pasquiers
We are not yet using HA in our cluster instances.
But yes, we will have to change the zookeeper.path.root ☺

We package our jobs with their own config folder (we don’t rely on Flink’s
config folder); we can put the Maven project name into this property, and then
they will have different values ☺
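
For example, something like this in each job's flink-conf.yaml (the value is
just an illustrative placeholder):

  recovery.zookeeper.path.root: /flink/my-streaming-job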


From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: lundi 23 novembre 2015 14:51
To: user@flink.apache.org
Subject: Re: YARN High Availability

The problem is the execution graph handle which is stored in ZooKeeper. You can 
manually remove it via the ZooKeeper shell by simply deleting everything below 
your `recovery.zookeeper.path.root` ZNode. But you should be sure that the 
cluster has been stopped before.

Do you start the different clusters with different 
`recovery.zookeeper.path.root` values? If not, then you should run into 
troubles when running multiple clusters at the same time. The reason is that 
then all clusters will think that they belong together.

Cheers,
Till

On Mon, Nov 23, 2015 at 2:15 PM, Gwenhael Pasquiers 
> wrote:
OK, I understand.

Maybe we are not really using flink as you intended. The way we are using it, 
one cluster equals one job. That way we are sure to isolate the different jobs 
as much as possible and in case of crashes / bugs / (etc) can completely kill 
one cluster without interfering with the other jobs.

That future behavior seems good :-)

Instead of the manual flink commands, is there to manually delete those old 
jobs before launching my job ? They probably are somewhere in hdfs, aren't they 
?

B.R.


-Original Message-
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: lundi 23 novembre 2015 12:12
To: user@flink.apache.org
Subject: Re: YARN High Availability

Hey Gwenhaël,

the restarting jobs are most likely old job submissions. They are not cleaned 
up when you shut down the cluster, but only when they finish (either regular 
finish or after cancelling).

The workaround is to use the command line frontend:

bin/flink cancel JOBID

for each RESTARTING job. Sorry about the inconvenience!

We are in an active discussion about addressing this. The future behaviour will 
be that the startup or shutdown of a cluster cleans up everything and an option 
to skip this step.

The reasoning for the initial solution (not removing anything) was to make sure 
that no jobs are deleted by accident. But it looks like this is more confusing 
than helpful.

– Ufuk

> On 23 Nov 2015, at 11:45, Gwenhael Pasquiers 
> > 
> wrote:
>
> Hi again !
>
> On the same topic I'm still trying to start my streaming job with HA.
> The HA part seems to be more or less OK (I killed the JobManager and it came 
> back), however I have an issue with the TaskManagers.
> I configured my job to have only one TaskManager and 1 slot that does 
> [source=>map=>sink].
> The issue I'm encountering is that other instances of my job appear and are 
> in the RESTARTING status since there is only one task slot.
>
> Do you know of this, or have an idea of where to look in order to understand 
> what's happening ?
>
> B.R.
>
> Gwenhaël PASQUIERS
>
> -Original Message-
> From: Maximilian Michels [mailto:m...@apache.org]
> Sent: jeudi 19 novembre 2015 13:36
> To: user@flink.apache.org
> Subject: Re: YARN High Availability
>
> The docs have been updated.
>
> On Thu, Nov 19, 2015 at 12:36 PM, Ufuk Celebi 
> > wrote:
>> I’ve added a note about this to the docs and asked Max to trigger a new 
>> build of them.
>>
>> Regarding Aljoscha’s idea: I like it. It is essentially a shortcut for 
>> configuring the root path.
>>
>> In any case, it is orthogonal to Till’s proposals. That one we need to 
>> address as well (see FLINK-2929). The motivation for the current behaviour 
>> was to be rather defensive when removing state in order to not loose data 
>> accidentally. But it can be confusing, indeed.
>>
>> – Ufuk
>>
>>> On 19 Nov 2015, at 12:08, Till Rohrmann 
>>> > wrote:
>>>
>>> You mean an additional start-up parameter for the `start-cluster.sh` script 
>>> for the HA case? That could work.
>>>
>>> On Thu, Nov 19, 2015 at 11:54 AM, Aljoscha Krettek 
>>> > wrote:
>>> Maybe we could add a user parameter to specify a cluster name that is used 
>>> to make the paths unique.
>>>
>>>
>>> On Thu, Nov 19, 2015, 11:24 Till Rohrmann 
>>> > wrote:
>>> I agree that this would make the configuration easier. However, it entails 
>>> also that the user has to retrieve the randomized path from the logs if he 
>>> wants to restart jobs after the cluster has crashed or intentionally 
>>>