Re: Implement Joins with Lookup Data

2018-07-24 Thread Jain, Ankit
How often is the product DB updated? Based on that, you can store product 
metadata as state in Flink, maybe set up the state on cluster startup and then 
update it daily, etc.
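
For reference, a minimal sketch of that idea (Position, Product and lookupFromDb are hypothetical stand-ins): keep the product in keyed state and refresh it when it is older than a day, so the database is hit at most once per key per day instead of once per event.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// hypothetical minimal POJOs standing in for your real types
class Position { String productKey; }
class Product { long loadedAt; }

public class ProductEnricher
    extends RichFlatMapFunction<Position, Tuple2<Position, Product>> {

  private static final long TTL_MS = 24 * 60 * 60 * 1000L; // daily refresh

  private transient ValueState<Product> cached;

  @Override
  public void open(Configuration config) {
    cached = getRuntimeContext().getState(
        new ValueStateDescriptor<>("product", Product.class));
  }

  @Override
  public void flatMap(Position pos, Collector<Tuple2<Position, Product>> out)
      throws Exception {
    Product product = cached.value();
    if (product == null
        || product.loadedAt + TTL_MS < System.currentTimeMillis()) {
      product = lookupFromDb(pos.productKey); // at most one DB hit per key per day
      cached.update(product);
    }
    out.collect(Tuple2.of(pos, product));
  }

  private Product lookupFromDb(String productKey) {
    // hypothetical: query the product table and stamp the load time
    Product p = new Product();
    p.loadedAt = System.currentTimeMillis();
    return p;
  }
}

// usage: positions.keyBy(p -> p.productKey).flatMap(new ProductEnricher())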

Also, just based on this feature, Flink doesn’t seem to add a lot of value on 
top of Kafka. As Jörn said below, you can very well store all the events in an 
external store and then periodically run a cron job to enrich them later, since 
your processing doesn’t seem to require absolute real time.

Thanks
Ankit

From: Jörn Franke 
Date: Monday, July 23, 2018 at 10:10 PM
To: Harshvardhan Agrawal 
Cc: 
Subject: Re: Implement Joins with Lookup Data

For the first one (lookup of single entries) you could use a NoSQL db (e.g. a 
key-value store) - a relational database will not scale.

Depending on when you need to do the enrichment you could also first store the 
data and enrich it later as part of a batch process.

On 24. Jul 2018, at 05:25, Harshvardhan Agrawal 
<harshvardhan.ag...@gmail.com> wrote:
Hi,

We are using Flink for financial data enrichment and aggregations. We have 
Positions data that we are currently receiving from Kafka. We want to enrich 
that data with reference data like Product and Account information that is 
present in a relational database. From my understanding of Flink so far, there 
are two ways to achieve this:

1) First Approach:
a) Get positions from Kafka and key by product key.
b) Perform a lookup from the database for each key to obtain a Tuple2 of 
position and product.

2) Second Approach:
a) Get positions from Kafka and key by product key.
b) Window the keyed stream into, say, 15-second windows.
c) For each window get the unique product keys and perform a single lookup.
d) Somehow join Positions and Products

In the first approach we will be making a lot of calls to the DB and the 
solution is very chatty. It's hard to scale because the database storing the 
reference data might not be very responsive.

In the second approach, I wish to join the WindowedStream with the 
SingleOutputStream, but it turns out I can't join a windowed stream, so I am 
not quite sure how to do that.

I wanted an opinion on what is the right thing to do. Should I go with the 
first approach or the second one? If the second one, how can I implement the 
join?

--
Regards,
Harshvardhan Agrawal


Re: Appending Windowed Aggregates to Events

2017-06-26 Thread Jain, Ankit
You could load the historical data as Flink state and then look up the state 
with the key derived from the input record.
That should serve like a join in the relational world.
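
A minimal sketch of that pattern (Txn and EnrichedTxn are hypothetical stand-ins): update a per-key aggregate in keyed state and emit exactly one enriched record per input. True sliding-window semantics ("last 5 days") would additionally need timers or bucketed state to expire old transactions.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// hypothetical minimal POJOs standing in for your transaction types
class Txn { String accountId; double amount; }
class EnrichedTxn {
  final Txn txn;
  final double cashTotal;
  EnrichedTxn(Txn txn, double cashTotal) { this.txn = txn; this.cashTotal = cashTotal; }
}

public class AppendRunningTotal extends RichFlatMapFunction<Txn, EnrichedTxn> {

  private transient ValueState<Double> total; // running aggregate per key

  @Override
  public void open(Configuration config) {
    total = getRuntimeContext().getState(
        new ValueStateDescriptor<>("cash-total", Double.class));
  }

  @Override
  public void flatMap(Txn txn, Collector<EnrichedTxn> out) throws Exception {
    double current = total.value() == null ? 0.0 : total.value();
    current += txn.amount;                      // 2a) update the aggregate
    total.update(current);
    out.collect(new EnrichedTxn(txn, current)); // 2b/2c) exactly one output per input
  }
}

// usage: transactions.keyBy(t -> t.accountId).flatMap(new AppendRunningTotal())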

You may also want to think about keeping the writes and querying isolated.
Especially if your windows are going to be long (e.g. cash transactions for the 
last 6 months in your example) and you need your data to be persistent long 
term, having a durable store outside of Flink will really help.

The Flink state feature is really nice, but I wouldn't view it as long-term 
durable storage like a NoSQL store or a relational DB like Oracle.

Thanks
Ankit

From: Tim Stearn 
Date: Friday, June 23, 2017 at 3:59 PM
To: "user@flink.apache.org" 
Subject: Appending Windowed Aggregates to Events

Hello All,

I’m *very* new to Flink.  I read through the documentation and played with some 
sample code, but I’m struggling to get started with my requirements.


We want to use Flink to maintain windowed aggregates as part of a transaction 
monitoring application.  These would use sliding window definitions.  An 
example would be:  “Total amount for CASH transactions in the last 5 days”.   
Here’s what I need my Flink application to do:

1. Prepare for transaction processing by reading historical aggregates and 
building windows.

2. For each new transaction:

   a. Update the windowed aggregate with the new transaction data.

   b. Find the window that matches the incoming time stamp and add the 
aggregate value to the transaction.

   c. Send the enhanced transaction (original fields + aggregates from the 
matching window) to a downstream processor via a RabbitMQ or Kafka sink.

For every transaction coming in, I want one (and only one) output that contains 
the original transaction fields plus the aggregates.

I see how to do the code to create the window assigner and the code that 
incrementally maintains the aggregates.  I’m not sure how I could join this 
back to the original transaction record, appending the aggregate values from 
the window that matches the transaction date stamp.  This seems like a join of 
some kind to me, but I don’t know how to implement in in Flink.

I’m hoping someone could reply with some simple code (or even pseudo code) to 
get me started on the “join”  part of the above data flow.  Please let me know 
if I need to clarify.

Thanks,

Tim Stearn



Re: High Availability on Yarn

2017-05-24 Thread Jain, Ankit
Thanks for the reply Robert – I will try out #1 & keep you posted.

From: Robert Metzger <rmetz...@apache.org>
Date: Wednesday, May 24, 2017 at 7:44 AM
To: "Jain, Ankit" <ankit.j...@here.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: High Availability on Yarn

Hi Ankit,

I realized I can answer your questions myself :)

#1 I think that's possible, by using the same 
high-availability.zookeeper.path.root configuration parameter between the runs.
By default, on YARN we are using the YARN application ID as the root path, but 
if you are putting a custom one there, Flink will recover running jobs even if 
you are starting a new EMR cluster (assuming the files are in s3).
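
For reference, a hypothetical flink-conf.yaml sketch of that setup (hosts, paths and bucket are examples; some versions name the storage key high-availability.storageDir instead):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink/my-session
high-availability.zookeeper.storageDir: s3://my-bucket/flink/recovery

Pinning the ZooKeeper root path decouples recovery from the YARN application ID, and the HA storage directory must live on a filesystem the new cluster can reach, e.g. S3.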

#2 In current Flink we cannot expand a running Flink job! Yarn will see new 
machines being added to the cluster, and it can use them for future Flink 
deployments on YARN (as you said).
We are working on adding support for dynamically changing the Flink hardware 
allocations as part of FLIP-6.


Please keep asking and bugging us if we are not responding. It's just that most 
Flink developers are quite busy with the 1.3 release right now.

Regards,
Robert


On Wed, May 24, 2017 at 3:36 PM, Robert Metzger <rmetz...@apache.org> wrote:
Hi Ankit,

I'm sorry that nobody is responding to the message. I'll try to find somebody.

On Tue, May 23, 2017 at 10:27 PM, Jain, Ankit <ankit.j...@here.com> wrote:
Following up on this.

From: "Jain, Ankit" <ankit.j...@here.com<mailto:ankit.j...@here.com>>
Date: Tuesday, May 16, 2017 at 12:14 AM

To: Stephan Ewen <se...@apache.org<mailto:se...@apache.org>>, 
"user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: High Availability on Yarn

Bringing it back to list’s focus.

From: "Jain, Ankit" <ankit.j...@here.com<mailto:ankit.j...@here.com>>
Date: Thursday, May 11, 2017 at 1:19 PM
To: Stephan Ewen <se...@apache.org<mailto:se...@apache.org>>, 
"user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: High Availability on Yarn

Got the answer on #2, looks like that will work, still looking for suggestions 
on #1.

Thanks
Ankit

From: "Jain, Ankit" <ankit.j...@here.com<mailto:ankit.j...@here.com>>
Date: Thursday, May 11, 2017 at 8:26 AM
To: Stephan Ewen <se...@apache.org<mailto:se...@apache.org>>, 
"user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: High Availability on Yarn

Following up further on this.


1)  We are using a long running EMR cluster to submit jobs right now and as 
you know EMR hasn’t made Yarn ResourceManager HA.

Is there any way we can use the information put in Zookeeper by Flink Job 
Manager to bring the jobs back up on a new EMR cluster if RM goes down?



We are not looking for completely automated option but maybe write a script 
which reads Zookeeper and re-starts all jobs on a fresh EMR cluster?

I am assuming if  Yarn ResouceManager goes down, there is no way to just bring 
it back up – you have to start a new EMR cluster?



2)  Regarding elasticity, I know for now a running flink cluster can’t make 
use of new hosts added to EMR but can I am guessing Yarn will still see the new 
hosts and new flink jobs can make use it, is that right?


Thanks
Ankit

From: "Jain, Ankit" <ankit.j...@here.com<mailto:ankit.j...@here.com>>
Date: Monday, May 8, 2017 at 9:09 AM
To: Stephan Ewen <se...@apache.org<mailto:se...@apache.org>>, 
"user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: High Availability on Yarn

Thanks Stephan – we will go with a central ZooKeeper Instance and hopefully 
have it started through a cloudformation script as part of EMR startup.

Is Zk also used to keep track of checkpoint metadata and the execution graph of 
the running job to recover from ApplicationMaster failure as Aljoscha was 
guessing below or only for leader election in case of accidently running 
multiple Application Masters ?

Thanks
Ankit

From: Stephan Ewen <se...@apache.org<mailto:se...@apache.org>>
Date: Monday, May 8, 2017 at 9:00 AM
To: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>, "Jain, Ankit" 
<ankit.j...@here.com<mailto:ankit.j...@here.com>>
Subject: Re: High Availability on Yarn

@Ankit:

ZooKeeper is required in YARN setups still. Even if there is only one 
JobManager in the normal case, Yarn can accidentally c

Re: High Availability on Yarn

2017-05-23 Thread Jain, Ankit
Following up on this.

From: "Jain, Ankit" <ankit.j...@here.com>
Date: Tuesday, May 16, 2017 at 12:14 AM
To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: High Availability on Yarn

Bringing it back to list’s focus.

From: "Jain, Ankit" <ankit.j...@here.com>
Date: Thursday, May 11, 2017 at 1:19 PM
To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: High Availability on Yarn

Got the answer on #2, looks like that will work, still looking for suggestions 
on #1.

Thanks
Ankit

From: "Jain, Ankit" <ankit.j...@here.com>
Date: Thursday, May 11, 2017 at 8:26 AM
To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: High Availability on Yarn

Following up further on this.


1)  We are using a long running EMR cluster to submit jobs right now and as 
you know EMR hasn’t made Yarn ResourceManager HA.

Is there any way we can use the information put in Zookeeper by Flink Job 
Manager to bring the jobs back up on a new EMR cluster if RM goes down?



We are not looking for completely automated option but maybe write a script 
which reads Zookeeper and re-starts all jobs on a fresh EMR cluster?

I am assuming if  Yarn ResouceManager goes down, there is no way to just bring 
it back up – you have to start a new EMR cluster?



2)  Regarding elasticity, I know for now a running flink cluster can’t make 
use of new hosts added to EMR but can I am guessing Yarn will still see the new 
hosts and new flink jobs can make use it, is that right?


Thanks
Ankit

From: "Jain, Ankit" <ankit.j...@here.com>
Date: Monday, May 8, 2017 at 9:09 AM
To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: High Availability on Yarn

Thanks Stephan – we will go with a central ZooKeeper Instance and hopefully 
have it started through a cloudformation script as part of EMR startup.

Is Zk also used to keep track of checkpoint metadata and the execution graph of 
the running job to recover from ApplicationMaster failure as Aljoscha was 
guessing below or only for leader election in case of accidently running 
multiple Application Masters ?

Thanks
Ankit

From: Stephan Ewen <se...@apache.org>
Date: Monday, May 8, 2017 at 9:00 AM
To: "user@flink.apache.org" <user@flink.apache.org>, "Jain, Ankit" 
<ankit.j...@here.com>
Subject: Re: High Availability on Yarn

@Ankit:

ZooKeeper is required in YARN setups still. Even if there is only one 
JobManager in the normal case, Yarn can accidentally create a second one when 
there is a network partition.
To prevent that this leads to inconsistencies, we use ZooKeeper.

Flink uses ZooKeeper very little, so you can just let Flink attach to any 
existing ZooKeeper, or user one ZooKeeper cluster for very many Flink 
clusters/jobs.

Stephan


On Mon, May 8, 2017 at 2:11 PM, Aljoscha Krettek 
<aljos...@apache.org<mailto:aljos...@apache.org>> wrote:
Hi,
Yes, it’s recommended to use one ZooKeeper cluster for all Flink clusters.

Best,
Aljoscha

On 5. May 2017, at 16:56, Jain, Ankit 
<ankit.j...@here.com<mailto:ankit.j...@here.com>> wrote:

Thanks for the update Aljoscha.

@Till Rohrmann<mailto:trohrm...@apache.org>,
Can you please chim in?

Also, we currently have a long running EMR cluster where we create one flink 
cluster per job – can we just choose to install Zookeeper when creating the EMR 
cluster and use one Zookeeper instance for ALL of flink jobs?
Or
Recommendation is to have a dedicated Zookeeper instance per flink job?

Thanks
Ankit

From: Aljoscha Krettek <aljos...@apache.org<mailto:aljos...@apache.org>>
Date: Thursday, May 4, 2017 at 1:19 AM
To: "Jain, Ankit" <ankit.j...@here.com<mailto:ankit.j...@here.com>>
Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>, Till Rohrmann 
<trohrm...@apache.org<mailto:trohrm...@apache.org>>
Subject: Re: High Availability on Yarn

Hi,
Yes, for YARN there is only one running JobManager. As far as I Know, In this 
case ZooKeeper is only used to keep track of checkpoint metadata and the 
execution graph of the running job. Such that a restoring JobManager can pick 
up the data again.

I’m not 100 % sure on this, though, so maybe Till can shed some light on this.

Best,
Aljoscha
On 3. May 2017, at 16:58, Jain, Ankit 
<ankit.j...@here.com<mailto:ankit.j...@here.com>> wrote:

Thanks for your reply Aljoscha.

After building better understanding of Yarn and spending copious amount of time 
on Flink codebase, I think I now get how Flink & Yarn interact – I plan to 
document this soon in case it could help somebody sta

Re: Stateful streaming question

2017-05-16 Thread Jain, Ankit
Hi Flavio,
While you wait on an update from Kostas, I wanted to understand the use case 
better and share my thoughts:


1)   Why is the current batch mode expensive? Where are you persisting the data 
after updates? The way I see it, by moving to Flink you get to use RocksDB (a 
key-value store) that makes your lookups faster - probably right now you are 
using a non-indexed store like S3?

So the gain comes from moving to a persistence store better suited to your 
use case, not from batch->streaming. Maybe consider just going with a different 
data store.

IMHO, streaming should only be used if you really want to act on new events in 
real time. It is generally harder to get a streaming job correct than a batch 
one.



2)   If the current setup is expensive due to serialization/deserialization, 
then that should be fixed by moving to a faster format (maybe Avro? - I don't 
have a lot of expertise in that). I don't see how that problem will go away 
with Flink - you still need to handle serialization.



3)   Even if you do decide to move to Flink, I think you can do this with 
one job; two jobs are not needed. At every incoming event, check the previous 
state and update/output to Kafka or whatever data store you are using.


Thanks
Ankit

From: Flavio Pompermaier 
Date: Tuesday, May 16, 2017 at 9:31 AM
To: Kostas Kloudas 
Cc: user 
Subject: Re: Stateful streaming question

Hi Kostas,
thanks for your quick response.
I also thought about using Async I/O, I just need to figure out how to correctly 
handle parallelism and the number of async requests.
However, that's probably the way to go... is it possible also to set a number of 
retry attempts/backoff when the async request fails (maybe due to a too busy 
server)?

For the second part I think it's ok to persist the state into RocksDB or HDFS; 
my question is indeed about that: is it safe to start reading (with another 
Flink job) from RocksDB or HDFS while an updatable state is "pending" on it? 
Should I ensure that state updates are not possible until the other Flink job 
has finished reading the persisted data?

And another question... I've tried to draft such a process and basically I have 
the following code:

DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
    // Tuple4 element types assumed here; adjust to the actual stream type
    .flatMap(new RichFlatMapFunction<Tuple4<String, String, String, String>,
        MyGroupedObj>() {

      private transient ValueState<MyGroupedObj> state;

      @Override
      public void flatMap(Tuple4<String, String, String, String> t,
          Collector<MyGroupedObj> out) throws Exception {
        MyGroupedObj current = state.value();
        if (current == null) {
          current = new MyGroupedObj();
        }

        current.addTuple(t);
        ...
        state.update(current);
        out.collect(current);
      }

      @Override
      public void open(Configuration config) {
        ValueStateDescriptor<MyGroupedObj> descriptor =
            new ValueStateDescriptor<>(
                "test", TypeInformation.of(MyGroupedObj.class));
        state = getRuntimeContext().getState(descriptor);
      }
    });
groupedObj.print();

but obviously this way I emit the updated object on every update while, 
actually, I just want to persist the ValueState somehow (and make it available 
to another job that runs once a month, for example). Is that possible?
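
For reference, a minimal sketch of one way to get that behavior (MyGroupedObj as in the code above; the flush interval is an example, and the state-accessing variant was called RichProcessFunction in Flink 1.2): update the state on every element but only emit on a processing-time timer, so downstream only sees periodic snapshots that a sink can persist for the monthly job.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class PeriodicFlush
    extends ProcessFunction<Tuple4<String, String, String, String>, MyGroupedObj> {

  private static final long FLUSH_INTERVAL_MS = 60 * 60 * 1000L; // hourly, say

  private transient ValueState<MyGroupedObj> state;

  @Override
  public void open(Configuration config) {
    state = getRuntimeContext().getState(
        new ValueStateDescriptor<>("grouped", MyGroupedObj.class));
  }

  @Override
  public void processElement(Tuple4<String, String, String, String> t,
      Context ctx, Collector<MyGroupedObj> out) throws Exception {
    MyGroupedObj current = state.value();
    if (current == null) {
      current = new MyGroupedObj();
      // first element for this key: schedule its periodic flush
      ctx.timerService().registerProcessingTimeTimer(
          ctx.timerService().currentProcessingTime() + FLUSH_INTERVAL_MS);
    }
    current.addTuple(t);
    state.update(current); // no collect() here, so nothing is emitted per update
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx,
      Collector<MyGroupedObj> out) throws Exception {
    MyGroupedObj current = state.value();
    if (current != null) {
      out.collect(current); // one snapshot per interval; a sink persists it
    }
    ctx.timerService().registerProcessingTimeTimer(timestamp + FLUSH_INTERVAL_MS);
  }
}

// usage: tuples.keyBy(0).process(new PeriodicFlush())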


On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas wrote:
Hi Flavio,

From what I understand, for the first part you are correct. You can use Flink’s 
internal state to keep your enriched data.
In fact, if you are also querying an external system to enrich your data, it is 
worth looking at the AsyncIO feature:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html

Now for the second part: currently in Flink you cannot iterate over all 
registered keys for which you have state. A pointer that may be useful is 
the queryable state:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html

This is still an experimental feature, but let us know your opinion if you use 
it.

Finally, an alternative would be to keep state in Flink, and periodically flush 
it to an external storage system, which you can
query at will.

Thanks,
Kostas


On May 16, 2017, at 4:38 PM, Flavio Pompermaier wrote:

Hi to all,
we're still playing with the Flink streaming part in order to see whether it can 
improve our current batch pipeline.
At the moment, we have a job that translates incoming data (as Row) into Tuple4, 
groups them together by the first field and persists the result to disk (using a 
thrift object). When we need to add tuples to those grouped objects we need to 
read the persisted data again, flatten it back to Tuple4, union it with the new 
tuples, re-group by key and finally persist.

This is very expensive to do with batch computation 

Re: High Availability on Yarn

2017-05-16 Thread Jain, Ankit
Bringing it back to list’s focus.

From: "Jain, Ankit" <ankit.j...@here.com>
Date: Thursday, May 11, 2017 at 1:19 PM
To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: High Availability on Yarn

Got the answer on #2, looks like that will work, still looking for suggestions 
on #1.

Thanks
Ankit

From: "Jain, Ankit" <ankit.j...@here.com>
Date: Thursday, May 11, 2017 at 8:26 AM
To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: High Availability on Yarn

Following up further on this.


1)  We are using a long running EMR cluster to submit jobs right now and as 
you know EMR hasn’t made Yarn ResourceManager HA.

Is there any way we can use the information put in Zookeeper by Flink Job 
Manager to bring the jobs back up on a new EMR cluster if RM goes down?



We are not looking for completely automated option but maybe write a script 
which reads Zookeeper and re-starts all jobs on a fresh EMR cluster?

I am assuming if  Yarn ResouceManager goes down, there is no way to just bring 
it back up – you have to start a new EMR cluster?



2)  Regarding elasticity, I know for now a running flink cluster can’t make 
use of new hosts added to EMR but can I am guessing Yarn will still see the new 
hosts and new flink jobs can make use it, is that right?


Thanks
Ankit

From: "Jain, Ankit" <ankit.j...@here.com>
Date: Monday, May 8, 2017 at 9:09 AM
To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: High Availability on Yarn

Thanks Stephan – we will go with a central ZooKeeper Instance and hopefully 
have it started through a cloudformation script as part of EMR startup.

Is Zk also used to keep track of checkpoint metadata and the execution graph of 
the running job to recover from ApplicationMaster failure as Aljoscha was 
guessing below or only for leader election in case of accidently running 
multiple Application Masters ?

Thanks
Ankit

From: Stephan Ewen <se...@apache.org>
Date: Monday, May 8, 2017 at 9:00 AM
To: "user@flink.apache.org" <user@flink.apache.org>, "Jain, Ankit" 
<ankit.j...@here.com>
Subject: Re: High Availability on Yarn

@Ankit:

ZooKeeper is required in YARN setups still. Even if there is only one 
JobManager in the normal case, Yarn can accidentally create a second one when 
there is a network partition.
To prevent that this leads to inconsistencies, we use ZooKeeper.

Flink uses ZooKeeper very little, so you can just let Flink attach to any 
existing ZooKeeper, or user one ZooKeeper cluster for very many Flink 
clusters/jobs.

Stephan


On Mon, May 8, 2017 at 2:11 PM, Aljoscha Krettek 
<aljos...@apache.org<mailto:aljos...@apache.org>> wrote:
Hi,
Yes, it’s recommended to use one ZooKeeper cluster for all Flink clusters.

Best,
Aljoscha

On 5. May 2017, at 16:56, Jain, Ankit 
<ankit.j...@here.com<mailto:ankit.j...@here.com>> wrote:

Thanks for the update Aljoscha.

@Till Rohrmann<mailto:trohrm...@apache.org>,
Can you please chim in?

Also, we currently have a long running EMR cluster where we create one flink 
cluster per job – can we just choose to install Zookeeper when creating the EMR 
cluster and use one Zookeeper instance for ALL of flink jobs?
Or
Recommendation is to have a dedicated Zookeeper instance per flink job?

Thanks
Ankit

From: Aljoscha Krettek <aljos...@apache.org<mailto:aljos...@apache.org>>
Date: Thursday, May 4, 2017 at 1:19 AM
To: "Jain, Ankit" <ankit.j...@here.com<mailto:ankit.j...@here.com>>
Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>, Till Rohrmann 
<trohrm...@apache.org<mailto:trohrm...@apache.org>>
Subject: Re: High Availability on Yarn

Hi,
Yes, for YARN there is only one running JobManager. As far as I Know, In this 
case ZooKeeper is only used to keep track of checkpoint metadata and the 
execution graph of the running job. Such that a restoring JobManager can pick 
up the data again.

I’m not 100 % sure on this, though, so maybe Till can shed some light on this.

Best,
Aljoscha
On 3. May 2017, at 16:58, Jain, Ankit 
<ankit.j...@here.com<mailto:ankit.j...@here.com>> wrote:

Thanks for your reply Aljoscha.

After building better understanding of Yarn and spending copious amount of time 
on Flink codebase, I think I now get how Flink & Yarn interact – I plan to 
document this soon in case it could help somebody starting afresh with 
Flink-Yarn.

Regarding Zookeper, in YARN mode there is only one JobManager running, do we 
still need leader election?

If the ApplicationMaster goes down (where JM runs) it is restarted by Yarn RM 
and while restarting, Flink AM will bring back previous

Re: static/dynamic lookups in flink streaming

2017-05-15 Thread Jain, Ankit
What if we copy the big data set to HDFS on start of the cluster (e.g. EMR if 
using AWS) and then use that to build distributed operator state in Flink 
instead of calling the external store?

How do the Flink contributors feel about that?

Thanks
Ankit

On 5/14/17, 8:17 PM, "yunfan123"  wrote:

    Flink 1.2.0 is released. Can you give an example of the asynchronous
    operations feature?
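
For reference, a minimal sketch of the 1.2-era Async I/O API (the client call is a hypothetical stand-in; later Flink versions replaced AsyncCollector with ResultFuture, so check your version's signatures):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

public class AsyncDbLookup implements AsyncFunction<String, String> {

  @Override
  public void asyncInvoke(String key, AsyncCollector<String> collector) {
    // run the blocking client call on a separate thread pool and hand the
    // result back to Flink when it completes
    CompletableFuture
        .supplyAsync(() -> fetchFromDb(key))
        .thenAccept(value -> collector.collect(Collections.singletonList(value)));
  }

  private String fetchFromDb(String key) {
    // hypothetical blocking lookup against the external store
    return "value-for-" + key;
  }
}

// usage: at most 100 in-flight requests per task, 1 second timeout
// DataStream<String> enriched = AsyncDataStream.unorderedWait(
//     input, new AsyncDbLookup(), 1000, TimeUnit.MILLISECONDS, 100);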



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/static-dynamic-lookups-in-flink-streaming-tp10726p13133.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.





Re: Storage options for RocksDBStateBackend

2017-05-15 Thread Jain, Ankit
Also, I hope state & checkpointing writes to S3 happen asynchronously, without 
impacting the actual job execution graph?

If so, will there still be a performance impact from using S3?

Thanks
Ankit

From: Ayush Goyal 
Date: Thursday, May 11, 2017 at 11:21 PM
To: Stephan Ewen , Till Rohrmann 
Cc: user 
Subject: Re: Storage options for RocksDBStateBackend

Till and Stephan, thanks for your clarification.

@Till One more question: from what I have read about the checkpointing [1], the 
list operations don't seem likely to be performed frequently, so storing the 
state backend on S3 shouldn't have any severe impact on Flink performance. Is 
this assumption right?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/stream_checkpointing.html

-- Ayush

On Fri, May 12, 2017 at 1:05 AM Stephan Ewen wrote:
Small addition to Till's comment:

In the case where file:// points to a mounted distributed file system (NFS, 
MapR-FS, ...), it actually works. The important thing is that the filesystem 
where the checkpoints go is replicated (fault tolerant) and accessible from 
all nodes.

On Thu, May 11, 2017 at 2:16 PM, Till Rohrmann wrote:

Hi Ayush,

you're right that RocksDB is the recommended state backend because of the 
above-mentioned reasons. In order to make the recovery work properly, you have 
to configure a shared directory for the checkpoint data via 
state.backend.fs.checkpointdir. You can basically configure any file system 
which is supported by Hadoop (no HDFS required). The reason is that we use 
Hadoop to bridge between different file systems. The only thing you have to 
make sure is that you have the respective file system implementation in your 
class path.
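
For reference, a hypothetical flink-conf.yaml sketch of that (the bucket name is an example; the S3 filesystem implementation and credentials must be on the class path as described above):

state.backend: rocksdb
state.backend.fs.checkpointdir: s3://my-bucket/flink/checkpoints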

I think you can access Windows Azure Blob Storage via Hadoop [1] similarly to 
how you access S3, for example.

If you use S3 to store your checkpoint data, then you will benefit from all the 
advantages of S3 but also suffer from its drawbacks (e.g. that list operations 
are more costly). But these are not specific to Flink.

A URL like file:// usually indicates a local file. Thus, if your Flink cluster 
is not running on a single machine, then this won’t work.

[1] https://hadoop.apache.org/docs/stable/hadoop-azure/index.html

Cheers,
Till

On Thu, May 11, 2017 at 10:41 AM, Ayush Goyal wrote:
Hello,

I had a few questions regarding checkpoint storage options using
RocksDBStateBackend. In the Flink 1.2 documentation, it is the recommended state
backend due to its ability to store large states and asynchronous snapshotting.
For high availability it seems HDFS is the recommended store for state backend
data. In the AWS deployment section, it is also mentioned that S3 can be used for
storing state backend data.

We don't want to depend on a hadoop cluster for flink deployment, so I had
following questions:

1. Can we use any storage backend supported by flink for storing RocksDB
StateBackend data with file urls: there are quite a few supported as mentioned 
here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html
and here:
https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md

2. Is there some work already done to support Windows Azure Blob Storage for
storing State backend data? There are some docs here:
https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md
can we utilize this for that?

3. If utilizing S3 for state backend, is there any performance impact?

4. For high availability can we use a NFS volume for state backend, with
"file://" urls? Will there be any performance impact?

PS: I posted this email earlier via nabble, but it's not showing up in apache 
archive. So sending again. Apologies if it results in multiple threads.

-- Ayush




Re: High Availability on Yarn

2017-05-11 Thread Jain, Ankit
Got the answer on #2, looks like that will work, still looking for suggestions 
on #1.

Thanks
Ankit

From: "Jain, Ankit" <ankit.j...@here.com>
Date: Thursday, May 11, 2017 at 8:26 AM
To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: High Availability on Yarn

Following up further on this.


1)  We are using a long running EMR cluster to submit jobs right now and as 
you know EMR hasn’t made Yarn ResourceManager HA.

Is there any way we can use the information put in Zookeeper by Flink Job 
Manager to bring the jobs back up on a new EMR cluster if RM goes down?



We are not looking for completely automated option but maybe write a script 
which reads Zookeeper and re-starts all jobs on a fresh EMR cluster?

I am assuming if  Yarn ResouceManager goes down, there is no way to just bring 
it back up – you have to start a new EMR cluster?



2)  Regarding elasticity, I know for now a running flink cluster can’t make 
use of new hosts added to EMR but can I am guessing Yarn will still see the new 
hosts and new flink jobs can make use it, is that right?


Thanks
Ankit

From: "Jain, Ankit" <ankit.j...@here.com>
Date: Monday, May 8, 2017 at 9:09 AM
To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: High Availability on Yarn

Thanks Stephan – we will go with a central ZooKeeper Instance and hopefully 
have it started through a cloudformation script as part of EMR startup.

Is Zk also used to keep track of checkpoint metadata and the execution graph of 
the running job to recover from ApplicationMaster failure as Aljoscha was 
guessing below or only for leader election in case of accidently running 
multiple Application Masters ?

Thanks
Ankit

From: Stephan Ewen <se...@apache.org>
Date: Monday, May 8, 2017 at 9:00 AM
To: "user@flink.apache.org" <user@flink.apache.org>, "Jain, Ankit" 
<ankit.j...@here.com>
Subject: Re: High Availability on Yarn

@Ankit:

ZooKeeper is required in YARN setups still. Even if there is only one 
JobManager in the normal case, Yarn can accidentally create a second one when 
there is a network partition.
To prevent that this leads to inconsistencies, we use ZooKeeper.

Flink uses ZooKeeper very little, so you can just let Flink attach to any 
existing ZooKeeper, or user one ZooKeeper cluster for very many Flink 
clusters/jobs.

Stephan


On Mon, May 8, 2017 at 2:11 PM, Aljoscha Krettek 
<aljos...@apache.org<mailto:aljos...@apache.org>> wrote:
Hi,
Yes, it’s recommended to use one ZooKeeper cluster for all Flink clusters.

Best,
Aljoscha

On 5. May 2017, at 16:56, Jain, Ankit 
<ankit.j...@here.com<mailto:ankit.j...@here.com>> wrote:

Thanks for the update Aljoscha.

@Till Rohrmann<mailto:trohrm...@apache.org>,
Can you please chim in?

Also, we currently have a long running EMR cluster where we create one flink 
cluster per job – can we just choose to install Zookeeper when creating the EMR 
cluster and use one Zookeeper instance for ALL of flink jobs?
Or
Recommendation is to have a dedicated Zookeeper instance per flink job?

Thanks
Ankit

From: Aljoscha Krettek <aljos...@apache.org<mailto:aljos...@apache.org>>
Date: Thursday, May 4, 2017 at 1:19 AM
To: "Jain, Ankit" <ankit.j...@here.com<mailto:ankit.j...@here.com>>
Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>, Till Rohrmann 
<trohrm...@apache.org<mailto:trohrm...@apache.org>>
Subject: Re: High Availability on Yarn

Hi,
Yes, for YARN there is only one running JobManager. As far as I Know, In this 
case ZooKeeper is only used to keep track of checkpoint metadata and the 
execution graph of the running job. Such that a restoring JobManager can pick 
up the data again.

I’m not 100 % sure on this, though, so maybe Till can shed some light on this.

Best,
Aljoscha
On 3. May 2017, at 16:58, Jain, Ankit 
<ankit.j...@here.com<mailto:ankit.j...@here.com>> wrote:

Thanks for your reply Aljoscha.

After building better understanding of Yarn and spending copious amount of time 
on Flink codebase, I think I now get how Flink & Yarn interact – I plan to 
document this soon in case it could help somebody starting afresh with 
Flink-Yarn.

Regarding Zookeper, in YARN mode there is only one JobManager running, do we 
still need leader election?

If the ApplicationMaster goes down (where JM runs) it is restarted by Yarn RM 
and while restarting, Flink AM will bring back previous running containers.  
So, where does Zookeeper sit in this setup?

Thanks
Ankit

From: Aljoscha Krettek <aljos...@apache.org<mailto:aljos...@apache.org>>
Date: Wednesday, May 3, 2017 at 2:05 AM
To: "Jain, Ankit" <ankit.j...@here.com<mailto:ankit.j...@he

Re: High Availability on Yarn

2017-05-11 Thread Jain, Ankit
Following up further on this.


1)  We are using a long-running EMR cluster to submit jobs right now and, as 
you know, EMR hasn't made the Yarn ResourceManager HA.

Is there any way we can use the information put in Zookeeper by the Flink 
JobManager to bring the jobs back up on a new EMR cluster if the RM goes down?

We are not looking for a completely automated option, but maybe we could write 
a script which reads Zookeeper and re-starts all jobs on a fresh EMR cluster?

I am assuming that if the Yarn ResourceManager goes down, there is no way to 
just bring it back up - you have to start a new EMR cluster?

2)  Regarding elasticity, I know that for now a running Flink cluster can't 
make use of new hosts added to EMR, but I am guessing Yarn will still see the 
new hosts and new Flink jobs can make use of them - is that right?


Thanks
Ankit

From: "Jain, Ankit" <ankit.j...@here.com>
Date: Monday, May 8, 2017 at 9:09 AM
To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: High Availability on Yarn

Thanks Stephan – we will go with a central ZooKeeper Instance and hopefully 
have it started through a cloudformation script as part of EMR startup.

Is Zk also used to keep track of checkpoint metadata and the execution graph of 
the running job to recover from ApplicationMaster failure as Aljoscha was 
guessing below or only for leader election in case of accidently running 
multiple Application Masters ?

Thanks
Ankit

From: Stephan Ewen <se...@apache.org>
Date: Monday, May 8, 2017 at 9:00 AM
To: "user@flink.apache.org" <user@flink.apache.org>, "Jain, Ankit" 
<ankit.j...@here.com>
Subject: Re: High Availability on Yarn

@Ankit:

ZooKeeper is required in YARN setups still. Even if there is only one 
JobManager in the normal case, Yarn can accidentally create a second one when 
there is a network partition.
To prevent that this leads to inconsistencies, we use ZooKeeper.

Flink uses ZooKeeper very little, so you can just let Flink attach to any 
existing ZooKeeper, or user one ZooKeeper cluster for very many Flink 
clusters/jobs.

Stephan


On Mon, May 8, 2017 at 2:11 PM, Aljoscha Krettek 
<aljos...@apache.org<mailto:aljos...@apache.org>> wrote:
Hi,
Yes, it’s recommended to use one ZooKeeper cluster for all Flink clusters.

Best,
Aljoscha

On 5. May 2017, at 16:56, Jain, Ankit 
<ankit.j...@here.com<mailto:ankit.j...@here.com>> wrote:

Thanks for the update Aljoscha.

@Till Rohrmann<mailto:trohrm...@apache.org>,
Can you please chim in?

Also, we currently have a long running EMR cluster where we create one flink 
cluster per job – can we just choose to install Zookeeper when creating the EMR 
cluster and use one Zookeeper instance for ALL of flink jobs?
Or
Recommendation is to have a dedicated Zookeeper instance per flink job?

Thanks
Ankit

From: Aljoscha Krettek <aljos...@apache.org<mailto:aljos...@apache.org>>
Date: Thursday, May 4, 2017 at 1:19 AM
To: "Jain, Ankit" <ankit.j...@here.com<mailto:ankit.j...@here.com>>
Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>, Till Rohrmann 
<trohrm...@apache.org<mailto:trohrm...@apache.org>>
Subject: Re: High Availability on Yarn

Hi,
Yes, for YARN there is only one running JobManager. As far as I Know, In this 
case ZooKeeper is only used to keep track of checkpoint metadata and the 
execution graph of the running job. Such that a restoring JobManager can pick 
up the data again.

I’m not 100 % sure on this, though, so maybe Till can shed some light on this.

Best,
Aljoscha
On 3. May 2017, at 16:58, Jain, Ankit 
<ankit.j...@here.com<mailto:ankit.j...@here.com>> wrote:

Thanks for your reply Aljoscha.

After building better understanding of Yarn and spending copious amount of time 
on Flink codebase, I think I now get how Flink & Yarn interact – I plan to 
document this soon in case it could help somebody starting afresh with 
Flink-Yarn.

Regarding Zookeper, in YARN mode there is only one JobManager running, do we 
still need leader election?

If the ApplicationMaster goes down (where JM runs) it is restarted by Yarn RM 
and while restarting, Flink AM will bring back previous running containers.  
So, where does Zookeeper sit in this setup?

Thanks
Ankit

From: Aljoscha Krettek <aljos...@apache.org<mailto:aljos...@apache.org>>
Date: Wednesday, May 3, 2017 at 2:05 AM
To: "Jain, Ankit" <ankit.j...@here.com<mailto:ankit.j...@here.com>>
Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>, Till Rohrmann 
<trohrm...@apache.org<mailto:trohrm...@apache.org>>
Subject: Re: High Availability on Yarn

Hi,
As a first comment, the work mentioned in the FLIP-6 doc you lin

Re: High Availability on Yarn

2017-05-08 Thread Jain, Ankit
Thanks Stephan – we will go with a central ZooKeeper instance and hopefully 
have it started through a CloudFormation script as part of EMR startup.

Is ZK also used to keep track of checkpoint metadata and the execution graph of 
the running job, to recover from ApplicationMaster failure as Aljoscha was 
guessing below, or only for leader election in case of accidentally running 
multiple ApplicationMasters?

Thanks
Ankit

From: Stephan Ewen <se...@apache.org>
Date: Monday, May 8, 2017 at 9:00 AM
To: "user@flink.apache.org" <user@flink.apache.org>, "Jain, Ankit" 
<ankit.j...@here.com>
Subject: Re: High Availability on Yarn

@Ankit:

ZooKeeper is still required in YARN setups. Even if there is only one 
JobManager in the normal case, Yarn can accidentally create a second one when 
there is a network partition.
To prevent that from leading to inconsistencies, we use ZooKeeper.

Flink uses ZooKeeper very little, so you can just let Flink attach to any 
existing ZooKeeper, or use one ZooKeeper cluster for very many Flink 
clusters/jobs.

Stephan


On Mon, May 8, 2017 at 2:11 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
Hi,
Yes, it’s recommended to use one ZooKeeper cluster for all Flink clusters.

Best,
Aljoscha

Re: High Availability on Yarn

2017-05-05 Thread Jain, Ankit
Thanks for the update Aljoscha.

@Till Rohrmann,
Can you please chime in?

Also, we currently have a long-running EMR cluster where we create one Flink 
cluster per job - can we just choose to install Zookeeper when creating the EMR 
cluster and use one Zookeeper instance for ALL Flink jobs?
Or is the recommendation to have a dedicated Zookeeper instance per Flink job?

Thanks
Ankit

From: Aljoscha Krettek <aljos...@apache.org>
Date: Thursday, May 4, 2017 at 1:19 AM
To: "Jain, Ankit" <ankit.j...@here.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>, Till Rohrmann 
<trohrm...@apache.org>
Subject: Re: High Availability on Yarn

Hi,
Yes, for YARN there is only one running JobManager. As far as I know, in this 
case ZooKeeper is only used to keep track of checkpoint metadata and the 
execution graph of the running job, such that a restoring JobManager can pick 
up the data again.

I'm not 100% sure on this, though, so maybe Till can shed some light on it.

Best,
Aljoscha

Re: High Availability on Yarn

2017-05-03 Thread Jain, Ankit
Thanks for your reply Aljoscha.

After building a better understanding of Yarn and spending a copious amount of 
time in the Flink codebase, I think I now get how Flink & Yarn interact - I 
plan to document this soon in case it could help somebody starting afresh with 
Flink-Yarn.

Regarding Zookeeper, in YARN mode there is only one JobManager running - do we 
still need leader election?

If the ApplicationMaster goes down (where the JM runs), it is restarted by the 
Yarn RM, and while restarting, the Flink AM will bring back the previously 
running containers. So, where does Zookeeper sit in this setup?

Thanks
Ankit

From: Aljoscha Krettek <aljos...@apache.org>
Date: Wednesday, May 3, 2017 at 2:05 AM
To: "Jain, Ankit" <ankit.j...@here.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>, Till Rohrmann 
<trohrm...@apache.org>
Subject: Re: High Availability on Yarn

Hi,
As a first comment, the work mentioned in the FLIP-6 doc you linked is still 
work-in-progress. You cannot use these abstractions yet without going into the 
code and setting up a cluster “by hand”.

The documentation for one-step deployment of a job to YARN is available here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/yarn_setup.html#run-a-single-flink-job-on-yarn

Regarding your third question, ZooKeeper is mostly used for discovery and 
leader election. That is, JobManagers use it to decide who is the main JM and 
who are standby JMs. TaskManagers use it to discover the leading JobManager 
that they should connect to.

I’m also cc’ing Till, who should know this stuff better and can maybe explain 
it in a bit more detail.

Best,
Aljoscha

High Availability on Yarn

2017-05-01 Thread Jain, Ankit
Hi fellow users,
We are trying to straighten out the high availability story for Flink.

Our setup includes a long-running EMR cluster; job submission is a two-step 
process: 1) a Flink cluster is first created using the Flink yarn client on 
the already running EMR cluster, 2) the Flink job is submitted.

I also saw references that with 1.2 these two steps have been combined into 
one - is that the change in FlinkYarnSessionCli.java? Can somebody point to 
documentation please?

Without worrying about Yarn RM failure for now (not the Flink Yarn RM that 
seems to be newly introduced), I want to understand first how TaskManager & 
JobManager failures are handled.

My questions-

1)   
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 
suggests a new RM has been added and now there is one JobManager for each job. 
Since the Yarn RM will now talk to the Flink RM (instead of the JobManager 
previously), will Yarn automatically restart a failing Flink RM?

2)   Is there any documentation on the behavior of the new Flink RM that will 
come up? How will previously running JobManagers & TaskManagers find out about 
the new RM?

3)   
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#configuration
 requires configuring Zookeeper even for Yarn - is this needed for handling 
TaskManager failures, or JM failures, or both? Will Yarn not take care of JM 
failures?

It may sound like I am a little confused about the roles of the Yarn and Flink 
components - who carries most of the burden of HA? The documentation in its 
current state is lacking clarity - I know it is still evolving.

Please let me know if somebody can help clear the confusion.

Thanks
Ankit




Re: Cross operation on two huge datasets

2017-03-02 Thread Jain, Ankit
 information to the point. 
Then do a groupBy to merge the points that were inside of multiple shapes.

This works very well in a local runtime but fails on yarn because it seems that 
the taskmanager reloads the jar file between two jobs, making me lose my static 
RTree (I guess that newly loaded class overwrites the older one).

I have two questions :

-  Is there a way to avoid that jar reload // can I store my RTree 
somewhere in the jdk or flink, locally to the taskmanager, in a way that it 
wouldn't be affected by the jar reload (since it would not be stored in any 
class from MY jar)?

   -  I could also try to do it in a single job, but I don't know how to ensure 
that some operations are done (parsing of shapes) BEFORE starting others 
handling the points.

-  Is there a way to do that in a clean way using Flink operators? I'd 
need to be able to use the result of the iteration of one dataset inside of my 
map (see the sketch after this message).

Something like :

datasetA.flatMap(new MyMapOperator(datasetB))…

And In my implementation I would be able to iterate the whole datasetB BEFORE 
doing any operation in datasetA. That way I could parse all my shapes in an 
RTree before handling my points, without relying on static

Or any other way that would allow me to do something similar.

Thanks in advance for your insight.

Gwen’
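
For reference, a minimal sketch of the DataSet broadcast-set pattern that gives exactly this behavior (Point, Shape and the RTree are hypothetical stand-ins): the broadcast dataset is fully materialized in open() on every task before any element of the mapped dataset is processed, so an index can be built once without static fields.

import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// hypothetical stand-ins for the real types
class Point { double x; double y; }
class Shape { boolean contains(Point p) { return false; } }

public class MatchPointsToShapes
    extends RichFlatMapFunction<Point, Tuple2<Point, Shape>> {

  private transient List<Shape> shapes; // would be an RTree in practice

  @Override
  public void open(Configuration parameters) {
    // the broadcast set is fully available before the first flatMap call,
    // so the (hypothetical) RTree could be built here, once per task
    shapes = getRuntimeContext().getBroadcastVariable("shapes");
  }

  @Override
  public void flatMap(Point p, Collector<Tuple2<Point, Shape>> out) {
    for (Shape s : shapes) {          // an RTree query would replace this scan
      if (s.contains(p)) {
        out.collect(Tuple2.of(p, s));
      }
    }
  }
}

// usage (DataSet API):
// datasetA.flatMap(new MatchPointsToShapes()).withBroadcastSet(datasetB, "shapes");

Note that a broadcast set must fit in memory on every task, which matches the single-machine RTree assumption above.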



Re: Cross operation on two huge datasets

2017-02-23 Thread Jain, Ankit
Hi Gwen,
I would recommend looking into a data structure called RTree that is designed 
specifically for this use case, i.e. matching a point to a region.

Thanks
Ankit

From: Fabian Hueske 
Date: Wednesday, February 22, 2017 at 2:41 PM
To: 
Subject: Re: Cross operation on two huge datasets

Hi Gwen,
Flink usually performs a block nested loop join to cross two data sets.
This algorithm spills one input to disk and streams the other input. For each 
input block it fills a memory buffer to perform the cross. Then the buffer of 
the spilled input is refilled with spilled records and the records are crossed 
again. This is done until one iteration over the spilled records is done. Then 
the other buffer of the streamed input is filled with the next records.
You should be aware that cross is a super expensive operation, especially if 
you evaluate a complex condition for each pair of records. So cross can be 
easily too expensive to compute.
For such use cases it is usually better to apply a coarse-grained spatial 
partitioning and do a key-based join on the partitions. Within each partition 
you'd perform a cross.
Best, Fabian
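
For reference, a minimal sketch of that coarse-grained spatial partitioning idea, reusing the hypothetical Point and Shape types from the sketch above plus a hypothetical Shape.overlappedCells(double) helper returning the ids of the grid cells its bounding box overlaps (the grid size is an example):

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SpatialJoin {

  static final double CELL = 1.0; // grid cell size, an example

  static long cellOf(double x, double y) {
    return (((long) Math.floor(x / CELL)) << 32) ^ (long) Math.floor(y / CELL);
  }

  public static DataSet<Tuple2<Point, Shape>> join(
      DataSet<Point> points, DataSet<Shape> shapes) {

    // tag each point with the id of the grid cell that contains it
    DataSet<Tuple2<Long, Point>> cellPoints = points.map(
        new MapFunction<Point, Tuple2<Long, Point>>() {
          @Override
          public Tuple2<Long, Point> map(Point p) {
            return Tuple2.of(cellOf(p.x, p.y), p);
          }
        });

    // tag each shape with every cell its bounding box overlaps
    DataSet<Tuple2<Long, Shape>> cellShapes = shapes.flatMap(
        new FlatMapFunction<Shape, Tuple2<Long, Shape>>() {
          @Override
          public void flatMap(Shape s, Collector<Tuple2<Long, Shape>> out) {
            for (long cell : s.overlappedCells(CELL)) { // hypothetical helper
              out.collect(Tuple2.of(cell, s));
            }
          }
        });

    // equi-join on the cell id; the exact containment test runs only per cell
    return cellPoints.join(cellShapes)
        .where(0).equalTo(0)
        .with(new FlatJoinFunction<Tuple2<Long, Point>, Tuple2<Long, Shape>,
            Tuple2<Point, Shape>>() {
          @Override
          public void join(Tuple2<Long, Point> p, Tuple2<Long, Shape> s,
              Collector<Tuple2<Point, Shape>> out) {
            if (s.f1.contains(p.f1)) {
              out.collect(Tuple2.of(p.f1, s.f1));
            }
          }
        });
  }
}

A point matched by a shape in several cells would still need the groupBy/dedup step Gwen mentions, but the cross is now confined to one cell at a time instead of the full datasets.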


2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers:
Hi,

I need (or at least I think I do) to do a cross operation between two huge 
datasets. One dataset is a list of points. The other one is a list of shapes 
(areas).

I want to know, for each point, the areas (they might overlap so a point can be 
in multiple areas) it belongs to so I thought I’d “cross” my points and areas 
since I need to test each point against each area.

I tried it and my job seems to work for some seconds; then, at some point, it 
gets stuck.

I'm wondering if Flink, for cross operations, tries to load one of the two 
datasets into RAM or if it's able to split the job into multiple iterations 
(even if it means reading one of the two datasets multiple times).

Or maybe I’m going at it the wrong way, or missing some parameters, feel free 
to correct me ☺

I’m using flink 1.0.1.

Thanks in advance

Gwen’