Great. "You should monitor vital performance / job clogging stats of the Spark Streaming Runtime not 'kafka topics'" -- anything specific you were thinking of?
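For instance, something along the lines of a StreamingListener that watches the per-batch scheduling delay? A minimal sketch of the idea, assuming the org.apache.spark.streaming.scheduler.StreamingListener API (the threshold and the alert action are made up for illustration):

    import org.apache.spark.streaming.scheduler._

    // Watches per-batch scheduling delay and processing time and flags the
    // job as "clogged" when the delay exceeds a configured threshold.
    class ClogMonitor(maxDelayMs: Long) extends StreamingListener {
      override def onBatchCompleted(batch: StreamingListenerBatchCompleted) {
        val info = batch.batchInfo
        val schedDelay = info.schedulingDelay.getOrElse(0L)
        val procTime = info.processingDelay.getOrElse(0L)
        if (schedDelay > maxDelayMs) {
          // Hypothetical hook: notify whatever drives the scaling decisions.
          println(s"CLOGGED: scheduling delay $schedDelay ms, processing time $procTime ms")
        }
      }
    }

    // Registered on the streaming context:
    // ssc.addStreamingListener(new ClogMonitor(maxDelayMs = 5000))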
On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov <evo.efti...@isecc.com> wrote:

> Makes sense, especially if you have a cloud with "infinite" resources / nodes which allows you to double, triple etc. in the background/parallel the resources of the currently running cluster.
>
> I was thinking more about the scenario where you have e.g. 100 boxes and want to / can add e.g. 20 more.
>
> From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> Sent: Wednesday, June 3, 2015 4:46 PM
> To: Evo Eftimov
> Cc: Cody Koeninger; Andrew Or; Gerard Maas; spark users
> Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
>
> Evo,
>
> One of the ideas is to shadow the current cluster. This way there's no extra latency incurred due to shutting down of the consumers. If two sets of consumers are running, potentially processing the same data, that is OK. We phase out the older cluster and gradually flip over to the new one, ensuring no downtime or extra latency. Thoughts?
>
> On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov <evo.efti...@isecc.com> wrote:
>
> You should monitor vital performance / job clogging stats of the Spark Streaming Runtime, not "kafka topics".
>
> You should be able to bring new worker nodes online and make them contact and register with the Master without bringing down the Master (or any of the currently running worker nodes).
>
> Then just shut down your currently running spark streaming job/app and restart it with new params to take advantage of the larger cluster.
>
> From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> Sent: Wednesday, June 3, 2015 4:14 PM
> To: Cody Koeninger
> Cc: Andrew Or; Evo Eftimov; Gerard Maas; spark users
> Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
>
> Would it be possible to implement Spark autoscaling somewhat along these lines? --
>
> 1. If we sense that a new machine is needed, by watching the data load in Kafka topic(s), then
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS and get a machine; see the sketch after this message);
> 3. Create a "shadow"/mirror Spark master running alongside the initial version which talks to N machines. The new mirror version is aware of N+1 machines (or N+M if we had decided we needed M new boxes).
> 4. The previous version of the Spark runtime is quiesced/decommissioned. We possibly get both clusters working on the same data, which may actually be OK (at least for our specific use-cases).
> 5. Now the new Spark cluster is running.
>
> Similarly, the decommissioning of M unused boxes would happen via this notion of a mirror Spark runtime. How feasible would it be for such a mirror-like setup to be created, especially created programmatically? Especially point #3.
>
> The other idea we'd entertained was to bring in a new machine, quiesce all currently running workers by telling them to process their current batch and then shut down, and then restart the consumers now that Spark is aware of a modified cluster. This has the drawback of a downtime that may not be tolerable in terms of latency, with the system's clients waiting for their responses in a synchronous fashion.
>
> Thanks.
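A minimal sketch of what the Provisioner interface in point 2 might look like (entirely hypothetical; nothing like this exists in Spark, and the EC2-backed implementation is only stubbed out):

    trait Provisioner {
      // Bring up `count` new worker boxes and return their hostnames.
      def provision(count: Int): Seq[String]
      // Tear down the given boxes once they have been drained.
      def decommission(hosts: Seq[String]): Unit
    }

    class Ec2Provisioner(masterUrl: String) extends Provisioner {
      def provision(count: Int): Seq[String] = {
        // A real version would call the EC2 API here, wait for the boxes to
        // boot, and have each one start a Worker pointed at masterUrl.
        (1 to count).map(i => s"new-worker-$i.example.com")
      }
      def decommission(hosts: Seq[String]): Unit = {
        // Stop the workers on these hosts, then terminate the instances.
      }
    }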
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
> I'm not sure that points 1 and 2 really apply to the kafka direct stream. There are no receivers, and you know at the driver how big each of your batches is.
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or <and...@databricks.com> wrote:
>
> Hi all,
>
> As the author of the dynamic allocation feature I can offer a few insights here.
>
> Gerard's explanation was both correct and concise: dynamic allocation is not intended to be used in Spark streaming at the moment (1.4 or before). This is because of two things:
>
> (1) The number of receivers is necessarily fixed, and these are started in executors. Since we need a receiver for each InputDStream, if we kill these receivers we essentially stop the stream, which is not what we want. It makes little sense to close and restart a stream the same way we kill and relaunch executors.
>
> (2) Records come in every batch, and when there is data to process your executors are not idle. If your idle timeout is less than the batch duration, then you'll end up having to constantly kill and restart executors. If your idle timeout is greater than the batch duration, then you'll never kill executors.
>
> Long story short, with Spark streaming there is currently no straightforward way to scale the size of your cluster. I had a long discussion with TD (Spark streaming lead) about what needs to be done to provide some semblance of dynamic scaling to streaming applications, e.g. take into account the batch queue instead. We came up with a few ideas that I will not detail here, but we are looking into this and do intend to support it in the near future.
>
> -Andrew
>
> 2015-05-28 8:02 GMT-07:00 Evo Eftimov <evo.efti...@isecc.com>:
>
> Probably you should ALWAYS keep the RDD storage policy at MEMORY AND DISK – it will be your insurance policy against sys crashes due to memory leaks. While there is free RAM, spark streaming (spark) will NOT resort to disk; it resorts to disk only when there is no free RAM, taking a performance hit from that, but only until RAM gets freed again.
>
> From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> Sent: Thursday, May 28, 2015 2:34 PM
> To: Evo Eftimov
> Cc: Gerard Maas; spark users
> Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
>
> Evo, good points.
>
> On the dynamic resource allocation, I'm surmising this only works within a particular cluster setup. So it improves the usage of current cluster resources but it doesn't make the cluster itself elastic. At least, that's my understanding.
>
> Memory + disk would be good, and hopefully it'd take a *huge* load on the system to start exhausting the disk space too. I'd guess that falling onto disk will make things significantly slower due to the extra I/O.
>
> Perhaps we'll really want all of these elements eventually. I think we'd want to start with memory only, keeping maxRate low enough not to overwhelm the consumers, then implement the cluster autoscaling. We might experiment with dynamic resource allocation before we get to implement the cluster autoscale.
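For concreteness, the knobs mentioned above would be set roughly along these lines (values purely illustrative; per Andrew's caveat, dynamic allocation is not really suited to streaming in 1.4 or before, and it also requires the external shuffle service):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Throttle each receiver to at most this many records per second.
      .set("spark.streaming.receiver.maxRate", "10000")
      // Dynamic resource allocation, within the bounds of the existing cluster.
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "4")
      .set("spark.dynamicAllocation.maxExecutors", "20")
      .set("spark.shuffle.service.enabled", "true")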
> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov <evo.efti...@isecc.com> wrote:
>
> You can also try Dynamic Resource Allocation:
>
> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>
> Also, re the Feedback Loop for automatic message consumption rate adjustment – there is a "dumb" solution option: simply set the storage policy for the DStream RDDs to MEMORY AND DISK. When the memory gets exhausted, spark streaming will resort to keeping new RDDs on disk, which will prevent it from crashing and hence losing them. Then some memory will get freed and it will resort back to RAM, and so on and so forth.
>
> -------- Original message --------
> From: Evo Eftimov
> Date: 2015/05/28 13:22 (GMT+00:00)
> To: Dmitry Goldenberg
> Cc: Gerard Maas, spark users
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
>
> You can always spin new boxes up in the background and bring them into the cluster fold when fully operational, and time that with the job relaunch and param change.
>
> Kafka offsets are managed automatically for you by the kafka clients, which keep them in ZooKeeper; don't worry about that as long as you shut down your job gracefully. Besides, managing the offsets explicitly is not a big deal if necessary.
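Both mechanics Evo mentions are essentially one-liners in the driver. A sketch, assuming an existing StreamingContext named ssc (the ZooKeeper quorum, consumer group, and topic name are placeholders):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // The "dumb" feedback loop: spill received blocks to disk rather than
    // crash when RAM runs out.
    val stream = KafkaUtils.createStream(
      ssc, "zkhost:2181", "my-consumer-group", Map("my-topic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)

    // Graceful shutdown: drain the batches already received (letting the
    // Kafka client commit its offsets to ZooKeeper) before stopping.
    ssc.stop(stopSparkContext = true, stopGracefully = true)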
> -------- Original message --------
> From: Dmitry Goldenberg
> Date: 2015/05/28 13:16 (GMT+00:00)
> To: Evo Eftimov
> Cc: Gerard Maas, spark users
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
>
> Thanks, Evo. Per the last part of your comment, it sounds like we will need to implement a job manager which will be in control of starting the jobs, monitoring the status of the Kafka topic(s), shutting jobs down and marking them as ones to relaunch, scaling the cluster up/down by adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>
> I suspect that relaunching the jobs may be tricky, since that means keeping track of the starting offsets in the Kafka topic(s) the jobs were working from.
>
> Ideally, we'd want to avoid a re-launch. The 'suspension' and relaunching of jobs, coupled with the wait for the new machines to come online, may turn out quite time-consuming, which will make for lengthy request times; and our requests are not asynchronous. Ideally, the currently running jobs would continue to run on the machines currently available in the cluster.
>
> In the scale-down case, the job manager would want to signal to Spark's job scheduler not to send work to the node being taken out, find out when the last job has finished running on the node, and then take the node out.
>
> This is somewhat like changing the number of cylinders in a car engine while the car is running...
>
> Sounds like a great candidate for a set of enhancements in Spark...
>
> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov <evo.efti...@isecc.com> wrote:
>
> @DG; The key metrics should be:
>
> - Scheduling delay – its ideal state is to remain constant over time, and ideally it should be less than the time of the micro-batch window
> - The average job processing time – it should remain less than the micro-batch window
> - Number of lost jobs – even a single lost job means that you have lost all messages for the DStream RDD processed by that job, due to the previously described spark streaming memory leak condition and subsequent crash – described in previous postings submitted by me
>
> You can even go one step further and periodically issue "get/check free memory" calls to see whether it is decreasing relentlessly at a constant rate – if it touches a predetermined RAM threshold, that should be your fourth metric.
>
> Re the "back pressure" mechanism – this is a Feedback Loop mechanism, and you can implement one on your own without waiting for JIRAs and new features, whenever they might be implemented by the Spark dev team. Moreover, you can avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine Learning in your Feedback Loop to make it handle the message consumption rate more intelligently and benefit from ongoing online learning. BUT this is STILL about voluntarily sacrificing your performance in the name of keeping your system stable – it is not about scaling your system/solution.
>
> In terms of how to scale the Spark framework dynamically – even though this is not supported at the moment out of the box, I guess you can have a sys management framework dynamically spin up a few more boxes (spark worker nodes), stop your currently running Spark Streaming job, and relaunch it with new params, e.g. more Receivers, a larger number of partitions (hence tasks), more RAM per executor, etc. Obviously this will cause some temporary delay – in fact an interruption – in your processing, but if the business use case can tolerate that, then go for it.
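The periodic free-memory check is trivial on a single JVM; a sketch (the threshold is arbitrary, and in practice you would run this on every executor and ship the readings to your monitoring system):

    // Warn when free heap drops below a predetermined threshold.
    val thresholdBytes = 512L * 1024 * 1024
    new Thread(new Runnable {
      def run() {
        while (true) {
          val free = Runtime.getRuntime.freeMemory()
          if (free < thresholdBytes)
            println(s"WARN: free heap down to ${free / (1024 * 1024)} MB")
          Thread.sleep(10000)
        }
      }
    }).start()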
> From: Gerard Maas [mailto:gerard.m...@gmail.com]
> Sent: Thursday, May 28, 2015 12:36 PM
> To: dgoldenberg
> Cc: spark users
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?
>
> Hi,
>
> tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark streaming processes is not supported.
>
> Longer version:
>
> I assume that you are talking about Spark Streaming, as the discussion is about handling Kafka streaming data.
>
> Then you have two things to consider: the streaming receivers and the Spark processing cluster.
>
> Currently, the receiving topology is static. One receiver is allocated with each DStream instantiated, and it will use 1 core in the cluster. Once the StreamingContext is started, this topology cannot be changed; therefore the number of Kafka receivers is fixed for the lifetime of your DStream. What we do is calculate the cluster capacity and use that as a fixed upper bound (with a margin) for the receiver throughput.
>
> There's work in progress to add a reactive model to the receiver, where backpressure can be applied to handle overload conditions. See https://issues.apache.org/jira/browse/SPARK-7398
>
> Once the data is received, it will be processed in a 'classical' Spark pipeline, so previous posts on spark resource scheduling might apply.
>
> Regarding metrics, the standard metrics subsystem of spark will report streaming job performance. Check the driver's metrics endpoint to peruse the available metrics:
>
> <driver>:<ui-port>/metrics/json
>
> -kr, Gerard.
>
> (*) Spark is a project that moves so fast that statements might be invalidated by new work every minute.
>
> On Thu, May 28, 2015 at 1:21 AM, dgoldenberg <dgoldenberg...@gmail.com> wrote:
>
> Hi,
>
> I'm trying to understand if there are design patterns for autoscaling Spark (add/remove slave machines to the cluster) based on the throughput.
>
> Assuming we can throttle Spark consumers, the respective Kafka topics we stream data from would start growing. What are some of the ways to generate the metrics on the number of new messages and the rate at which they are piling up? This perhaps is more of a Kafka question; I see a pretty sparse javadoc with the Metric interface and not much else...
>
> What are some of the ways to expand/contract the Spark cluster? Someone has mentioned Mesos...
>
> I see some info on Spark metrics in the Spark monitoring guide <https://spark.apache.org/docs/latest/monitoring.html>. Do we want to perhaps implement a custom sink that would help us autoscale up or down based on the throughput?
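As one possible starting point for the metrics question, polling the driver endpoint Gerard mentions could look like this (a sketch; the host, port, and gauge names will vary with your setup and Spark version):

    import scala.io.Source

    // 4040 is the default driver UI port; the endpoint returns a JSON map of
    // all registered gauges, including the streaming source's per-batch
    // counters and delays while a StreamingContext is running.
    val json = Source.fromURL("http://driver-host:4040/metrics/json").mkString
    // Parse with your JSON library of choice and track the streaming gauges
    // over time to drive scale-up / scale-down decisions.
    println(json)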