Re: Implement Joins with Lookup Data
How often is the product DB updated? Based on that, you can store the product metadata as state in Flink - maybe set up the state on cluster startup and then update it daily, etc. Also, just based on this feature, Flink doesn't seem to add a lot of value on top of Kafka. As Jörn said below, you can very well store all the events in an external store and then periodically run a cron job to enrich them later, since your processing doesn't seem to require absolute real time. Thanks Ankit From: Jörn Franke Date: Monday, July 23, 2018 at 10:10 PM To: Harshvardhan Agrawal Cc: Subject: Re: Implement Joins with Lookup Data For the first one (lookup of single entries) you could use a NoSQL DB (e.g. a key-value store) - a relational database will not scale. Depending on when you need to do the enrichment, you could also first store the data and enrich it later as part of a batch process. On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote: Hi, We are using Flink for financial data enrichment and aggregations. We have Positions data that we are currently receiving from Kafka. We want to enrich that data with reference data like Product and Account information that is present in a relational database. From my understanding of Flink so far, I think there are two ways to achieve this: 1) First Approach: a) Get positions from Kafka and key by product key. b) Perform a lookup from the database for each key and then obtain a Tuple2. 2) Second Approach: a) Get positions from Kafka and key by product key. b) Window the keyed stream into, say, 15 seconds each. c) For each window, get the unique product keys and perform a single lookup. d) Somehow join Positions and Products. In the first approach we will be making a lot of calls to the DB and the solution is very chatty. It's hard to scale this because the database storing the reference data might not be very responsive.
In the second approach, I wish to join the WindowedStream with the SingleOutputStream, and it turns out I can't join a windowed stream, so I am not quite sure how to do that. I wanted an opinion on what the right thing to do is: should I go with the first approach or the second one? If the second one, how can I implement the join? -- Regards, Harshvardhan Agrawal
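The batched lookup in the second approach can be sketched without any Flink dependencies. The following is a minimal plain-Java illustration of the idea (buffer one window's positions, collect the distinct product keys, issue a single lookup for the batch, then join in memory); all class and method names here are invented for illustration, not part of any Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Sketch of "one lookup per window instead of one lookup per event".
public class BatchedEnrichment {
    // A position as it might arrive from Kafka: (productKey, quantity).
    record Position(String productKey, long quantity) {}
    // Enriched result: the position plus product metadata from the reference store.
    record EnrichedPosition(Position position, String productName) {}

    static List<EnrichedPosition> enrichWindow(
            List<Position> window,
            Function<Set<String>, Map<String, String>> batchLookup) {
        // 1) distinct product keys in this window
        Set<String> keys = new HashSet<>();
        for (Position p : window) keys.add(p.productKey());
        // 2) a single batched call to the reference database
        Map<String, String> products = batchLookup.apply(keys);
        // 3) in-memory join of positions with the looked-up products
        List<EnrichedPosition> out = new ArrayList<>();
        for (Position p : window) {
            out.add(new EnrichedPosition(p, products.get(p.productKey())));
        }
        return out;
    }
}
```

In Flink terms, step 1-3 would live inside a window function, with `batchLookup` backed by a single `IN (...)` query per window rather than one query per record.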
Re: Appending Windowed Aggregates to Events
You could load the historical data as Flink state and then look up the state with the key derived from the input record. That should serve like a join in the relational world. You may also want to think about keeping the writes and the querying isolated. Especially if your windows are going to be long (e.g. cash transactions for the last 6 months in your example) and you need your data to be persistent long term, having a durable store outside of Flink will really help. The Flink state feature is really nice, but I wouldn't view it as long-term durable storage like a NoSQL store or a relational DB like Oracle. Thanks Ankit From: Tim Stearn Date: Friday, June 23, 2017 at 3:59 PM To: "user@flink.apache.org" Subject: Appending Windowed Aggregates to Events Hello All, I'm *very* new to Flink. I read through the documentation and played with some sample code, but I'm struggling to get started with my requirements. We want to use Flink to maintain windowed aggregates as part of a transaction monitoring application. These would use sliding window definitions. An example would be: "Total amount for CASH transactions in the last 5 days". Here's what I need my Flink application to do: 1. Prepare for transaction processing by reading historical aggregates and building windows. 2. For each new transaction: a. Update the windowed aggregate with the new transaction data. b. Find the window that matches the incoming timestamp and add the aggregate value to the transaction. c. Send the enhanced transaction (original fields + aggregates from the matching window) to a downstream processor via a RabbitMQ or Kafka sink. For every transaction coming in, I want one (and only one) output that contains the original transaction fields plus the aggregates. I see how to write the code to create the window assigner and the code that incrementally maintains the aggregates.
I'm not sure how I could join this back to the original transaction record, appending the aggregate values from the window that matches the transaction date stamp. This seems like a join of some kind to me, but I don't know how to implement it in Flink. I'm hoping someone could reply with some simple code (or even pseudo code) to get me started on the "join" part of the above data flow. Please let me know if I need to clarify. Thanks, Tim Stearn
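The "one enriched output per input" pattern Ankit describes (look up keyed state, update it, attach it to the record) can be sketched in plain Java, with a map standing in for Flink's keyed `ValueState`. This is an illustrative sketch with invented names, not Flink API code; in a real job the same logic would sit in a `KeyedProcessFunction` or `RichFlatMapFunction`, and window expiry/eviction is omitted here.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: update the per-key aggregate with each transaction, then emit exactly one
// enriched record (original fields + current aggregate) per input. No windowing/expiry.
public class AggregateAppender {
    record Txn(String account, String type, double amount) {}
    record EnrichedTxn(Txn txn, double runningTotalForKey) {}

    // Stands in for Flink keyed state (ValueState<Double> per key).
    private final Map<String, Double> totals = new HashMap<>();

    // Steps 2a + 2b from the email: update the aggregate, then attach it.
    public EnrichedTxn process(Txn t) {
        String key = t.account() + "|" + t.type();   // e.g. aggregate CASH per account
        double updated = totals.merge(key, t.amount(), Double::sum);
        return new EnrichedTxn(t, updated);          // one output per input
    }
}
```

The "join" is implicit: because the aggregate lives in state keyed by the same field as the incoming transaction, reading it inside the processing function is the lookup.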
Re: High Availability on Yarn
Thanks for the reply Robert - I will try out #1 and keep you posted. From: Robert Metzger <rmetz...@apache.org> Date: Wednesday, May 24, 2017 at 7:44 AM To: "Jain, Ankit" <ankit.j...@here.com> Cc: "user@flink.apache.org" <user@flink.apache.org> Subject: Re: High Availability on Yarn Hi Ankit, I realized I can answer your questions myself :) #1 I think that's possible, by using the same high-availability.zookeeper.path.root configuration parameter between the runs. By default, on YARN we are using the YARN application ID as the root path, but if you are putting a custom one there, Flink will recover running jobs even if you are starting a new EMR cluster (assuming the files are in S3). #2 In current Flink we can not expand a running Flink job! Yarn will see new machines being added to the cluster, and it can use them for future Flink deployments on YARN (as you said). We are working on adding support for dynamically changing the Flink hardware allocations as part of FLIP-6. Please keep asking and bugging us if we are not responding. It's just that most Flink developers are quite busy with the 1.3 release right now. Regards, Robert On Wed, May 24, 2017 at 3:36 PM, Robert Metzger <rmetz...@apache.org> wrote: Hi Ankit, I'm sorry that nobody is responding to the message. I'll try to find somebody. On Tue, May 23, 2017 at 10:27 PM, Jain, Ankit <ankit.j...@here.com> wrote: Following up on this. From: "Jain, Ankit" <ankit.j...@here.com> Date: Tuesday, May 16, 2017 at 12:14 AM To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" <user@flink.apache.org> Subject: Re: High Availability on Yarn Bringing it back to the list's focus.
From: "Jain, Ankit" <ankit.j...@here.com> Date: Thursday, May 11, 2017 at 1:19 PM To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" <user@flink.apache.org> Subject: Re: High Availability on Yarn Got the answer on #2, looks like that will work, still looking for suggestions on #1. Thanks Ankit From: "Jain, Ankit" <ankit.j...@here.com> Date: Thursday, May 11, 2017 at 8:26 AM To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" <user@flink.apache.org> Subject: Re: High Availability on Yarn Following up further on this. 1) We are using a long-running EMR cluster to submit jobs right now, and as you know, EMR hasn't made the Yarn ResourceManager HA. Is there any way we can use the information put in Zookeeper by the Flink JobManager to bring the jobs back up on a new EMR cluster if the RM goes down? We are not looking for a completely automated option, but maybe we could write a script which reads Zookeeper and re-starts all jobs on a fresh EMR cluster? I am assuming that if the Yarn ResourceManager goes down, there is no way to just bring it back up - you have to start a new EMR cluster? 2) Regarding elasticity, I know that for now a running Flink cluster can't make use of new hosts added to EMR, but I am guessing Yarn will still see the new hosts and new Flink jobs can make use of them, is that right? Thanks Ankit From: "Jain, Ankit" <ankit.j...@here.com> Date: Monday, May 8, 2017 at 9:09 AM To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" <user@flink.apache.org> Subject: Re: High Availability on Yarn Thanks Stephan - we will go with a central ZooKeeper instance and hopefully have it started through a CloudFormation script as part of EMR startup.
Is ZK also used to keep track of checkpoint metadata and the execution graph of the running job to recover from ApplicationMaster failure, as Aljoscha was guessing below, or only for leader election in case of accidentally running multiple ApplicationMasters? Thanks Ankit From: Stephan Ewen <se...@apache.org> Date: Monday, May 8, 2017 at 9:00 AM To: "user@flink.apache.org" <user@flink.apache.org>, "Jain, Ankit" <ankit.j...@here.com> Subject: Re: High Availability on Yarn @Ankit: ZooKeeper is required in YARN setups still. Even if there is only one JobManager in the normal case, Yarn can accidentally create a second one when there is a network partition.
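Robert's suggestion above (pin high-availability.zookeeper.path.root so a fresh EMR cluster can recover jobs) translates into a few flink-conf.yaml entries. The following is a sketch for a Flink 1.2/1.3-era YARN setup; the ZooKeeper hosts, root path, and S3 bucket are placeholder values, not anything prescribed by the thread:

```yaml
# flink-conf.yaml sketch - hostnames, paths and bucket names are placeholders.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
# Fix the root path so a new YARN/EMR cluster can recover jobs from a previous one;
# by default Flink uses the YARN application ID here, which changes per cluster.
high-availability.zookeeper.path.root: /flink/my-long-lived-jobs
# Durable storage for JobManager metadata, outside the cluster (e.g. S3):
high-availability.storageDir: s3://my-bucket/flink/ha/
```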
Re: Stateful streaming question
Hi Flavio, While you wait on an update from Kostas, I wanted to understand the use case better and share my thoughts: 1) Why is the current batch mode expensive? Where are you persisting the data after updates? The way I see it, by moving to Flink you get to use RocksDB (a key-value store) that makes your lookups faster - probably right now you are using a non-indexed store like S3? So the gain comes from moving to a persistence store better suited to your use case, not from moving from batch to streaming. Maybe consider just going with a different data store. IMHO, streaming should only be used if you really want to act on the new events in real time. It is generally harder to get a streaming job correct than a batch one. 2) If the current setup is expensive due to serialization/deserialization, then that should be fixed by moving to a faster format (maybe Avro? - I don't have a lot of expertise in that). I don't see how that problem will go away with Flink - you still need to handle serialization. 3) Even if you do decide to move to Flink - I think you can do this with one job; two jobs are not needed. At every incoming event, check the previous state and update/output to Kafka or whatever data store you are using. Thanks Ankit From: Flavio Pompermaier Date: Tuesday, May 16, 2017 at 9:31 AM To: Kostas Kloudas Cc: user Subject: Re: Stateful streaming question Hi Kostas, thanks for your quick response. I also thought about using Async I/O, I just need to figure out how to correctly handle parallelism and the number of async requests. However, that's probably the way to go. Is it also possible to set a number of retry attempts/backoff when the async request fails (maybe due to a too busy server)? For the second part I think it's OK to persist the state into RocksDB or HDFS; my question is indeed about that: is it safe to start reading (with another Flink job) from RocksDB or HDFS while an updatable state is "pending" on it?
Should I ensure that state updates are not possible until the other Flink job has finished reading the persisted data? And another question... I've tried to draft such a process and basically I have the following code:

DataStream<MyGroupedObj> groupedObj = tuples.keyBy(0)
    .flatMap(new RichFlatMapFunction<Tuple4, MyGroupedObj>() {
      private transient ValueState<MyGroupedObj> state;

      @Override
      public void flatMap(Tuple4 t, Collector<MyGroupedObj> out) throws Exception {
        MyGroupedObj current = state.value();
        if (current == null) {
          current = new MyGroupedObj();
        }
        current.addTuple(t);
        ...
        state.update(current);
        out.collect(current);
      }

      @Override
      public void open(Configuration config) {
        ValueStateDescriptor<MyGroupedObj> descriptor = new ValueStateDescriptor<>(
            "test", TypeInformation.of(MyGroupedObj.class));
        state = getRuntimeContext().getState(descriptor);
      }
    });
groupedObj.print();

but obviously this way I emit the updated object on every update, while actually I just want to persist the ValueState somehow (and make it available to another job that runs once a month, for example). Is that possible? On Tue, May 16, 2017 at 5:57 PM, Kostas Kloudas wrote: Hi Flavio, From what I understand, for the first part you are correct. You can use Flink's internal state to keep your enriched data. In fact, if you are also querying an external system to enrich your data, it is worth looking at the Async I/O feature: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html Now for the second part, currently in Flink you cannot iterate over all registered keys for which you have state. A pointer that may be useful is queryable state: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html This is still an experimental feature, but let us know your opinion if you use it. Finally, an alternative would be to keep the state in Flink and periodically flush it to an external storage system, which you can query at will.
Thanks, Kostas On May 16, 2017, at 4:38 PM, Flavio Pompermaier wrote: Hi to all, we're still playing with the Flink streaming part in order to see whether it can improve our current batch pipeline. At the moment, we have a job that translates incoming data (as Row) into Tuple4, groups the tuples together by the first field and persists the result to disk (using a Thrift object). When we need to add tuples to those grouped objects we need to read the persisted data again, flatten it back to Tuple4, union it with the new tuples, re-group by key and finally persist. This is very expensive to do with batch computation
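Kostas's last suggestion (keep state in Flink, periodically flush it to an external store that the other job queries) can be sketched in plain Java, with maps standing in for keyed state and for the external store. All names here are invented for illustration; in a real Flink job the flush would be driven by a processing-time timer or a checkpoint hook, not a method call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: accumulate grouped objects in state without emitting per update, and
// periodically copy a snapshot to an external store for the monthly batch job to read.
public class PeriodicFlusher {
    private final Map<String, List<String>> state = new HashMap<>();         // keyed-state stand-in
    private final Map<String, List<String>> externalStore = new HashMap<>(); // e.g. HDFS/NoSQL stand-in

    // Like the flatMap above, but without out.collect() on every update.
    public void addTuple(String key, String tuple) {
        state.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);
    }

    // Called on a timer rather than per record: flush a snapshot of all keys.
    public void flush() {
        for (Map.Entry<String, List<String>> e : state.entrySet()) {
            externalStore.put(e.getKey(), new ArrayList<>(e.getValue()));
        }
    }

    // What the separate (monthly) job would read, instead of Flink's internal state.
    public List<String> queryExternal(String key) {
        return externalStore.getOrDefault(key, List.of());
    }
}
```

This sidesteps the "is it safe to read a pending state" question: the other job only ever sees the last flushed snapshot, never Flink's live state.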
Re: static/dynamic lookups in flink streaming
What if we copy the big data set to HDFS on start of the cluster (e.g. EMR if using AWS) and then use that to build distributed operator state in Flink instead of calling the external store? How do the Flink contributors feel about that? Thanks Ankit On 5/14/17, 8:17 PM, "yunfan123" wrote: The 1.2.0 is released. Can you give an example for the asynchronous operations feature? -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/static-dynamic-lookups-in-flink-streaming-tp10726p13133.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
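The idea in Ankit's question - load a reference-data file copied to HDFS at cluster start into operator memory, then enrich from it instead of calling an external store per event - can be sketched as below. This is plain Java with invented names and a local file standing in for HDFS; in Flink the loading would typically happen in RichFlatMapFunction.open().

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Sketch: build an in-memory lookup map from a reference file once, at operator open,
// then serve every enrichment from memory (no per-event external calls).
public class FileBackedLookup {
    private final Map<String, String> lookup = new HashMap<>();

    // Expects lines of the form "key,value"; malformed lines are skipped.
    public void open(Path referenceFile) throws IOException {
        for (String line : Files.readAllLines(referenceFile)) {
            String[] parts = line.split(",", 2);
            if (parts.length == 2) {
                lookup.put(parts[0], parts[1]);
            }
        }
    }

    public String enrich(String key) {
        return lookup.getOrDefault(key, "UNKNOWN");
    }
}
```

The trade-off the thread is circling: this works when the data set fits in memory per task and changes rarely (reload on restart or on a schedule); for very large or fast-changing reference data, the external store or Async I/O remains the better fit.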
Re: Storage options for RocksDBStateBackend
Also, I hope state and checkpointing writes to S3 happen asynchronously, without impacting the actual job execution graph? If so, will there still be a performance impact from using S3? Thanks Ankit From: Ayush Goyal Date: Thursday, May 11, 2017 at 11:21 PM To: Stephan Ewen, Till Rohrmann Cc: user Subject: Re: Storage options for RocksDBStateBackend Till and Stephan, thanks for your clarification. @Till One more question: from what I have read about checkpointing [1], the list operations don't seem likely to be performed frequently, so storing state backend data on S3 shouldn't have any severe impact on Flink performance. Is this assumption right? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/internals/stream_checkpointing.html -- Ayush On Fri, May 12, 2017 at 1:05 AM Stephan Ewen wrote: Small addition to Till's comment: In the case where file:// points to a mounted distributed file system (NFS, MapR FS, ...), it actually works. The important thing is that the file system where the checkpoints go is replicated (fault tolerant) and accessible from all nodes. On Thu, May 11, 2017 at 2:16 PM, Till Rohrmann wrote: Hi Ayush, you're right that RocksDB is the recommended state backend because of the above-mentioned reasons. In order to make the recovery work properly, you have to configure a shared directory for the checkpoint data via state.backend.fs.checkpointdir. You can basically configure any file system which is supported by Hadoop (no HDFS required). The reason is that we use Hadoop to bridge between different file systems. The only thing you have to make sure is that you have the respective file system implementation in your class path. I think you can access Windows Azure Blob Storage via Hadoop [1], similarly to accessing S3, for example. If you use S3 to store your checkpoint data, then you will benefit from all the advantages of S3 but also suffer from its drawbacks (e.g. that list operations are more costly). But these are not specific to Flink.
A URL like file:// usually indicates a local file. Thus, if your Flink cluster is not running on a single machine, this won't work. [1] https://hadoop.apache.org/docs/stable/hadoop-azure/index.html Cheers, Till On Thu, May 11, 2017 at 10:41 AM, Ayush Goyal wrote: Hello, I had a few questions regarding checkpoint storage options using the RocksDBStateBackend. In the Flink 1.2 documentation, it is the recommended state backend due to its ability to store large states and its asynchronous snapshotting. For high availability it seems HDFS is the recommended store for state backend data. In the AWS deployment section, it is also mentioned that S3 can be used for storing state backend data. We don't want to depend on a Hadoop cluster for the Flink deployment, so I had the following questions: 1. Can we use any storage backend supported by Flink for storing RocksDB state backend data with file URLs? There are quite a few supported, as mentioned here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html and here: https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md 2. Is there some work already done to support Windows Azure Blob Storage for storing state backend data? There are some docs here: https://github.com/apache/flink/blob/master/docs/dev/batch/connectors.md - can we utilize this for that? 3. If utilizing S3 for the state backend, is there any performance impact? 4. For high availability, can we use an NFS volume for the state backend, with "file://" URLs? Will there be any performance impact? PS: I posted this email earlier via Nabble, but it's not showing up in the Apache archive, so sending again. Apologies if it results in multiple threads. -- Ayush
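Till's answer (RocksDB backend plus a shared checkpoint directory via state.backend.fs.checkpointdir) maps to a few flink-conf.yaml entries. A sketch for a Flink 1.2-era setup follows; the S3 bucket and paths are placeholder values:

```yaml
# flink-conf.yaml sketch - bucket and path values are placeholders.
state.backend: rocksdb
# Shared, replicated location reachable from all nodes (any Hadoop-supported FS):
state.backend.fs.checkpointdir: s3://my-bucket/flink/checkpoints/
# file:// would only work if this path is a mounted distributed FS (NFS, MapR FS, ...),
# not a node-local disk, per Till's and Stephan's remarks above.
```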
Re: High Availability on Yarn
Got the answer on #2, looks like that will work, still looking for suggestions on #1. Thanks Ankit From: "Jain, Ankit" <ankit.j...@here.com> Date: Thursday, May 11, 2017 at 8:26 AM To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" <user@flink.apache.org> Subject: Re: High Availability on Yarn Following up further on this. 1) We are using a long-running EMR cluster to submit jobs right now, and as you know, EMR hasn't made the Yarn ResourceManager HA. Is there any way we can use the information put in Zookeeper by the Flink JobManager to bring the jobs back up on a new EMR cluster if the RM goes down? We are not looking for a completely automated option, but maybe we could write a script which reads Zookeeper and re-starts all jobs on a fresh EMR cluster? I am assuming that if the Yarn ResourceManager goes down, there is no way to just bring it back up - you have to start a new EMR cluster? 2) Regarding elasticity, I know that for now a running Flink cluster can't make use of new hosts added to EMR, but I am guessing Yarn will still see the new hosts and new Flink jobs can make use of them, is that right? Thanks Ankit From: "Jain, Ankit" <ankit.j...@here.com> Date: Monday, May 8, 2017 at 9:09 AM To: Stephan Ewen <se...@apache.org>, "user@flink.apache.org" <user@flink.apache.org> Subject: Re: High Availability on Yarn Thanks Stephan - we will go with a central ZooKeeper instance and hopefully have it started through a CloudFormation script as part of EMR startup. Is ZK also used to keep track of checkpoint metadata and the execution graph of the running job to recover from ApplicationMaster failure, as Aljoscha was guessing below, or only for leader election in case of accidentally running multiple ApplicationMasters? Thanks Ankit From: Stephan Ewen <se...@apache.org> Date: Monday, May 8, 2017 at 9:00 AM To: "user@flink.apache.org" <user@flink.apache.org>, "Jain, Ankit" <ankit.j...@here.com> Subject: Re: High Availability on Yarn @Ankit: ZooKeeper is required in YARN setups still.
Even if there is only one JobManager in the normal case, Yarn can accidentally create a second one when there is a network partition. To prevent this from leading to inconsistencies, we use ZooKeeper. Flink uses ZooKeeper very little, so you can just let Flink attach to any existing ZooKeeper, or use one ZooKeeper cluster for very many Flink clusters/jobs. Stephan On Mon, May 8, 2017 at 2:11 PM, Aljoscha Krettek <aljos...@apache.org> wrote: Hi, Yes, it's recommended to use one ZooKeeper cluster for all Flink clusters. Best, Aljoscha On 5. May 2017, at 16:56, Jain, Ankit <ankit.j...@here.com> wrote: Thanks for the update Aljoscha. @Till Rohrmann, can you please chime in? Also, we currently have a long-running EMR cluster where we create one Flink cluster per job - can we just choose to install Zookeeper when creating the EMR cluster and use one Zookeeper instance for ALL of the Flink jobs? Or is the recommendation to have a dedicated Zookeeper instance per Flink job? Thanks Ankit From: Aljoscha Krettek <aljos...@apache.org> Date: Thursday, May 4, 2017 at 1:19 AM To: "Jain, Ankit" <ankit.j...@here.com> Cc: "user@flink.apache.org" <user@flink.apache.org>, Till Rohrmann <trohrm...@apache.org> Subject: Re: High Availability on Yarn Hi, Yes, for YARN there is only one running JobManager. As far as I know, in this case ZooKeeper is only used to keep track of checkpoint metadata and the execution graph of the running job, such that a restoring JobManager can pick up the data again. I'm not 100% sure on this, though, so maybe Till can shed some light on it. Best, Aljoscha On 3. May 2017, at 16:58, Jain, Ankit <ankit.j...@here.com> wrote: Thanks for your reply Aljoscha.
After building a better understanding of Yarn and spending a copious amount of time on the Flink codebase, I think I now get how Flink and Yarn interact - I plan to document this soon in case it could help somebody starting afresh with Flink on Yarn. Regarding Zookeeper, in YARN mode there is only one JobManager running; do we still need leader election? If the ApplicationMaster goes down (where the JM runs), it is restarted by the Yarn RM, and while restarting, the Flink AM will bring back the previously running containers. So, where does Zookeeper sit in this setup? Thanks Ankit From: Aljoscha Krettek <aljos...@apache.org> Date: Wednesday, May 3, 2017 at 2:05 AM To: "Jain, Ankit" <ankit.j...@he
Re: High Availability on Yarn
Following up further on this.

1) We are using a long-running EMR cluster to submit jobs right now and, as you know, EMR hasn’t made the Yarn ResourceManager HA. Is there any way we can use the information the Flink JobManager puts in ZooKeeper to bring the jobs back up on a new EMR cluster if the RM goes down? We are not looking for a completely automated option, but could we maybe write a script which reads ZooKeeper and re-starts all jobs on a fresh EMR cluster? I am assuming that if the Yarn ResourceManager goes down there is no way to just bring it back up – you have to start a new EMR cluster?

2) Regarding elasticity, I know that for now a running Flink cluster can’t make use of new hosts added to EMR, but I am guessing Yarn will still see the new hosts and new Flink jobs can make use of them – is that right?

Thanks
Ankit
Re: High Availability on Yarn
Thanks Stephan – we will go with a central ZooKeeper instance and hopefully have it started through a CloudFormation script as part of EMR startup. Is ZooKeeper also used to keep track of checkpoint metadata and the execution graph of the running job, to recover from ApplicationMaster failure as Aljoscha was guessing below, or only for leader election in case of accidentally running multiple ApplicationMasters?

Thanks
Ankit

From: Stephan Ewen <se...@apache.org>
Date: Monday, May 8, 2017 at 9:00 AM
To: "user@flink.apache.org" <user@flink.apache.org>, "Jain, Ankit" <ankit.j...@here.com>
Subject: Re: High Availability on Yarn

@Ankit: ZooKeeper is required in YARN setups still. Even if there is only one JobManager in the normal case, Yarn can accidentally create a second one when there is a network partition. To prevent that from leading to inconsistencies, we use ZooKeeper. Flink uses ZooKeeper very little, so you can just let Flink attach to any existing ZooKeeper, or use one ZooKeeper cluster for very many Flink clusters/jobs.

Stephan

On Mon, May 8, 2017 at 2:11 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
Hi,
Yes, it’s recommended to use one ZooKeeper cluster for all Flink clusters.
Best,
Aljoscha
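Stephan’s point that a single ZooKeeper ensemble can serve many Flink clusters comes down to giving each cluster its own namespace under a shared root znode. A minimal flink-conf.yaml sketch of that setup (key names as documented for Flink 1.2/1.3 – verify against your version; the quorum hosts and storage path here are placeholders):

```yaml
# Enable ZooKeeper-based high availability.
high-availability: zookeeper
# The shared ZooKeeper ensemble (placeholder hosts).
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
# Root znode under which all Flink clusters nest their state.
high-availability.zookeeper.path.root: /flink
# Isolates this cluster from others sharing the ensemble;
# on YARN this defaults to the application id.
high-availability.cluster-id: /my-flink-job
# JobManager metadata (job graphs, checkpoint pointers) is written
# here; ZooKeeper itself only stores pointers to this location.
high-availability.storageDir: s3://my-bucket/flink/recovery
# Let Yarn restart the ApplicationMaster on failure.
yarn.application-attempts: 10
```

Because ZooKeeper only holds pointers, the heavy recovery state lives in the storage directory, which is why the load Flink puts on a shared ensemble stays small.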
Re: High Availability on Yarn
Thanks for the update Aljoscha. @Till Rohrmann, can you please chime in?

Also, we currently have a long-running EMR cluster where we create one Flink cluster per job – can we just choose to install ZooKeeper when creating the EMR cluster and use one ZooKeeper instance for ALL of the Flink jobs? Or is the recommendation to have a dedicated ZooKeeper instance per Flink job?

Thanks
Ankit

From: Aljoscha Krettek <aljos...@apache.org>
Date: Thursday, May 4, 2017 at 1:19 AM
To: "Jain, Ankit" <ankit.j...@here.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>, Till Rohrmann <trohrm...@apache.org>
Subject: Re: High Availability on Yarn

Hi,
Yes, for YARN there is only one running JobManager. As far as I know, in this case ZooKeeper is only used to keep track of checkpoint metadata and the execution graph of the running job, such that a restoring JobManager can pick up the data again. I’m not 100% sure on this, though, so maybe Till can shed some light on it.
Best,
Aljoscha
Re: High Availability on Yarn
Thanks for your reply Aljoscha.

After building a better understanding of Yarn and spending a copious amount of time on the Flink codebase, I think I now get how Flink & Yarn interact – I plan to document this soon in case it could help somebody starting afresh with Flink on Yarn.

Regarding ZooKeeper: in YARN mode there is only one JobManager running, so do we still need leader election? If the ApplicationMaster (where the JM runs) goes down, it is restarted by the Yarn RM, and while restarting, the Flink AM will bring back the previously running containers. So where does ZooKeeper sit in this setup?

Thanks
Ankit

From: Aljoscha Krettek <aljos...@apache.org>
Date: Wednesday, May 3, 2017 at 2:05 AM
To: "Jain, Ankit" <ankit.j...@here.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>, Till Rohrmann <trohrm...@apache.org>
Subject: Re: High Availability on Yarn

Hi,
As a first comment, the work mentioned in the FLIP-6 doc you linked is still work-in-progress. You cannot use these abstractions yet without going into the code and setting up a cluster “by hand”. The documentation for one-step deployment of a job to YARN is available here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/yarn_setup.html#run-a-single-flink-job-on-yarn

Regarding your third question, ZooKeeper is mostly used for discovery and leader election. That is, JobManagers use it to decide who is the main JM and who are standby JMs. TaskManagers use it to discover the leading JobManager that they should connect to. I’m also cc’ing Till, who should know this stuff better and can maybe explain it in a bit more detail.
Best,
Aljoscha
High Availability on Yarn
Hi fellow users,

We are trying to straighten out the high availability story for Flink. Our setup includes a long-running EMR cluster, and job submission is a two-step process:
1) A Flink cluster is first created using the Flink Yarn client on the already-running EMR cluster.
2) The Flink job is submitted.

I also saw references that with 1.2 these two steps have been combined into one – is that the change in FlinkYarnSessionCli.java? Can somebody point to documentation please?

Without worrying about Yarn RM failure for now (not the Flink Yarn RM that seems to be newly introduced), I want to understand first how TaskManager & JobManager failures are handled. My questions:

1) https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 suggests a new RM has been added and now there is one JobManager for each job. Since the Yarn RM will now talk to the Flink RM (instead of the JobManager previously), will Yarn automatically restart a failing Flink RM?
2) Is there any documentation on the behavior of the new Flink RM that will come up? How will previously running JobManagers & TaskManagers find out about the new RM?
3) https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#configuration requires configuring ZooKeeper even for Yarn – is this needed for handling TaskManager failures, JobManager failures, or both? Will Yarn not take care of JM failures?

It may sound like I am a little confused about the roles of the Yarn and Flink components – who carries most of the burden of HA? The documentation in its current state is lacking clarity – I know it is still evolving. Please let me know if somebody can help clear up the confusion.

Thanks
Ankit
Re: Cross operation on two huge datasets
information to the point. Then do a groupBy to merge the points that were inside multiple shapes.

This works very well in a local runtime but fails on Yarn, because it seems that the TaskManager reloads the jar file between two jobs, making me lose my static RTree (I guess the newly loaded class overwrites the older one). I have two questions:

- Is there a way to avoid that jar reload? Can I store my RTree somewhere in the JDK or in Flink, locally to the TaskManager, in a way that it wouldn’t be affected by the jar reload (since it would not be stored in any class from MY jar)?
  - I could also try to do it in a single job, but I don’t know how to ensure that some operations (parsing of the shapes) are done BEFORE starting to handle the points.
- Is there a way to do that in a clean way using Flink operators? I’d need to be able to use the result of the iteration of one dataset inside of my map. Something like: datasetA.flatmap(new MyMapOperator(datasetB))… In my implementation I would be able to iterate the whole datasetB BEFORE doing any operation on datasetA. That way I could parse all my shapes into an RTree before handling my points, without relying on statics. Or any other way that would allow me to do something similar.

Thanks in advance for your insight.
Gwen’

From: Jain, Ankit [mailto:ankit.j...@here.com]
Sent: Thursday, February 23, 2017 19:21
To: user@flink.apache.org
Cc: Fabian Hueske <fhue...@gmail.com>
Subject: Re: Cross operation on two huge datasets

Hi Gwen,
I would recommend looking into a data structure called RTree that is designed specifically for this use case, i.e. matching a point to a region.
Thanks
Ankit
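What Gwen is after – making sure the shape dataset is fully consumed and indexed before any point is processed – is the pattern that Flink’s DataSet broadcast variables provide, where the broadcast set is fully materialized in each task before the main input is streamed. A plain-Python illustration of the two-phase pattern itself (class and data names are invented for the example, and a plain list stands in for a real R-tree):

```python
class ShapeEnricher:
    """Consumes ALL shapes up front, then streams points through the
    resulting index -- mirroring how a broadcast set is fully
    materialized before the main input is processed."""

    def __init__(self, shapes):
        # "Parse" every shape before any point is seen. Shapes here are
        # axis-aligned boxes (shape_id, xmin, ymin, xmax, ymax); a real
        # job would insert them into an R-tree instead of a list.
        self.index = [tuple(s) for s in shapes]

    def enrich(self, point):
        """Yield (point_id, shape_id) for every shape containing the point."""
        pid, x, y = point
        for sid, xmin, ymin, xmax, ymax in self.index:
            if xmin <= x <= xmax and ymin <= y <= ymax:
                yield (pid, sid)

shapes = [("a", 0, 0, 10, 10), ("b", 5, 5, 20, 20)]
points = [("p1", 7, 7), ("p2", 15, 1)]

enricher = ShapeEnricher(shapes)  # phase 1: index all shapes
matches = [m for p in points for m in enricher.enrich(p)]  # phase 2: stream points
# p1 lies in both boxes, p2 in neither.
```

In the DataSet API the equivalent ordering guarantee comes from building the index in a rich function’s open() method from the broadcast set, so no static state (and no jar-reload problem) is involved.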
Re: Cross operation on two huge datasets
Hi Gwen,
I would recommend looking into a data structure called RTree that is designed specifically for this use case, i.e. matching a point to a region.
Thanks
Ankit

From: Fabian Hueske <fhue...@gmail.com>
Date: Wednesday, February 22, 2017 at 2:41 PM
To: user@flink.apache.org
Subject: Re: Cross operation on two huge datasets

Hi Gwen,
Flink usually performs a block nested loop join to cross two data sets. This algorithm spills one input to disk and streams the other input. For each input it fills a memory buffer to perform the cross. Then the buffer of the spilled input is refilled with spilled records and the records are crossed again. This is done until one iteration over the spilled records is done. Then the other buffer, for the streamed input, is filled with the next records.

You should be aware that cross is a super-expensive operation, especially if you evaluate a complex condition for each pair of records, so cross can easily be too expensive to compute. For such use cases it is usually better to apply a coarse-grained spatial partitioning and do a key-based join on the partitions. Within each partition you’d perform a cross.

Best, Fabian

2017-02-21 18:34 GMT+01:00 Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com>:
Hi,
I need (or at least I think I do) to do a cross operation between two huge datasets. One dataset is a list of points; the other one is a list of shapes (areas). I want to know, for each point, the areas it belongs to (they might overlap, so a point can be in multiple areas), so I thought I’d “cross” my points and areas since I need to test each point against each area.

I tried it, and my job seems to work for some seconds, then, at some point, it gets stuck. I’m wondering whether Flink, for cross operations, tries to load one of the two datasets into RAM, or whether it’s able to split the job into multiple iterations (even if it means reading one of the two datasets multiple times). Or maybe I’m going at it the wrong way, or missing some parameters; feel free to correct me ☺ I’m using Flink 1.0.1.

Thanks in advance
Gwen’
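Fabian’s suggestion – coarse-grained spatial partitioning followed by a key-based join – can be sketched in plain Python (the grid-cell size, names, and toy data are invented for the illustration; in Flink this would be a flatMap assigning cell keys to both inputs, a join or coGroup on the cell key, and the fine-grained containment test inside each group):

```python
from collections import defaultdict

CELL = 10.0  # grid cell edge length; tune to the typical shape extent

def cells_for_box(xmin, ymin, xmax, ymax):
    """All grid cells a shape's bounding box overlaps (its partition keys)."""
    for cx in range(int(xmin // CELL), int(xmax // CELL) + 1):
        for cy in range(int(ymin // CELL), int(ymax // CELL) + 1):
            yield (cx, cy)

def spatial_join(points, boxes):
    """points: (pid, x, y); boxes: (bid, xmin, ymin, xmax, ymax).
    Returns {pid: set of ids of boxes containing the point}."""
    # Phase 1: replicate each box to every cell it touches
    # (this is the coarse-grained partitioning).
    index = defaultdict(list)
    for bid, xmin, ymin, xmax, ymax in boxes:
        for cell in cells_for_box(xmin, ymin, xmax, ymax):
            index[cell].append((bid, xmin, ymin, xmax, ymax))
    # Phase 2: each point only meets the boxes sharing its cell,
    # instead of being crossed with every box.
    result = defaultdict(set)
    for pid, x, y in points:
        for bid, xmin, ymin, xmax, ymax in index[(int(x // CELL), int(y // CELL))]:
            if xmin <= x <= xmax and ymin <= y <= ymax:
                result[pid].add(bid)  # overlapping shapes: several matches possible
    return dict(result)

matches = spatial_join(
    points=[("p1", 5, 5), ("p2", 25, 5)],
    boxes=[("a", 0, 0, 10, 10), ("b", 3, 3, 30, 8)],
)
# p1 falls in both boxes; p2 only in box "b".
```

The two phases map directly onto Fabian’s proposal: replicating boxes to cells is the coarse partitioning, and the per-cell containment check is the cross performed within each partition.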