Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Yes, Tathagata, thank you. For #1, the 'need detection', one idea we're entertaining is timestamping the messages coming into the Kafka topics. The consumers would check the interval between the time they get a message and that message's origination timestamp. As the Kafka topics start to fill up, we would presumably see longer and longer wait times (delays) before messages get processed by the consumers. The consumers would then start firing critical events into an Event Analyzer/Aggregator, which would decide that more resources are needed and ask the Provisioning Component to allocate N new machines. We do want to set maxRatePerPartition so as not to overwhelm the consumers: machine provisioning may take a while, and with no maxRate guard in place our consumers could run out of memory in the meantime. "Since there are no receivers, if the cluster gets a new executor, it will automatically start getting used to run tasks... no need to do anything further." This is great, actually. We were wondering whether we'd need to restart the consumers once the new machines have been added. Tathagata's point implies, as I read it, that no further orchestration is needed; the load will start getting redistributed automatically. This makes the implementation of autoscaling a lot simpler, as far as #3 goes. One issue that hasn't been covered much yet is the scenario where *fewer* cluster resources become required (a system load valley rather than a peak). To detect low volume, we'd need to measure the throughput in messages per second over time. Sustained low volumes would fire critical events signaling to the Analyzer that machines could be decommissioned. If machines are being decommissioned, it would seem that the consumers would need to be quiesced (allowed to process their current batch, then shut down), then restart themselves or be restarted. Thoughts on this? There is also a hefty #4 here, the "hysteresis" of the whole thing: the system operates adaptively and learns over time, remembering the history of cluster expansions and contractions, allowing some slack so things cool down or heat up gradually, and avoiding contracting or expanding too frequently. PID controllers and thermostat-style design patterns have been mentioned earlier in this discussion. On Thu, Jun 11, 2015 at 11:08 PM, Tathagata Das wrote: > Let me try to add some clarity to the different thought directions > going on in this thread. > > 1. HOW TO DETECT THE NEED FOR MORE CLUSTER RESOURCES? > > If there are no rate limits set up, the most reliable way to detect > whether the current Spark cluster is insufficient to handle the data > load is to use the StreamingListener interface which gives all the > information about when batches start and end. See the internal > implementation of the StreamingListener called > StreamingJobProgressListener. This is the one that drives the streaming UI. > You can get the scheduling delay (time taken for a batch to start > processing) from it and use that as a reliable indicator that Spark > Streaming is not able to process as fast as data is being received. > > But if you have already set rate limits based on the max load that the cluster > can handle, then you will probably never detect that the actual input rate > into Kafka has gone up and data is getting buffered inside Kafka. In that > case, you have to monitor Kafka load to correctly detect the high load. 
You > may want to use a combination of both techniques for a robust and safe elastic > solution - have rate limits set, use the StreamingListener for early detection > that the processing load is increasing (it can increase without an actual increase in > the data rate), and also make sure from Kafka monitoring that the whole > end-to-end system is keeping up. > > > 2. HOW TO GET MORE CLUSTER RESOURCES? > > Currently for YARN, you can use the developer API of dynamic allocation > that Andrew Or has introduced to ask for more executors from YARN. Note > that the existing dynamic allocation solution is unlikely to work for > streaming, and should not be used. Rather I recommend building your own > logic that sees the streaming scheduling delay, and accordingly uses the > low level developer API to directly ask for more executors > (sparkContext.requestExecutors). In other approaches, the Provisioning > Component idea can also work. > > > 3. HOW TO TAKE ADVANTAGE OF MORE CLUSTER RESOURCES? > > There are two approaches depending on receiver vs Kafka direct. I am > assuming the number of topic partitions is pre-determined to be large enough > to handle the peak load. > > (a) Kafka Direct: This is the simpler scenario. Since there are no > receivers, if the cluster gets a new executor, it will automatically start > getting used to run tasks, including reading from Kafka (remember, the Kafka > direct approach reads from Kafka like a file system, from any node that > runs the task). So it will immediately start using the extra resources, no need to do anything further.
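Re the #1 "need detection" idea above, here is a rough Scala sketch of the timestamp-based lag check combined with the maxRatePerPartition guard. The broker address, topic name, message format and thresholds are all placeholders, and it assumes the Spark 1.3+ direct Kafka API and reasonably synchronized clocks:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Cap the ingest rate so the consumers don't run out of memory while new machines are provisioned.
    val conf = new SparkConf()
      .setAppName("lag-monitor")
      .set("spark.streaming.kafka.maxRatePerPartition", "1000") // msgs/sec/partition, tune for your cluster

    val ssc = new StreamingContext(conf, Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder broker list
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events")) // placeholder topic

    // Assume each message value looks like "<originTimestampMillis>|<payload>".
    val lagThresholdMs = 60000L // made-up threshold: messages waiting > 1 minute => scale-up signal
    stream.foreachRDD { rdd =>
      val now = System.currentTimeMillis()
      val lags = rdd.map { case (_, value) => now - value.split('|')(0).toLong }
      if (!lags.isEmpty() && lags.mean() > lagThresholdMs) {
        // fireCriticalEvent(...) -- hypothetical hook into the Event Analyzer / Provisioning Component
      }
    }
    ssc.start()
    ssc.awaitTermination()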
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Let me try to add some clarity to the different thought directions going on in this thread. 1. HOW TO DETECT THE NEED FOR MORE CLUSTER RESOURCES? If there are no rate limits set up, the most reliable way to detect whether the current Spark cluster is insufficient to handle the data load is to use the StreamingListener interface which gives all the information about when batches start and end. See the internal implementation of the StreamingListener called StreamingJobProgressListener. This is the one that drives the streaming UI. You can get the scheduling delay (time taken for a batch to start processing) from it and use that as a reliable indicator that Spark Streaming is not able to process as fast as data is being received. But if you have already set rate limits based on the max load that the cluster can handle, then you will probably never detect that the actual input rate into Kafka has gone up and data is getting buffered inside Kafka. In that case, you have to monitor Kafka load to correctly detect the high load. You may want to use a combination of both techniques for a robust and safe elastic solution - have rate limits set, use the StreamingListener for early detection that the processing load is increasing (it can increase without an actual increase in the data rate), and also make sure from Kafka monitoring that the whole end-to-end system is keeping up. 2. HOW TO GET MORE CLUSTER RESOURCES? Currently for YARN, you can use the developer API of dynamic allocation that Andrew Or has introduced to ask for more executors from YARN. Note that the existing dynamic allocation solution is unlikely to work for streaming, and should not be used. Rather I recommend building your own logic that sees the streaming scheduling delay, and accordingly uses the low level developer API to directly ask for more executors (sparkContext.requestExecutors). In other approaches, the Provisioning Component idea can also work. 3. HOW TO TAKE ADVANTAGE OF MORE CLUSTER RESOURCES? There are two approaches depending on receiver vs Kafka direct. I am assuming the number of topic partitions is pre-determined to be large enough to handle the peak load. (a) Kafka Direct: This is the simpler scenario. Since there are no receivers, if the cluster gets a new executor, it will automatically start getting used to run tasks, including reading from Kafka (remember, the Kafka direct approach reads from Kafka like a file system, from any node that runs the task). So it will immediately start using the extra resources, no need to do anything further. (b) Receiver: This is definitely tricky. If you don't need to increase the number of receivers, then a new executor will start getting used for computations (shuffles, writing out, etc.), but the parallelism in receiving will not increase. If you need to increase that, then it's best to shut down the context gracefully (so that no data is lost), and a new StreamingContext can be started with more receivers (# receivers <= # executors), and maybe more #partitions for shuffles. You have to call stop on the currently running streaming context to start a new one. If a context is stopped, any thread stuck in awaitTermination will get unblocked. Does that clarify things? On Thu, Jun 11, 2015 at 7:30 AM, Cody Koeninger wrote: > Depends on what you're reusing multiple times (if anything). > > Read > http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence > > On Wed, Jun 10, 2015 at 12:18 AM, Dmitry Goldenberg < > dgoldenberg...@gmail.com> wrote: > >> At which point would I call cache()? 
I just want the runtime to spill to >> disk when necessary without me having to know when the "necessary" is. >> >> >> On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger >> wrote: >> >>> direct stream isn't a receiver, it isn't required to cache data anywhere >>> unless you want it to. >>> >>> If you want it, just call cache. >>> >>> On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg < >>> dgoldenberg...@gmail.com> wrote: >>> "set the storage policy for the DStream RDDs to MEMORY AND DISK" - it appears the storage level can be specified in the createStream methods but not createDirectStream... On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov wrote: > You can also try Dynamic Resource Allocation > > > > > https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation > > > > Also re the Feedback Loop for automatic message consumption rate > adjustment – there is a “dumb” solution option – simply set the storage > policy for the DStream RDDs to MEMORY AND DISK – when the memory gets > exhausted spark streaming will resort to keeping new RDDs on disk which > will prevent it from crashing and hence loosing them. Then some memory > will > get freed and it will resort back to RAM and so on and so forth > > > > > > Sent from Samsung Mobile > > Original message -
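A minimal Scala sketch of TD's points 1 and 2 combined: watch the scheduling delay through a StreamingListener and ask the cluster manager for more executors via the developer API. The thresholds and executor counts are illustrative only, `ssc` is an existing StreamingContext, and requestExecutors is only honored on coarse-grained YARN mode:

    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    class ScaleUpListener(sc: SparkContext) extends StreamingListener {
      private val delayThresholdMs = 30000L // made-up: batches waiting more than 30s to start
      private var consecutiveSlowBatches = 0

      override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
        val delay = batch.batchInfo.schedulingDelay.getOrElse(0L)
        consecutiveSlowBatches = if (delay > delayThresholdMs) consecutiveSlowBatches + 1 else 0
        if (consecutiveSlowBatches >= 3) { // crude hysteresis: require several slow batches in a row
          sc.requestExecutors(2)           // developer API: ask the cluster manager for 2 more executors
          consecutiveSlowBatches = 0
        }
      }
    }

    ssc.addStreamingListener(new ScaleUpListener(ssc.sparkContext))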
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Depends on what you're reusing multiple times (if anything). Read http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence On Wed, Jun 10, 2015 at 12:18 AM, Dmitry Goldenberg < dgoldenberg...@gmail.com> wrote: > At which point would I call cache()? I just want the runtime to spill to > disk when necessary without me having to know when the "necessary" is. > > > On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger wrote: > >> direct stream isn't a receiver, it isn't required to cache data anywhere >> unless you want it to. >> >> If you want it, just call cache. >> >> On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg < >> dgoldenberg...@gmail.com> wrote: >> >>> "set the storage policy for the DStream RDDs to MEMORY AND DISK" - it >>> appears the storage level can be specified in the createStream methods but >>> not createDirectStream... >>> >>> >>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov >>> wrote: >>> You can also try Dynamic Resource Allocation https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation Also re the Feedback Loop for automatic message consumption rate adjustment – there is a “dumb” solution option – simply set the storage policy for the DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark streaming will resort to keeping new RDDs on disk which will prevent it from crashing and hence loosing them. Then some memory will get freed and it will resort back to RAM and so on and so forth Sent from Samsung Mobile Original message From: Evo Eftimov Date:2015/05/28 13:22 (GMT+00:00) To: Dmitry Goldenberg Cc: Gerard Maas ,spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? You can always spin new boxes in the background and bring them into the cluster fold when fully operational and time that with job relaunch and param change Kafka offsets are mabaged automatically for you by the kafka clients which keep them in zoomeeper dont worry about that ad long as you shut down your job gracefuly. Besides msnaging the offsets explicitly is not a big deal if necessary Sent from Samsung Mobile Original message From: Dmitry Goldenberg Date:2015/05/28 13:16 (GMT+00:00) To: Evo Eftimov Cc: Gerard Maas ,spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs. I suspect that relaunching the jobs may be tricky since that means keeping track of the starter offsets in Kafka topic(s) from which the jobs started working on. Ideally, we'd want to avoid a re-launch. The 'suspension' and relaunching of jobs, coupled with the wait for the new machines to come online may turn out quite time-consuming which will make for lengthy request times, and our requests are not asynchronous. Ideally, the currently running jobs would continue to run on the machines currently available in the cluster. In the scale-down case, the job manager would want to signal to Spark's job scheduler not to send work to the node being taken out, find out when the last job has finished running on the node, then take the node out. 
This is somewhat like changing the number of cylinders in a car engine while the car is running... Sounds like a great candidate for a set of enhancements in Spark... On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov wrote: @DG; The key metrics should be - Scheduling delay – its ideal state is to remain constant over time and ideally be less than the time of the microbatch window - The average job processing time should remain less than the micro-batch window - Number of Lost Jobs – even if there is a single Job lost that means that you have lost all messages for the DStream RDD processed by that job due to the previously described spark streaming memory leak condition and subsequent crash – described in previous postings submitted by me You can even go one step further and periodically iss
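To illustrate Cody's point about reuse: caching only pays off when the same DStream feeds more than one action per batch. A small sketch, assuming a `directStream` of (key, value) pairs as created elsewhere in this thread:

    // Only worth persisting because two separate actions below reuse the same data each batch.
    val payloads = directStream.map(_._2)
    payloads.cache() // DStream.cache()/persist() default to a serialized in-memory level

    payloads.foreachRDD(rdd => println(s"records this batch: ${rdd.count()}"))
    payloads.filter(_.contains("ERROR")).print() // second use hits the cached blocks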
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
If I want to restart my consumers into an updated cluster topology after the cluster has been expanded or contracted, would I need to call stop() on them, then call start() on them, or would I need to instantiate and start new context objects (new JavaStreamingContext(...)) ? I'm thinking of actually acquiescing these streaming consumers but letting them finish their current batch first. Right now I'm doing jssc.start(); jssc.awaitTermination(); Must jssc.close() be called as well, after awaitTermination(), to avoid potentially leaking contexts? I don't see that in things like JavaDirectKafkaWordCount but wondering if that's needed. On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov wrote: > Makes sense especially if you have a cloud with “infinite” resources / > nodes which allows you to double, triple etc in the background/parallel the > resources of the currently running cluster > > > > I was thinking more about the scenario where you have e.g. 100 boxes and > want to / can add e.g. 20 more > > > > *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] > *Sent:* Wednesday, June 3, 2015 4:46 PM > *To:* Evo Eftimov > *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users > *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic > sizes/rate of growth in Kafka or Spark's metrics? > > > > Evo, > > > > One of the ideas is to shadow the current cluster. This way there's no > extra latency incurred due to shutting down of the consumers. If two sets > of consumers are running, potentially processing the same data, that is OK. > We phase out the older cluster and gradually flip over to the new one, > insuring no downtime or extra latency. Thoughts? > > > > On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov > wrote: > > You should monitor vital performance / job clogging stats of the Spark > Streaming Runtime not “kafka topics” > > > > You should be able to bring new worker nodes online and make them contact > and register with the Master without bringing down the Master (or any of > the currently running worker nodes) > > > > Then just shutdown your currently running spark streaming job/app and > restart it with new params to take advantage of the larger cluster > > > > *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] > *Sent:* Wednesday, June 3, 2015 4:14 PM > *To:* Cody Koeninger > *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users > *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic > sizes/rate of growth in Kafka or Spark's metrics? > > > > Would it be possible to implement Spark autoscaling somewhat along these > lines? -- > > > > 1. If we sense that a new machine is needed, by watching the data load in > Kafka topic(s), then > > 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS > and get a machine); > > 3. Create a "shadow"/mirror Spark master running alongside the initial > version which talks to N machines. The new mirror version is aware of N+1 > machines (or N+M if we had decided we needed M new boxes). > > 4. The previous version of the Spark runtime is > acquiesced/decommissioned. We possibly get both clusters working on the > same data which may actually be OK (at least for our specific use-cases). > > 5. Now the new Spark cluster is running. > > > > Similarly, the decommissioning of M unused boxes would happen, via this > notion of a mirror Spark runtime. How feasible would it be for such a > mirrorlike setup to be created, especially created programmatically? > Especially point #3. 
> > > > The other idea we'd entertained was to bring in a new machine, acquiesce > down all currently running workers by telling them to process their current > batch then shut down, then restart the consumers now that Spark is aware of > a modified cluster. This has the drawback of a downtime that may not be > tolerable in terms of latency, by the system's clients waiting for their > responses in a synchronous fashion. > > > > Thanks. > > > > On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger > wrote: > > I'm not sure that points 1 and 2 really apply to the kafka direct stream. > There are no receivers, and you know at the driver how big each of your > batches is. > > > > On Thu, May 28, 2015 at 2:21 PM, Andrew Or wrote: > > Hi all, > > > > As the author of the dynamic allocation feature I can offer a few insights > here. > > > > Gerard's explanation was both correct and concise: dynamic allocation is > not intended to be used in Spark streaming at the moment (1.4 or before). > This is because of two things:
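On the stop()/start() question, a rough sketch of the quiesce-and-restart cycle; createStreamingContext is a placeholder for your own factory method, while the stop call itself is the standard Spark API and also unblocks awaitTermination:

    import org.apache.spark.streaming.StreamingContext

    @volatile var current: StreamingContext = _

    def startConsumer(): Unit = {
      current = createStreamingContext() // placeholder: builds the context and the DStream graph
      current.start()
      current.awaitTermination()         // returns once another thread stops the context
    }

    // From the monitoring/provisioning thread, after the cluster has been resized:
    current.stop(stopSparkContext = true, stopGracefully = true) // finish the in-flight batch, lose no data
    startConsumer()                                              // a fresh context will see the enlarged cluster

As I understand it, close() on JavaStreamingContext simply delegates to stop(), so an explicit close() after stop() should not be needed, but that is worth verifying on your Spark version.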
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
At which point would I call cache()? I just want the runtime to spill to disk when necessary without me having to know when the "necessary" is. On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger wrote: > direct stream isn't a receiver, it isn't required to cache data anywhere > unless you want it to. > > If you want it, just call cache. > > On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg < > dgoldenberg...@gmail.com> wrote: > >> "set the storage policy for the DStream RDDs to MEMORY AND DISK" - it >> appears the storage level can be specified in the createStream methods but >> not createDirectStream... >> >> >> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov >> wrote: >> >>> You can also try Dynamic Resource Allocation >>> >>> >>> >>> >>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation >>> >>> >>> >>> Also re the Feedback Loop for automatic message consumption rate >>> adjustment – there is a “dumb” solution option – simply set the storage >>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets >>> exhausted spark streaming will resort to keeping new RDDs on disk which >>> will prevent it from crashing and hence loosing them. Then some memory will >>> get freed and it will resort back to RAM and so on and so forth >>> >>> >>> >>> >>> >>> Sent from Samsung Mobile >>> >>> Original message >>> >>> From: Evo Eftimov >>> >>> Date:2015/05/28 13:22 (GMT+00:00) >>> >>> To: Dmitry Goldenberg >>> >>> Cc: Gerard Maas ,spark users >>> >>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of >>> growth in Kafka or Spark's metrics? >>> >>> >>> >>> You can always spin new boxes in the background and bring them into the >>> cluster fold when fully operational and time that with job relaunch and >>> param change >>> >>> >>> >>> Kafka offsets are mabaged automatically for you by the kafka clients >>> which keep them in zoomeeper dont worry about that ad long as you shut down >>> your job gracefuly. Besides msnaging the offsets explicitly is not a big >>> deal if necessary >>> >>> >>> >>> >>> >>> Sent from Samsung Mobile >>> >>> >>> >>> Original message >>> >>> From: Dmitry Goldenberg >>> >>> Date:2015/05/28 13:16 (GMT+00:00) >>> >>> To: Evo Eftimov >>> >>> Cc: Gerard Maas ,spark users >>> >>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of >>> growth in Kafka or Spark's metrics? >>> >>> >>> >>> Thanks, Evo. Per the last part of your comment, it sounds like we will >>> need to implement a job manager which will be in control of starting the >>> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and >>> marking them as ones to relaunch, scaling the cluster up/down by >>> adding/removing machines, and relaunching the 'suspended' (shut down) jobs. >>> >>> >>> >>> I suspect that relaunching the jobs may be tricky since that means >>> keeping track of the starter offsets in Kafka topic(s) from which the jobs >>> started working on. >>> >>> >>> >>> Ideally, we'd want to avoid a re-launch. The 'suspension' and >>> relaunching of jobs, coupled with the wait for the new machines to come >>> online may turn out quite time-consuming which will make for lengthy >>> request times, and our requests are not asynchronous. Ideally, the >>> currently running jobs would continue to run on the machines currently >>> available in the cluster. 
>>> >>> >>> >>> In the scale-down case, the job manager would want to signal to Spark's >>> job scheduler not to send work to the node being taken out, find out when >>> the last job has finished running on the node, then take the node out. >>> >>> >>> >>> This is somewhat like changing the number of cylinders in a car engine >>> while the car is running... >>> >>> >>> >>> Sounds like a great candidate for a set of enhancements in Spark... >>> >>> >>> >>> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov >>> wrote: >>> >>> @DG; The key metrics should be >>> >>> >>> >>> - Scheduling delay – its ideal state is to remain constant >>> over time and ideally be less than the time of the microbatch window >>> >>> - The average job processing time should remain less than the >>> micro-batch window >>> >>> - Number of Lost Jobs – even if there is a single Job lost >>> that means that you have lost all messages for the DStream RDD processed by >>> that job due to the previously described spark streaming memory leak >>> condition and subsequent crash – described in previous postings submitted >>> by me >>> >>> >>> >>> You can even go one step further and periodically issue “get/check free >>> memory” to see whether it is decreasing relentlessly at a constant rate – >>> if it touches a predetermined RAM threshold that should be your third >>> metric >>> >>> >>> >>> Re the “back pressure” mechanism – this is a Feedback Loop mechanism and >>> you can implement one on your own without waiting for Jiras and new >>> features whenever they might be implemen
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
direct stream isn't a receiver, it isn't required to cache data anywhere unless you want it to. If you want it, just call cache. On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg wrote: > "set the storage policy for the DStream RDDs to MEMORY AND DISK" - it > appears the storage level can be specified in the createStream methods but > not createDirectStream... > > > On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov > wrote: > >> You can also try Dynamic Resource Allocation >> >> >> >> >> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation >> >> >> >> Also re the Feedback Loop for automatic message consumption rate >> adjustment – there is a “dumb” solution option – simply set the storage >> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets >> exhausted spark streaming will resort to keeping new RDDs on disk which >> will prevent it from crashing and hence loosing them. Then some memory will >> get freed and it will resort back to RAM and so on and so forth >> >> >> >> >> >> Sent from Samsung Mobile >> >> Original message >> >> From: Evo Eftimov >> >> Date:2015/05/28 13:22 (GMT+00:00) >> >> To: Dmitry Goldenberg >> >> Cc: Gerard Maas ,spark users >> >> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of >> growth in Kafka or Spark's metrics? >> >> >> >> You can always spin new boxes in the background and bring them into the >> cluster fold when fully operational and time that with job relaunch and >> param change >> >> >> >> Kafka offsets are mabaged automatically for you by the kafka clients >> which keep them in zoomeeper dont worry about that ad long as you shut down >> your job gracefuly. Besides msnaging the offsets explicitly is not a big >> deal if necessary >> >> >> >> >> >> Sent from Samsung Mobile >> >> >> >> Original message >> >> From: Dmitry Goldenberg >> >> Date:2015/05/28 13:16 (GMT+00:00) >> >> To: Evo Eftimov >> >> Cc: Gerard Maas ,spark users >> >> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of >> growth in Kafka or Spark's metrics? >> >> >> >> Thanks, Evo. Per the last part of your comment, it sounds like we will >> need to implement a job manager which will be in control of starting the >> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and >> marking them as ones to relaunch, scaling the cluster up/down by >> adding/removing machines, and relaunching the 'suspended' (shut down) jobs. >> >> >> >> I suspect that relaunching the jobs may be tricky since that means >> keeping track of the starter offsets in Kafka topic(s) from which the jobs >> started working on. >> >> >> >> Ideally, we'd want to avoid a re-launch. The 'suspension' and >> relaunching of jobs, coupled with the wait for the new machines to come >> online may turn out quite time-consuming which will make for lengthy >> request times, and our requests are not asynchronous. Ideally, the >> currently running jobs would continue to run on the machines currently >> available in the cluster. >> >> >> >> In the scale-down case, the job manager would want to signal to Spark's >> job scheduler not to send work to the node being taken out, find out when >> the last job has finished running on the node, then take the node out. >> >> >> >> This is somewhat like changing the number of cylinders in a car engine >> while the car is running... >> >> >> >> Sounds like a great candidate for a set of enhancements in Spark... 
>> >> >> >> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov >> wrote: >> >> @DG; The key metrics should be >> >> >> >> - Scheduling delay – its ideal state is to remain constant over >> time and ideally be less than the time of the microbatch window >> >> - The average job processing time should remain less than the >> micro-batch window >> >> - Number of Lost Jobs – even if there is a single Job lost that >> means that you have lost all messages for the DStream RDD processed by that >> job due to the previously described spark streaming memory leak condition >> and subsequent crash – described in previous postings submitted by me >> >> >> >> You can even go one step further and periodically issue “get/check free >> memory” to see whether it is decreasing relentlessly at a constant rate – >> if it touches a predetermined RAM threshold that should be your third >> metric >> >> >> >> Re the “back pressure” mechanism – this is a Feedback Loop mechanism and >> you can implement one on your own without waiting for Jiras and new >> features whenever they might be implemented by the Spark dev team – >> moreover you can avoid using slow mechanisms such as ZooKeeper and even >> incorporate some Machine Learning in your Feedback Loop to make it handle >> the message consumption rate more intelligently and benefit from ongoing >> online learning – BUT this is STILL about voluntarily sacrificing your >> performance in the name of keeping your system stable – it is
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
"set the storage policy for the DStream RDDs to MEMORY AND DISK" - it appears the storage level can be specified in the createStream methods but not createDirectStream... On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov wrote: > You can also try Dynamic Resource Allocation > > > > > https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation > > > > Also re the Feedback Loop for automatic message consumption rate > adjustment – there is a “dumb” solution option – simply set the storage > policy for the DStream RDDs to MEMORY AND DISK – when the memory gets > exhausted spark streaming will resort to keeping new RDDs on disk which > will prevent it from crashing and hence loosing them. Then some memory will > get freed and it will resort back to RAM and so on and so forth > > > > > > Sent from Samsung Mobile > > Original message > > From: Evo Eftimov > > Date:2015/05/28 13:22 (GMT+00:00) > > To: Dmitry Goldenberg > > Cc: Gerard Maas ,spark users > > Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth > in Kafka or Spark's metrics? > > > > You can always spin new boxes in the background and bring them into the > cluster fold when fully operational and time that with job relaunch and > param change > > > > Kafka offsets are mabaged automatically for you by the kafka clients which > keep them in zoomeeper dont worry about that ad long as you shut down your > job gracefuly. Besides msnaging the offsets explicitly is not a big deal if > necessary > > > > > > Sent from Samsung Mobile > > > > Original message > > From: Dmitry Goldenberg > > Date:2015/05/28 13:16 (GMT+00:00) > > To: Evo Eftimov > > Cc: Gerard Maas ,spark users > > Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth > in Kafka or Spark's metrics? > > > > Thanks, Evo. Per the last part of your comment, it sounds like we will > need to implement a job manager which will be in control of starting the > jobs, monitoring the status of the Kafka topic(s), shutting jobs down and > marking them as ones to relaunch, scaling the cluster up/down by > adding/removing machines, and relaunching the 'suspended' (shut down) jobs. > > > > I suspect that relaunching the jobs may be tricky since that means keeping > track of the starter offsets in Kafka topic(s) from which the jobs started > working on. > > > > Ideally, we'd want to avoid a re-launch. The 'suspension' and relaunching > of jobs, coupled with the wait for the new machines to come online may turn > out quite time-consuming which will make for lengthy request times, and our > requests are not asynchronous. Ideally, the currently running jobs would > continue to run on the machines currently available in the cluster. > > > > In the scale-down case, the job manager would want to signal to Spark's > job scheduler not to send work to the node being taken out, find out when > the last job has finished running on the node, then take the node out. > > > > This is somewhat like changing the number of cylinders in a car engine > while the car is running... > > > > Sounds like a great candidate for a set of enhancements in Spark... 
> > > > On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov > wrote: > > @DG; The key metrics should be > > > > - Scheduling delay – its ideal state is to remain constant over > time and ideally be less than the time of the microbatch window > > - The average job processing time should remain less than the > micro-batch window > > - Number of Lost Jobs – even if there is a single Job lost that > means that you have lost all messages for the DStream RDD processed by that > job due to the previously described spark streaming memory leak condition > and subsequent crash – described in previous postings submitted by me > > > > You can even go one step further and periodically issue “get/check free > memory” to see whether it is decreasing relentlessly at a constant rate – > if it touches a predetermined RAM threshold that should be your third > metric > > > > Re the “back pressure” mechanism – this is a Feedback Loop mechanism and > you can implement one on your own without waiting for Jiras and new > features whenever they might be implemented by the Spark dev team – > moreover you can avoid using slow mechanisms such as ZooKeeper and even > incorporate some Machine Learning in your Feedback Loop to make it handle > the message consumption rate more intelligently and benefit from ongoing > online learning – BUT this is STILL about voluntarily sacrificing your > performance in the name of keeping your system stable – it is not about > scaling your system/solution > > > > In terms of how to scale the Spark Framework Dynamically – even though > this is not supported at the moment out of the box I guess you can have a > sys management framework spin dynamically a few more boxes (spark worker > nodes), stop dynamically your currently running Spark Streaming Job, > rela
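On the createDirectStream observation above: the direct stream takes no StorageLevel because nothing is stored unless you ask for it, but Evo's MEMORY_AND_DISK policy can still be applied to whatever you persist downstream. A sketch, assuming a `directStream` built with KafkaUtils.createDirectStream:

    import org.apache.spark.storage.StorageLevel

    val parsed = directStream.map(_._2)
    parsed.persist(StorageLevel.MEMORY_AND_DISK_SER) // spills to disk instead of failing when memory is tight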
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
I think what we'd want to do is track the ingestion rate in the consumer(s) via Spark's aggregation functions and such. If we're at a critical level (load too high / load too low) then we issue a request into our Provisioning Component to add/remove machines. Once it comes back with an "OK", each consumer can finish its current batch, then terminate itself, and restart with a new context. The new context would be aware of the updated cluster - correct? Therefore the refreshed consumer would restart on the updated cluster. Could we even terminate the consumer immediately upon sensing a critical event? When it would restart, could it resume right where it left off? On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov wrote: > Makes sense especially if you have a cloud with “infinite” resources / > nodes which allows you to double, triple etc in the background/parallel the > resources of the currently running cluster > > > > I was thinking more about the scenario where you have e.g. 100 boxes and > want to / can add e.g. 20 more > > > > *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] > *Sent:* Wednesday, June 3, 2015 4:46 PM > *To:* Evo Eftimov > *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users > *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic > sizes/rate of growth in Kafka or Spark's metrics? > > > > Evo, > > > > One of the ideas is to shadow the current cluster. This way there's no > extra latency incurred due to shutting down of the consumers. If two sets > of consumers are running, potentially processing the same data, that is OK. > We phase out the older cluster and gradually flip over to the new one, > insuring no downtime or extra latency. Thoughts? > > > > On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov > wrote: > > You should monitor vital performance / job clogging stats of the Spark > Streaming Runtime not “kafka topics” > > > > You should be able to bring new worker nodes online and make them contact > and register with the Master without bringing down the Master (or any of > the currently running worker nodes) > > > > Then just shutdown your currently running spark streaming job/app and > restart it with new params to take advantage of the larger cluster > > > > *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] > *Sent:* Wednesday, June 3, 2015 4:14 PM > *To:* Cody Koeninger > *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users > *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic > sizes/rate of growth in Kafka or Spark's metrics? > > > > Would it be possible to implement Spark autoscaling somewhat along these > lines? -- > > > > 1. If we sense that a new machine is needed, by watching the data load in > Kafka topic(s), then > > 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS > and get a machine); > > 3. Create a "shadow"/mirror Spark master running alongside the initial > version which talks to N machines. The new mirror version is aware of N+1 > machines (or N+M if we had decided we needed M new boxes). > > 4. The previous version of the Spark runtime is > acquiesced/decommissioned. We possibly get both clusters working on the > same data which may actually be OK (at least for our specific use-cases). > > 5. Now the new Spark cluster is running. > > > > Similarly, the decommissioning of M unused boxes would happen, via this > notion of a mirror Spark runtime. How feasible would it be for such a > mirrorlike setup to be created, especially created programmatically? > Especially point #3. 
> > > > The other idea we'd entertained was to bring in a new machine, acquiesce > down all currently running workers by telling them to process their current > batch then shut down, then restart the consumers now that Spark is aware of > a modified cluster. This has the drawback of a downtime that may not be > tolerable in terms of latency, by the system's clients waiting for their > responses in a synchronous fashion. > > > > Thanks. > > > > On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger > wrote: > > I'm not sure that points 1 and 2 really apply to the kafka direct stream. > There are no receivers, and you know at the driver how big each of your > batches is. > > > > On Thu, May 28, 2015 at 2:21 PM, Andrew Or wrote: > > Hi all, > > > > As the author of the dynamic allocation feature I can offer a few insights > here. > > > > Gerard's explanation was both correct and concise: dynamic allocation is > not intended to be used in Spark streaming at the moment (1.4 or before). > Thi
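A small sketch of tracking the ingestion rate with Spark's own aggregation and then calling out to a provisioning client; the thresholds are made up and `provisioningClient` is a hypothetical interface to the Provisioning Component:

    val batchIntervalSec = 10
    stream.count().foreachRDD { rdd =>
      val recordsPerSec = rdd.first() / batchIntervalSec.toDouble
      if (recordsPerSec > 50000) provisioningClient.requestScaleUp()        // placeholder client
      else if (recordsPerSec < 1000) provisioningClient.requestScaleDown()  // placeholder client
    }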
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
If we have a hand-off between the older consumer and the newer consumer, I wonder if we need to manually manage the offsets in Kafka so as not to miss some messages as the hand-off is happening. Or if we let the new consumer run for a bit then let the old consumer know the 'new guy is in town' then the old consumer can be shut off. Some overlap is OK... On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov wrote: > Makes sense especially if you have a cloud with “infinite” resources / > nodes which allows you to double, triple etc in the background/parallel the > resources of the currently running cluster > > > > I was thinking more about the scenario where you have e.g. 100 boxes and > want to / can add e.g. 20 more > > > > *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] > *Sent:* Wednesday, June 3, 2015 4:46 PM > *To:* Evo Eftimov > *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users > *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic > sizes/rate of growth in Kafka or Spark's metrics? > > > > Evo, > > > > One of the ideas is to shadow the current cluster. This way there's no > extra latency incurred due to shutting down of the consumers. If two sets > of consumers are running, potentially processing the same data, that is OK. > We phase out the older cluster and gradually flip over to the new one, > insuring no downtime or extra latency. Thoughts? > > > > On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov > wrote: > > You should monitor vital performance / job clogging stats of the Spark > Streaming Runtime not “kafka topics” > > > > You should be able to bring new worker nodes online and make them contact > and register with the Master without bringing down the Master (or any of > the currently running worker nodes) > > > > Then just shutdown your currently running spark streaming job/app and > restart it with new params to take advantage of the larger cluster > > > > *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] > *Sent:* Wednesday, June 3, 2015 4:14 PM > *To:* Cody Koeninger > *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users > *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic > sizes/rate of growth in Kafka or Spark's metrics? > > > > Would it be possible to implement Spark autoscaling somewhat along these > lines? -- > > > > 1. If we sense that a new machine is needed, by watching the data load in > Kafka topic(s), then > > 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS > and get a machine); > > 3. Create a "shadow"/mirror Spark master running alongside the initial > version which talks to N machines. The new mirror version is aware of N+1 > machines (or N+M if we had decided we needed M new boxes). > > 4. The previous version of the Spark runtime is > acquiesced/decommissioned. We possibly get both clusters working on the > same data which may actually be OK (at least for our specific use-cases). > > 5. Now the new Spark cluster is running. > > > > Similarly, the decommissioning of M unused boxes would happen, via this > notion of a mirror Spark runtime. How feasible would it be for such a > mirrorlike setup to be created, especially created programmatically? > Especially point #3. > > > > The other idea we'd entertained was to bring in a new machine, acquiesce > down all currently running workers by telling them to process their current > batch then shut down, then restart the consumers now that Spark is aware of > a modified cluster. 
This has the drawback of a downtime that may not be > tolerable in terms of latency, by the system's clients waiting for their > responses in a synchronous fashion. > > > > Thanks. > > > > On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger > wrote: > > I'm not sure that points 1 and 2 really apply to the kafka direct stream. > There are no receivers, and you know at the driver how big each of your > batches is. > > > > On Thu, May 28, 2015 at 2:21 PM, Andrew Or wrote: > > Hi all, > > > > As the author of the dynamic allocation feature I can offer a few insights > here. > > > > Gerard's explanation was both correct and concise: dynamic allocation is > not intended to be used in Spark streaming at the moment (1.4 or before). > This is because of two things: > > > > (1) Number of receivers is necessarily fixed, and these are started in > executors. Since we need a receiver for each InputDStream, if we kill these > receivers we essentially stop the stream, which is not what we want. It > makes little sense to close and rest
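For the hand-off concern: the direct stream exposes the exact offsets it processed, and a new consumer can be started from those offsets so nothing is missed. A sketch against the Spark 1.3+ Kafka direct API, where saveOffsets/loadOffsets, kafkaParams and newSsc are placeholders:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    // Old consumer: record how far each topic/partition got.
    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      saveOffsets(ranges) // placeholder: persist topic/partition/untilOffset to ZK, a DB, etc.
    }

    // New consumer: resume exactly where the old one stopped.
    val fromOffsets: Map[TopicAndPartition, Long] = loadOffsets() // placeholder loader
    val resumed = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      newSsc, kafkaParams, fromOffsets,
      (m: MessageAndMetadata[String, String]) => (m.key, m.message))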
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Great. "You should monitor vital performance / job clogging stats of the Spark Streaming Runtime not “kafka topics” -- anything specific you were thinking of? On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov wrote: > Makes sense especially if you have a cloud with “infinite” resources / > nodes which allows you to double, triple etc in the background/parallel the > resources of the currently running cluster > > > > I was thinking more about the scenario where you have e.g. 100 boxes and > want to / can add e.g. 20 more > > > > *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] > *Sent:* Wednesday, June 3, 2015 4:46 PM > *To:* Evo Eftimov > *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users > *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic > sizes/rate of growth in Kafka or Spark's metrics? > > > > Evo, > > > > One of the ideas is to shadow the current cluster. This way there's no > extra latency incurred due to shutting down of the consumers. If two sets > of consumers are running, potentially processing the same data, that is OK. > We phase out the older cluster and gradually flip over to the new one, > insuring no downtime or extra latency. Thoughts? > > > > On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov > wrote: > > You should monitor vital performance / job clogging stats of the Spark > Streaming Runtime not “kafka topics” > > > > You should be able to bring new worker nodes online and make them contact > and register with the Master without bringing down the Master (or any of > the currently running worker nodes) > > > > Then just shutdown your currently running spark streaming job/app and > restart it with new params to take advantage of the larger cluster > > > > *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] > *Sent:* Wednesday, June 3, 2015 4:14 PM > *To:* Cody Koeninger > *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users > *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic > sizes/rate of growth in Kafka or Spark's metrics? > > > > Would it be possible to implement Spark autoscaling somewhat along these > lines? -- > > > > 1. If we sense that a new machine is needed, by watching the data load in > Kafka topic(s), then > > 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS > and get a machine); > > 3. Create a "shadow"/mirror Spark master running alongside the initial > version which talks to N machines. The new mirror version is aware of N+1 > machines (or N+M if we had decided we needed M new boxes). > > 4. The previous version of the Spark runtime is > acquiesced/decommissioned. We possibly get both clusters working on the > same data which may actually be OK (at least for our specific use-cases). > > 5. Now the new Spark cluster is running. > > > > Similarly, the decommissioning of M unused boxes would happen, via this > notion of a mirror Spark runtime. How feasible would it be for such a > mirrorlike setup to be created, especially created programmatically? > Especially point #3. > > > > The other idea we'd entertained was to bring in a new machine, acquiesce > down all currently running workers by telling them to process their current > batch then shut down, then restart the consumers now that Spark is aware of > a modified cluster. This has the drawback of a downtime that may not be > tolerable in terms of latency, by the system's clients waiting for their > responses in a synchronous fashion. > > > > Thanks. 
> > > > On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger > wrote: > > I'm not sure that points 1 and 2 really apply to the kafka direct stream. > There are no receivers, and you know at the driver how big each of your > batches is. > > > > On Thu, May 28, 2015 at 2:21 PM, Andrew Or wrote: > > Hi all, > > > > As the author of the dynamic allocation feature I can offer a few insights > here. > > > > Gerard's explanation was both correct and concise: dynamic allocation is > not intended to be used in Spark streaming at the moment (1.4 or before). > This is because of two things: > > > > (1) Number of receivers is necessarily fixed, and these are started in > executors. Since we need a receiver for each InputDStream, if we kill these > receivers we essentially stop the stream, which is not what we want. It > makes little sense to close and restart a stream the same way we kill and > relaunch executors. > > > > (2) Records come in every batch, and when there is data to process your > executors are not idle. If your idle timeout is less than th
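Beyond the scheduling-delay / processing-time / lost-jobs metrics Evo lists, the #4 "hysteresis" idea from earlier in the thread can be captured in a tiny thermostat-style policy. Plain Scala, no Spark APIs, and all constants are made up:

    class ScalingPolicy(alpha: Double = 0.3, cooldownMs: Long = 10 * 60 * 1000L) {
      private var smoothedDelayMs = 0.0
      private var lastActionAt = 0L

      // Feed it the scheduling delay of each batch; it recommends an action at most once per cool-down.
      def observe(schedulingDelayMs: Long): Option[String] = {
        smoothedDelayMs = alpha * schedulingDelayMs + (1 - alpha) * smoothedDelayMs
        val now = System.currentTimeMillis()
        if (now - lastActionAt < cooldownMs) None
        else if (smoothedDelayMs > 60000) { lastActionAt = now; Some("scale-up") }
        else if (smoothedDelayMs < 5000)  { lastActionAt = now; Some("scale-down") }
        else None
      }
    }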
RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Makes sense especially if you have a cloud with “infinite” resources / nodes which allows you to double, triple etc in the background/parallel the resources of the currently running cluster I was thinking more about the scenario where you have e.g. 100 boxes and want to / can add e.g. 20 more From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, June 3, 2015 4:46 PM To: Evo Eftimov Cc: Cody Koeninger; Andrew Or; Gerard Maas; spark users Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Evo, One of the ideas is to shadow the current cluster. This way there's no extra latency incurred due to shutting down of the consumers. If two sets of consumers are running, potentially processing the same data, that is OK. We phase out the older cluster and gradually flip over to the new one, insuring no downtime or extra latency. Thoughts? On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov wrote: You should monitor vital performance / job clogging stats of the Spark Streaming Runtime not “kafka topics” You should be able to bring new worker nodes online and make them contact and register with the Master without bringing down the Master (or any of the currently running worker nodes) Then just shutdown your currently running spark streaming job/app and restart it with new params to take advantage of the larger cluster From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Wednesday, June 3, 2015 4:14 PM To: Cody Koeninger Cc: Andrew Or; Evo Eftimov; Gerard Maas; spark users Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Would it be possible to implement Spark autoscaling somewhat along these lines? -- 1. If we sense that a new machine is needed, by watching the data load in Kafka topic(s), then 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS and get a machine); 3. Create a "shadow"/mirror Spark master running alongside the initial version which talks to N machines. The new mirror version is aware of N+1 machines (or N+M if we had decided we needed M new boxes). 4. The previous version of the Spark runtime is acquiesced/decommissioned. We possibly get both clusters working on the same data which may actually be OK (at least for our specific use-cases). 5. Now the new Spark cluster is running. Similarly, the decommissioning of M unused boxes would happen, via this notion of a mirror Spark runtime. How feasible would it be for such a mirrorlike setup to be created, especially created programmatically? Especially point #3. The other idea we'd entertained was to bring in a new machine, acquiesce down all currently running workers by telling them to process their current batch then shut down, then restart the consumers now that Spark is aware of a modified cluster. This has the drawback of a downtime that may not be tolerable in terms of latency, by the system's clients waiting for their responses in a synchronous fashion. Thanks. On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger wrote: I'm not sure that points 1 and 2 really apply to the kafka direct stream. There are no receivers, and you know at the driver how big each of your batches is. On Thu, May 28, 2015 at 2:21 PM, Andrew Or wrote: Hi all, As the author of the dynamic allocation feature I can offer a few insights here. Gerard's explanation was both correct and concise: dynamic allocation is not intended to be used in Spark streaming at the moment (1.4 or before). 
This is because of two things: (1) Number of receivers is necessarily fixed, and these are started in executors. Since we need a receiver for each InputDStream, if we kill these receivers we essentially stop the stream, which is not what we want. It makes little sense to close and restart a stream the same way we kill and relaunch executors. (2) Records come in every batch, and when there is data to process your executors are not idle. If your idle timeout is less than the batch duration, then you'll end up having to constantly kill and restart executors. If your idle timeout is greater than the batch duration, then you'll never kill executors. Long answer short, with Spark streaming there is currently no straightforward way to scale the size of your cluster. I had a long discussion with TD (Spark streaming lead) about what needs to be done to provide some semblance of dynamic scaling to streaming applications, e.g. take into account the batch queue instead. We came up with a few ideas that I will not detail here, but we are looking into this and do intend to support it in the near future. -Andrew 2015-05-28 8:02 GMT-07:00 Evo Eftimov : Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK – it will be
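For reference, these are the knobs Andrew is describing, shown for a batch job since, per this thread, dynamic allocation should not be relied on for streaming in 1.4 or earlier; the values are illustrative:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")             // external shuffle service required on YARN
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "20")
      .set("spark.dynamicAllocation.executorIdleTimeout", "60") // the idle timeout Andrew contrasts with the batch duration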
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Evo, One of the ideas is to shadow the current cluster. This way there's no extra latency incurred due to shutting down of the consumers. If two sets of consumers are running, potentially processing the same data, that is OK. We phase out the older cluster and gradually flip over to the new one, insuring no downtime or extra latency. Thoughts? On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov wrote: > You should monitor vital performance / job clogging stats of the Spark > Streaming Runtime not “kafka topics” > > > > You should be able to bring new worker nodes online and make them contact > and register with the Master without bringing down the Master (or any of > the currently running worker nodes) > > > > Then just shutdown your currently running spark streaming job/app and > restart it with new params to take advantage of the larger cluster > > > > *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] > *Sent:* Wednesday, June 3, 2015 4:14 PM > *To:* Cody Koeninger > *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users > *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic > sizes/rate of growth in Kafka or Spark's metrics? > > > > Would it be possible to implement Spark autoscaling somewhat along these > lines? -- > > > > 1. If we sense that a new machine is needed, by watching the data load in > Kafka topic(s), then > > 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS > and get a machine); > > 3. Create a "shadow"/mirror Spark master running alongside the initial > version which talks to N machines. The new mirror version is aware of N+1 > machines (or N+M if we had decided we needed M new boxes). > > 4. The previous version of the Spark runtime is > acquiesced/decommissioned. We possibly get both clusters working on the > same data which may actually be OK (at least for our specific use-cases). > > 5. Now the new Spark cluster is running. > > > > Similarly, the decommissioning of M unused boxes would happen, via this > notion of a mirror Spark runtime. How feasible would it be for such a > mirrorlike setup to be created, especially created programmatically? > Especially point #3. > > > > The other idea we'd entertained was to bring in a new machine, acquiesce > down all currently running workers by telling them to process their current > batch then shut down, then restart the consumers now that Spark is aware of > a modified cluster. This has the drawback of a downtime that may not be > tolerable in terms of latency, by the system's clients waiting for their > responses in a synchronous fashion. > > > > Thanks. > > > > On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger > wrote: > > I'm not sure that points 1 and 2 really apply to the kafka direct stream. > There are no receivers, and you know at the driver how big each of your > batches is. > > > > On Thu, May 28, 2015 at 2:21 PM, Andrew Or wrote: > > Hi all, > > > > As the author of the dynamic allocation feature I can offer a few insights > here. > > > > Gerard's explanation was both correct and concise: dynamic allocation is > not intended to be used in Spark streaming at the moment (1.4 or before). > This is because of two things: > > > > (1) Number of receivers is necessarily fixed, and these are started in > executors. Since we need a receiver for each InputDStream, if we kill these > receivers we essentially stop the stream, which is not what we want. It > makes little sense to close and restart a stream the same way we kill and > relaunch executors. 
> > > > (2) Records come in every batch, and when there is data to process your > executors are not idle. If your idle timeout is less than the batch > duration, then you'll end up having to constantly kill and restart > executors. If your idle timeout is greater than the batch duration, then > you'll never kill executors. > > > > Long answer short, with Spark streaming there is currently no > straightforward way to scale the size of your cluster. I had a long > discussion with TD (Spark streaming lead) about what needs to be done to > provide some semblance of dynamic scaling to streaming applications, e.g. > take into account the batch queue instead. We came up with a few ideas that > I will not detail here, but we are looking into this and do intend to > support it in the near future. > > > > -Andrew > > > > > > > > 2015-05-28 8:02 GMT-07:00 Evo Eftimov : > > > > Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK > – it will be your insurance policy against sys crashes due to m
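To make point (1) in Andrew's explanation concrete: with the receiver-based API the receiving parallelism is fixed once the context starts, so taking advantage of a bigger cluster means building the next context with more receivers and unioning them (keep the receiver count <= number of executors). A sketch where zkQuorum, group and topic are placeholders and `ssc` is the newly built context:

    import org.apache.spark.streaming.kafka.KafkaUtils

    val numReceivers = 4 // chosen when the new, larger context is built
    val receivers = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("events" -> 1))
    }
    val unified = ssc.union(receivers)
    unified.map(_._2).print()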
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Which would imply that if there was a load manager type of service, it could signal to the driver(s) that they need to acquiesce, i.e. process what's at hand and terminate. Then bring up a new machine, then restart the driver(s)... Same deal with removing machines from the cluster. Send a signal for the drivers to pipe down and terminate, then restart them. On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger wrote: > I'm not sure that points 1 and 2 really apply to the kafka direct stream. > There are no receivers, and you know at the driver how big each of your > batches is. > > On Thu, May 28, 2015 at 2:21 PM, Andrew Or wrote: > >> Hi all, >> >> As the author of the dynamic allocation feature I can offer a few >> insights here. >> >> Gerard's explanation was both correct and concise: dynamic allocation is >> not intended to be used in Spark streaming at the moment (1.4 or before). >> This is because of two things: >> >> (1) Number of receivers is necessarily fixed, and these are started in >> executors. Since we need a receiver for each InputDStream, if we kill these >> receivers we essentially stop the stream, which is not what we want. It >> makes little sense to close and restart a stream the same way we kill and >> relaunch executors. >> >> (2) Records come in every batch, and when there is data to process your >> executors are not idle. If your idle timeout is less than the batch >> duration, then you'll end up having to constantly kill and restart >> executors. If your idle timeout is greater than the batch duration, then >> you'll never kill executors. >> >> Long answer short, with Spark streaming there is currently no >> straightforward way to scale the size of your cluster. I had a long >> discussion with TD (Spark streaming lead) about what needs to be done to >> provide some semblance of dynamic scaling to streaming applications, e.g. >> take into account the batch queue instead. We came up with a few ideas that >> I will not detail here, but we are looking into this and do intend to >> support it in the near future. >> >> -Andrew >> >> >> >> 2015-05-28 8:02 GMT-07:00 Evo Eftimov : >> >> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK >>> – it will be your insurance policy against sys crashes due to memory leaks. >>> Until there is free RAM, spark streaming (spark) will NOT resort to disk – >>> and of course resorting to disk from time to time (ie when there is no free >>> RAM ) and taking a performance hit from that, BUT only until there is no >>> free RAM >>> >>> >>> >>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] >>> *Sent:* Thursday, May 28, 2015 2:34 PM >>> *To:* Evo Eftimov >>> *Cc:* Gerard Maas; spark users >>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic >>> sizes/rate of growth in Kafka or Spark's metrics? >>> >>> >>> >>> Evo, good points. >>> >>> >>> >>> On the dynamic resource allocation, I'm surmising this only works within >>> a particular cluster setup. So it improves the usage of current cluster >>> resources but it doesn't make the cluster itself elastic. At least, that's >>> my understanding. >>> >>> >>> >>> Memory + disk would be good and hopefully it'd take *huge* load on the >>> system to start exhausting the disk space too. I'd guess that falling onto >>> disk will make things significantly slower due to the extra I/O. >>> >>> >>> >>> Perhaps we'll really want all of these elements eventually. 
I think >>> we'd want to start with memory only, keeping maxRate low enough not to >>> overwhelm the consumers; implement the cluster autoscaling. We might >>> experiment with dynamic resource allocation before we get to implement the >>> cluster autoscale. >>> >>> >>> >>> >>> >>> >>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov >>> wrote: >>> >>> You can also try Dynamic Resource Allocation >>> >>> >>> >>> >>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation >>> >>> >>> >>> Also re the Feedback Loop for automatic message consumption rate >>> adjustment – there is a “dumb” solution option – simply set the storage >>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark streaming will resort to keeping new RDDs on disk which will prevent it from crashing and hence losing them.
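For reference, a minimal sketch of the "dumb" option above, i.e. pinning the DStream storage policy to memory-and-disk so a backlog spills to disk rather than crashing the executors. It assumes the receiver-based Kafka API, where the storage level is an explicit argument; the ZooKeeper quorum, group id, and topic are placeholders. With the direct stream one would instead persist the DStream explicitly, e.g. stream.persist(StorageLevel.MEMORY_AND_DISK_SER).

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MemoryAndDiskSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("mem-and-disk"), Seconds(10))

    // Receiver-based Kafka stream with blocks kept in memory and spilled to disk
    // (serialized) when memory runs low, instead of being dropped or causing OOM.
    val stream = KafkaUtils.createStream(
      ssc,
      "zkhost:2181",                     // placeholder ZooKeeper quorum
      "my-consumer-group",               // placeholder consumer group
      Map("my.topic" -> 1),              // placeholder topic -> receiver thread count
      StorageLevel.MEMORY_AND_DISK_SER)

    stream.map(_._2).count().print()     // stand-in for real processing

    ssc.start()
    ssc.awaitTermination()
  }
}
```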
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Thanks, Andrew. From speaking with customers, this is one of the most pressing issues for them (burning hot, to be precise), especially in a SaaS type of environment and especially with commodity hardware at play. Understandably, folks don't want to pay for more hardware usage than necessary and they want to be able to handle the peaks and valleys of usage (especially the peaks) optimally. It looks like there needs to be a generic 'watchdog' type of service which would get metrics/signals from things like Kafka, then call into a (potentially custom) handler which will cause new hardware to be provisioned or decommissioned. Needless to say, Spark, the watchdog, and the provisioner all need to be completely in sync and mindful of currently running Spark jobs so that new hardware immediately picks up extra load and hardware is only decommissioned once any running Spark jobs have been acquiesced... As I learn more about the configuration parameters and dynamic resource allocation, I'm starting to feel that a dashboard with all these knobs exposed would be so useful. Being able to test/simulate load volumes and tweak the knobs as necessary, to arrive at the optimal patterns... Regards, - Dmitry On Thu, May 28, 2015 at 3:21 PM, Andrew Or wrote: > Hi all, > > As the author of the dynamic allocation feature I can offer a few insights > here. > > Gerard's explanation was both correct and concise: dynamic allocation is > not intended to be used in Spark streaming at the moment (1.4 or before). > This is because of two things: > > (1) Number of receivers is necessarily fixed, and these are started in > executors. Since we need a receiver for each InputDStream, if we kill these > receivers we essentially stop the stream, which is not what we want. It > makes little sense to close and restart a stream the same way we kill and > relaunch executors. > > (2) Records come in every batch, and when there is data to process your > executors are not idle. If your idle timeout is less than the batch > duration, then you'll end up having to constantly kill and restart > executors. If your idle timeout is greater than the batch duration, then > you'll never kill executors. > > Long answer short, with Spark streaming there is currently no > straightforward way to scale the size of your cluster. I had a long > discussion with TD (Spark streaming lead) about what needs to be done to > provide some semblance of dynamic scaling to streaming applications, e.g. > take into account the batch queue instead. We came up with a few ideas that > I will not detail here, but we are looking into this and do intend to > support it in the near future. > > -Andrew > > > > 2015-05-28 8:02 GMT-07:00 Evo Eftimov : > >> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK >> – it will be your insurance policy against sys crashes due to memory leaks. >> Until there is free RAM, spark streaming (spark) will NOT resort to disk – >> and of course resorting to disk from time to time (ie when there is no free >> RAM ) and taking a performance hit from that, BUT only until there is no >> free RAM >> >> >> >> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] >> *Sent:* Thursday, May 28, 2015 2:34 PM >> *To:* Evo Eftimov >> *Cc:* Gerard Maas; spark users >> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic >> sizes/rate of growth in Kafka or Spark's metrics? >> >> >> >> Evo, good points. >> >> >> >> On the dynamic resource allocation, I'm surmising this only works within >> a particular cluster setup.
So it improves the usage of current cluster >> resources but it doesn't make the cluster itself elastic. At least, that's >> my understanding. >> >> >> >> Memory + disk would be good and hopefully it'd take *huge* load on the >> system to start exhausting the disk space too. I'd guess that falling onto >> disk will make things significantly slower due to the extra I/O. >> >> >> >> Perhaps we'll really want all of these elements eventually. I think we'd >> want to start with memory only, keeping maxRate low enough not to overwhelm >> the consumers; implement the cluster autoscaling. We might experiment with >> dynamic resource allocation before we get to implement the cluster >> autoscale. >> >> >> >> >> >> >> >> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov >> wrote: >> >> You can also try Dynamic Resource Allocation >> >> >> >> >> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
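To make the watchdog → handler → provisioner split above concrete, a rough sketch follows. All of the type names and thresholds are invented for illustration (none of this is an existing Spark or Kafka API), and the "load signal" could be Kafka consumer lag, the message-timestamp delay discussed earlier in the thread, or Spark's own scheduling delay.

```scala
// Hypothetical interfaces only -- these types do not exist in Spark or Kafka;
// they just illustrate how the responsibilities could be divided.
trait MetricsSource {
  /** A single load signal, e.g. Kafka consumer lag or scheduling delay in ms. */
  def currentLoad(): Double
}

trait Provisioner {
  def addNodes(count: Int): Unit          // e.g. call a cloud API, then start workers
  def decommissionNodes(count: Int): Unit // only after running jobs have been acquiesced
}

class Watchdog(source: MetricsSource,
               provisioner: Provisioner,
               highWater: Double,
               lowWater: Double,
               checkIntervalMs: Long = 60000L) extends Runnable {
  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      val load = source.currentLoad()
      // Deliberately naive policy; a real one would add a cool-down period
      // so the cluster is not resized too frequently.
      if (load > highWater) provisioner.addNodes(1)
      else if (load < lowWater) provisioner.decommissionNodes(1)
      Thread.sleep(checkIntervalMs)
    }
  }
}
```

For the dynamic-allocation knobs themselves (as opposed to cluster-level autoscaling), the relevant properties in the 1.3/1.4 timeframe are spark.dynamicAllocation.enabled, spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors and spark.shuffle.service.enabled, though as noted above they are not streaming-aware yet.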
RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK – it will be your insurance policy against sys crashes due to memory leaks. Until there is free RAM, spark streaming (spark) will NOT resort to disk – and of course resorting to disk from time to time (ie when there is no free RAM ) and taking a performance hit from that, BUT only until there is no free RAM From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Thursday, May 28, 2015 2:34 PM To: Evo Eftimov Cc: Gerard Maas; spark users Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Evo, good points. On the dynamic resource allocation, I'm surmising this only works within a particular cluster setup. So it improves the usage of current cluster resources but it doesn't make the cluster itself elastic. At least, that's my understanding. Memory + disk would be good and hopefully it'd take *huge* load on the system to start exhausting the disk space too. I'd guess that falling onto disk will make things significantly slower due to the extra I/O. Perhaps we'll really want all of these elements eventually. I think we'd want to start with memory only, keeping maxRate low enough not to overwhelm the consumers; implement the cluster autoscaling. We might experiment with dynamic resource allocation before we get to implement the cluster autoscale. On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov wrote: You can also try Dynamic Resource Allocation https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation Also re the Feedback Loop for automatic message consumption rate adjustment – there is a “dumb” solution option – simply set the storage policy for the DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark streaming will resort to keeping new RDDs on disk which will prevent it from crashing and hence losing them. Then some memory will get freed and it will resort back to RAM and so on and so forth Sent from Samsung Mobile Original message From: Evo Eftimov Date: 2015/05/28 13:22 (GMT+00:00) To: Dmitry Goldenberg Cc: Gerard Maas, spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? You can always spin new boxes in the background and bring them into the cluster fold when fully operational and time that with job relaunch and param change Kafka offsets are managed automatically for you by the kafka clients which keep them in ZooKeeper, don't worry about that as long as you shut down your job gracefully. Besides, managing the offsets explicitly is not a big deal if necessary Sent from Samsung Mobile Original message From: Dmitry Goldenberg Date: 2015/05/28 13:16 (GMT+00:00) To: Evo Eftimov Cc: Gerard Maas, spark users Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics? Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs. I suspect that relaunching the jobs may be tricky since that means keeping track of the starting offsets in the Kafka topic(s) the jobs had been working on. Ideally, we'd want to avoid a re-launch.
The 'suspension' and relaunching of jobs, coupled with the wait for the new machines to come online may turn out quite time-consuming which will make for lengthy request times, and our requests are not asynchronous. Ideally, the currently running jobs would continue to run on the machines currently available in the cluster. In the scale-down case, the job manager would want to signal to Spark's job scheduler not to send work to the node being taken out, find out when the last job has finished running on the node, then take the node out. This is somewhat like changing the number of cylinders in a car engine while the car is running... Sounds like a great candidate for a set of enhancements in Spark... On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov wrote: @DG; The key metrics should be - Scheduling delay – its ideal state is to remain constant over time and ideally be less than the time of the microbatch window - The average job processing time should remain less than the micro-batch window - Number of Lost Jobs – even if there is a single Job lost that means that you have lost all messages for the DStream RDD processed by that job due to the previously described spark streaming memory leak condition and subsequent crash – described in previous postings submitted by me.
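On the earlier point about keeping maxRate low enough not to overwhelm the consumers while new machines are being provisioned: those caps are plain Spark configuration. A minimal sketch follows, with the numeric values as arbitrary placeholders that would need tuning against the real batch interval and partition count.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RateLimitedConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("rate-limited-consumer")
      // Direct Kafka stream cap: records per second, per Kafka partition (Spark 1.3+).
      .set("spark.streaming.kafka.maxRatePerPartition", "1000") // placeholder value
      // Receiver-based stream cap: records per second, per receiver.
      .set("spark.streaming.receiver.maxRate", "5000")          // placeholder value

    val ssc = new StreamingContext(conf, Seconds(10))
    // ... create the Kafka stream and the processing graph as usual ...
  }
}
```

With the direct stream this bounds each batch at roughly maxRatePerPartition × number of partitions × batch interval records, which is what keeps the consumers from running out of memory while provisioning catches up.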
Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
Evo, good points. On the dynamic resource allocation, I'm surmising this only works within a particular cluster setup. So it improves the usage of current cluster resources but it doesn't make the cluster itself elastic. At least, that's my understanding. Memory + disk would be good and hopefully it'd take *huge* load on the system to start exhausting the disk space too. I'd guess that falling onto disk will make things significantly slower due to the extra I/O. Perhaps we'll really want all of these elements eventually. I think we'd want to start with memory only, keeping maxRate low enough not to overwhelm the consumers; implement the cluster autoscaling. We might experiment with dynamic resource allocation before we get to implement the cluster autoscale. On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov wrote: > You can also try Dynamic Resource Allocation > > > > > https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation > > > > Also re the Feedback Loop for automatic message consumption rate > adjustment – there is a “dumb” solution option – simply set the storage > policy for the DStream RDDs to MEMORY AND DISK – when the memory gets > exhausted spark streaming will resort to keeping new RDDs on disk which > will prevent it from crashing and hence losing them. Then some memory will > get freed and it will resort back to RAM and so on and so forth > > > > > > Sent from Samsung Mobile > > Original message > > From: Evo Eftimov > > Date: 2015/05/28 13:22 (GMT+00:00) > > To: Dmitry Goldenberg > > Cc: Gerard Maas, spark users > > Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth > in Kafka or Spark's metrics? > > > > You can always spin new boxes in the background and bring them into the > cluster fold when fully operational and time that with job relaunch and > param change > > > > Kafka offsets are managed automatically for you by the kafka clients which > keep them in ZooKeeper, don't worry about that as long as you shut down your > job gracefully. Besides, managing the offsets explicitly is not a big deal if > necessary > > > > > > Sent from Samsung Mobile > > > > Original message > > From: Dmitry Goldenberg > > Date: 2015/05/28 13:16 (GMT+00:00) > > To: Evo Eftimov > > Cc: Gerard Maas, spark users > > Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth > in Kafka or Spark's metrics? > > > > Thanks, Evo. Per the last part of your comment, it sounds like we will > need to implement a job manager which will be in control of starting the > jobs, monitoring the status of the Kafka topic(s), shutting jobs down and > marking them as ones to relaunch, scaling the cluster up/down by > adding/removing machines, and relaunching the 'suspended' (shut down) jobs. > > > > I suspect that relaunching the jobs may be tricky since that means keeping > track of the starting offsets in the Kafka topic(s) the jobs had been working > on. > > > > Ideally, we'd want to avoid a re-launch. The 'suspension' and relaunching > of jobs, coupled with the wait for the new machines to come online may turn > out quite time-consuming which will make for lengthy request times, and our > requests are not asynchronous. Ideally, the currently running jobs would > continue to run on the machines currently available in the cluster.
> > > > In the scale-down case, the job manager would want to signal to Spark's > job scheduler not to send work to the node being taken out, find out when > the last job has finished running on the node, then take the node out. > > > > This is somewhat like changing the number of cylinders in a car engine > while the car is running... > > > > Sounds like a great candidate for a set of enhancements in Spark... > > > > On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov > wrote: > > @DG; The key metrics should be > > > > - Scheduling delay – its ideal state is to remain constant over > time and ideally be less than the time of the microbatch window > > - The average job processing time should remain less than the > micro-batch window > > - Number of Lost Jobs – even if there is a single Job lost that > means that you have lost all messages for the DStream RDD processed by that > job due to the previously described spark streaming memory leak condition > and subsequent crash – described in previous postings submitted by me > > > > You can even go one step further and periodically issue “get/check free > memory” to see whether it is decreasing relentlessly at a constant rate – > if it touches a predetermined RAM threshold that should be your third > metric > > > > Re the “back pressure” mechanism – this is a Feedback Loop mechanism and > you can implement one on your own without waiting for Jiras and new > features whenever they might be implemented by the Spark dev team – > moreover you can avoid using slow mechanisms such as ZooKeeper and even > incorporate some Machine Learning
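A minimal sketch of wiring the scheduling-delay and processing-time metrics above into such a feedback loop via Spark's StreamingListener interface (present in the 1.3/1.4 APIs). The thresholds and the onOverload callback are placeholders for whatever Event Analyzer / alerting mechanism actually consumes the signal.

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Fires a signal when batches start queueing up (growing scheduling delay) or
// when processing a batch takes longer than the batch interval allows.
class LoadSignalListener(batchIntervalMs: Long,
                         maxSchedulingDelayMs: Long,
                         onOverload: String => Unit) extends StreamingListener {

  override def onBatchCompleted(completed: StreamingListenerBatchCompleted): Unit = {
    val info = completed.batchInfo
    val schedulingDelay = info.schedulingDelay.getOrElse(0L)
    val processingTime  = info.processingDelay.getOrElse(0L)
    if (schedulingDelay > maxSchedulingDelayMs || processingTime > batchIntervalMs) {
      // Placeholder: publish a critical event to the Analyzer/Aggregator here.
      onOverload(s"schedulingDelay=${schedulingDelay}ms processingTime=${processingTime}ms")
    }
  }
}

// Registration, assuming `ssc` is the application's StreamingContext:
//   ssc.addStreamingListener(new LoadSignalListener(10000L, 30000L, msg => println(msg)))
```

A separate periodic check of free heap (e.g. Runtime.getRuntime.freeMemory) could cover the third metric mentioned above, and the same listener thread could feed the contraction side of the loop when delays stay near zero for a sustained period.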