Would it be possible to implement Spark autoscaling somewhat along these lines?

1. Sense that a new machine is needed, by watching the data load in the
Kafka topic(s).
2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
and get a machine).
3. Create a "shadow"/mirror Spark master running alongside the initial
version, which talks to N machines. The new mirror version is aware of
N+1 machines (or N+M if we had decided we needed M new boxes).
4. The previous version of the Spark runtime is quiesced/decommissioned.
We possibly get both clusters working on the same data, which may
actually be OK (at least for our specific use cases).
5. Now the new Spark cluster is running.

Similarly, the decommissioning of M unused boxes would happen via this
notion of a mirror Spark runtime.

How feasible would it be to create such a mirror-like setup, especially
to create it programmatically? Especially point #3.

The other idea we'd entertained was to bring in a new machine, quiesce
all currently running workers by telling them to process their current
batch and then shut down, then restart the consumers now that Spark is
aware of the modified cluster. This has the drawback of downtime that
may not be tolerable latency-wise, since the system's clients wait for
their responses synchronously.

Thanks.

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger <c...@koeninger.org> wrote:

> I'm not sure that points 1 and 2 really apply to the Kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or <and...@databricks.com> wrote:
>
>> Hi all,
>>
>> As the author of the dynamic allocation feature I can offer a few
>> insights here.
>>
>> Gerard's explanation was both correct and concise: dynamic allocation
>> is not intended to be used in Spark streaming at the moment (1.4 or
>> before). This is because of two things:
>>
>> (1) The number of receivers is necessarily fixed, and these are started
>> in executors. Since we need a receiver for each InputDStream, if we
>> kill these receivers we essentially stop the stream, which is not what
>> we want. It makes little sense to close and restart a stream the same
>> way we kill and relaunch executors.
>>
>> (2) Records come in every batch, and when there is data to process your
>> executors are not idle. If your idle timeout is less than the batch
>> duration, then you'll end up having to constantly kill and restart
>> executors. If your idle timeout is greater than the batch duration,
>> then you'll never kill executors.
>>
>> Long story short, with Spark streaming there is currently no
>> straightforward way to scale the size of your cluster. I had a long
>> discussion with TD (Spark streaming lead) about what needs to be done
>> to provide some semblance of dynamic scaling to streaming applications,
>> e.g. taking the batch queue into account. We came up with a few ideas
>> that I will not detail here, but we are looking into this and do intend
>> to support it in the near future.
>>
>> -Andrew
>>
>> 2015-05-28 8:02 GMT-07:00 Evo Eftimov <evo.efti...@isecc.com>:
>>
>>> Probably you should ALWAYS keep the RDD storage policy at
>>> MEMORY_AND_DISK – it will be your insurance policy against sys crashes
>>> due to memory leaks.
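>>> For example, with the receiver-based Kafka API this is just the
>>> storageLevel argument – a rough, untested sketch (ssc, the ZooKeeper
>>> quorum, group id and topic map are all placeholders):
>>>
>>>   import org.apache.spark.storage.StorageLevel
>>>   import org.apache.spark.streaming.kafka.KafkaUtils
>>>
>>>   // ssc is a StreamingContext created elsewhere; the last argument
>>>   // pins the DStream's RDD storage policy so received blocks spill
>>>   // to disk when memory runs out instead of being lost.
>>>   val stream = KafkaUtils.createStream(
>>>     ssc, "zkhost:2181", "consumer-group", Map("mytopic" -> 1),
>>>     StorageLevel.MEMORY_AND_DISK)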
>>> As long as there is free RAM, spark streaming (spark) will NOT resort
>>> to disk – it resorts to disk only when there is no free RAM, taking a
>>> performance hit from that, and only until memory is freed up again.
>>>
>>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>>> *Sent:* Thursday, May 28, 2015 2:34 PM
>>> *To:* Evo Eftimov
>>> *Cc:* Gerard Maas; spark users
>>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>>> sizes/rate of growth in Kafka or Spark's metrics?
>>>
>>> Evo, good points.
>>>
>>> On the dynamic resource allocation, I'm surmising this only works
>>> within a particular cluster setup. So it improves the usage of the
>>> current cluster's resources, but it doesn't make the cluster itself
>>> elastic. At least, that's my understanding.
>>>
>>> Memory + disk would be good, and hopefully it'd take a *huge* load on
>>> the system to start exhausting the disk space too. I'd guess that
>>> falling onto disk will make things significantly slower due to the
>>> extra I/O.
>>>
>>> Perhaps we'll really want all of these elements eventually. I think
>>> we'd want to start with memory only, keeping maxRate low enough not to
>>> overwhelm the consumers, then implement the cluster autoscaling. We
>>> might experiment with dynamic resource allocation before we get to
>>> implementing the cluster autoscale.
>>>
>>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov <evo.efti...@isecc.com>
>>> wrote:
>>>
>>> You can also try Dynamic Resource Allocation:
>>>
>>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>>
>>> Also, re the Feedback Loop for automatic message consumption rate
>>> adjustment – there is a “dumb” solution option – simply set the storage
>>> policy for the DStream RDDs to MEMORY_AND_DISK. When the memory gets
>>> exhausted, spark streaming will resort to keeping new RDDs on disk,
>>> which will prevent it from crashing and hence losing them. Then some
>>> memory will get freed and it will resort back to RAM, and so on and so
>>> forth.
>>>
>>> Sent from Samsung Mobile
>>>
>>> -------- Original message --------
>>> From: Evo Eftimov
>>> Date: 2015/05/28 13:22 (GMT+00:00)
>>> To: Dmitry Goldenberg
>>> Cc: Gerard Maas, spark users
>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>> growth in Kafka or Spark's metrics?
>>>
>>> You can always spin new boxes in the background and bring them into
>>> the cluster fold when fully operational, and time that with the job
>>> relaunch and param change.
>>>
>>> Kafka offsets are managed automatically for you by the Kafka clients,
>>> which keep them in ZooKeeper, so don't worry about that as long as you
>>> shut down your job gracefully. Besides, managing the offsets explicitly
>>> is not a big deal if necessary.
>>>
>>> Sent from Samsung Mobile
>>>
>>> -------- Original message --------
>>> From: Dmitry Goldenberg
>>> Date: 2015/05/28 13:16 (GMT+00:00)
>>> To: Evo Eftimov
>>> Cc: Gerard Maas, spark users
>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>> growth in Kafka or Spark's metrics?
>>>
>>> Thanks, Evo.
>>> Per the last part of your comment, it sounds like we will need to
>>> implement a job manager which will be in control of starting the jobs,
>>> monitoring the status of the Kafka topic(s), shutting jobs down and
>>> marking them as ones to relaunch, scaling the cluster up/down by
>>> adding/removing machines, and relaunching the 'suspended' (shut down)
>>> jobs.
>>>
>>> I suspect that relaunching the jobs may be tricky, since it means
>>> keeping track of the starting offsets in the Kafka topic(s) from which
>>> the jobs were consuming.
>>>
>>> Ideally, we'd want to avoid a relaunch. The 'suspension' and
>>> relaunching of jobs, coupled with the wait for the new machines to
>>> come online, may turn out quite time-consuming, which will make for
>>> lengthy request times – and our requests are not asynchronous.
>>> Ideally, the currently running jobs would continue to run on the
>>> machines currently available in the cluster.
>>>
>>> In the scale-down case, the job manager would want to signal to
>>> Spark's job scheduler not to send work to the node being taken out,
>>> find out when the last job has finished running on the node, then take
>>> the node out.
>>>
>>> This is somewhat like changing the number of cylinders in a car engine
>>> while the car is running...
>>>
>>> Sounds like a great candidate for a set of enhancements in Spark...
>>>
>>> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov <evo.efti...@isecc.com>
>>> wrote:
>>>
>>> @DG: The key metrics should be:
>>>
>>> - Scheduling delay – its ideal state is to remain constant over time,
>>>   and ideally to be less than the micro-batch window
>>> - Average job processing time – it should remain less than the
>>>   micro-batch window
>>> - Number of lost jobs – even a single lost job means that you have
>>>   lost all messages of the DStream RDD processed by that job, due to
>>>   the spark streaming memory leak condition and subsequent crash
>>>   described in previous postings submitted by me
>>>
>>> You can even go one step further and periodically issue a “get/check
>>> free memory” to see whether it is decreasing relentlessly at a constant
>>> rate – if it touches a predetermined RAM threshold, that should be your
>>> third metric.
>>>
>>> Re the “back pressure” mechanism – this is a Feedback Loop mechanism,
>>> and you can implement one on your own without waiting for Jiras and
>>> new features, whenever those might be implemented by the Spark dev
>>> team. Moreover, you can avoid using slow mechanisms such as ZooKeeper,
>>> and even incorporate some Machine Learning in your Feedback Loop to
>>> make it handle the message consumption rate more intelligently and
>>> benefit from ongoing online learning – BUT this is STILL about
>>> voluntarily sacrificing your performance in the name of keeping your
>>> system stable – it is not about scaling your system/solution.
>>>
>>> In terms of how to scale the Spark framework dynamically – even though
>>> this is not supported out of the box at the moment, I guess you can
>>> have a sys management framework dynamically spin a few more boxes
>>> (spark worker nodes), dynamically stop your currently running Spark
>>> Streaming Job, and relaunch it with new params, e.g. more Receivers, a
>>> larger number of Partitions (hence tasks), more RAM per executor, etc.
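>>> The stop/relaunch step boils down to a graceful shutdown followed by
>>> a fresh submit – a minimal, untested sketch (assuming a
>>> StreamingContext named ssc; the submit flags below are illustrative
>>> placeholders):
>>>
>>>   // Finish processing the already-received batches before stopping,
>>>   // so no data is dropped, and bring the SparkContext down too.
>>>   ssc.stop(stopSparkContext = true, stopGracefully = true)
>>>
>>>   // ...then resubmit against the enlarged cluster, e.g.:
>>>   //   spark-submit --executor-memory 8g --num-executors 12 \
>>>   //     ... my-streaming-app.jar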
>>> Obviously this will cause some temporary delay – in fact an
>>> interruption – in your processing, but if the business use case can
>>> tolerate that, then go for it.
>>>
>>> *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
>>> *Sent:* Thursday, May 28, 2015 12:36 PM
>>> *To:* dgoldenberg
>>> *Cc:* spark users
>>> *Subject:* Re: Autoscaling Spark cluster based on topic sizes/rate of
>>> growth in Kafka or Spark's metrics?
>>>
>>> Hi,
>>>
>>> tl;dr: At the moment (with a BIG disclaimer *), elastic scaling of
>>> spark streaming processes is not supported.
>>>
>>> *Longer version.*
>>>
>>> I assume that you are talking about Spark Streaming, as the discussion
>>> is about handling Kafka streaming data.
>>>
>>> Then you have two things to consider: the streaming receivers and the
>>> Spark processing cluster.
>>>
>>> Currently, the receiving topology is static. One receiver is allocated
>>> for each DStream instantiated, and it will use 1 core in the cluster.
>>> Once the StreamingContext is started, this topology cannot be changed;
>>> therefore the number of Kafka receivers is fixed for the lifetime of
>>> your DStream. What we do is calculate the cluster capacity and use
>>> that as a fixed upper bound (with a margin) for the receiver
>>> throughput.
>>>
>>> There's work in progress to add a reactive model to the receiver,
>>> where backpressure can be applied to handle overload conditions. See
>>> https://issues.apache.org/jira/browse/SPARK-7398
>>>
>>> Once the data is received, it will be processed in a 'classical' Spark
>>> pipeline, so previous posts on spark resource scheduling might apply.
>>>
>>> Regarding metrics, the standard metrics subsystem of spark will report
>>> streaming job performance. Check the driver's metrics endpoint to
>>> peruse the available metrics:
>>>
>>> <driver>:<ui-port>/metrics/json
>>>
>>> -kr, Gerard.
>>>
>>> (*) Spark is a project that moves so fast that statements might be
>>> invalidated by new work every minute.
>>>
>>> On Thu, May 28, 2015 at 1:21 AM, dgoldenberg
>>> <dgoldenberg...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to understand if there are design patterns for autoscaling
>>> Spark (add/remove slave machines to the cluster) based on the
>>> throughput.
>>>
>>> Assuming we can throttle Spark consumers, the respective Kafka topics
>>> we stream data from would start growing. What are some of the ways to
>>> generate metrics on the number of new messages and the rate at which
>>> they are piling up? This perhaps is more of a Kafka question; I see a
>>> pretty sparse javadoc with the Metric interface and not much else...
>>>
>>> What are some of the ways to expand/contract the Spark cluster?
>>> Someone has mentioned Mesos...
>>>
>>> I see some info on Spark metrics in the Spark monitoring guide
>>> <https://spark.apache.org/docs/latest/monitoring.html>. Do we want to
>>> perhaps implement a custom sink that would help us autoscale up or
>>> down based on the throughput?
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
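>>> As a crude starting point for such a monitor – short of writing a full
>>> custom sink – one could poll the <driver>:<ui-port>/metrics/json
>>> endpoint mentioned above and compare the reported scheduling delay to
>>> the batch interval. A rough, untested sketch (the driver host/port,
>>> the metric key, and the batch interval are placeholders that vary by
>>> deployment and Spark version):
>>>
>>>   import scala.io.Source
>>>
>>>   // Snapshot of all driver metrics, served as JSON by the metrics
>>>   // servlet on the driver UI port.
>>>   val json =
>>>     Source.fromURL("http://driver-host:4040/metrics/json").mkString
>>>
>>>   // Naive scrape of the streaming scheduling-delay gauge; a real
>>>   // monitor would use a JSON parser and the fully-qualified metric
>>>   // name for the application.
>>>   val delayMs = """lastCompletedBatch_schedulingDelay[^\d-]*(-?\d+)""".r
>>>     .findFirstMatchIn(json).map(_.group(1).toLong)
>>>
>>>   val batchIntervalMs = 10000L  // placeholder: your micro-batch window
>>>   if (delayMs.exists(_ > batchIntervalMs)) {
>>>     // Sustained breaches here would be the trigger for the
>>>     // provisioner to add another worker node.
>>>     println("Scheduling delay exceeds the batch interval - scale up?")
>>>   }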