If I want to restart my consumers into an updated cluster topology after
the cluster has been expanded or contracted, would I need to call stop() on
them, then call start() on them, or would I need to instantiate and start
new context objects (new JavaStreamingContext(...))?  I'm thinking of
actually quiescing these streaming consumers but letting them finish
their current batch first.

Right now I'm doing

    jssc.start();
    jssc.awaitTermination();

Must jssc.close() be called as well, after awaitTermination(), to avoid
potentially leaking contexts?  I don't see that in examples
like JavaDirectKafkaWordCount, but I'm wondering if that's needed.
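
To make the question concrete, here is a rough sketch of the stop-then-recreate
pattern I have in mind. As far as I can tell from the docs, a stopped
StreamingContext cannot be started again, so a relaunch would need a fresh
context, and JavaStreamingContext.stop(stopSparkContext, stopGracefully) with
stopGracefully = true is meant to let already-received data finish processing.
StreamingJob and Relauncher are hypothetical names I made up so the sketch runs
without Spark on the classpath; in a real app stopGracefully() would presumably
wrap jssc.stop(true, true) and the factory would call new JavaStreamingContext(...).

```java
/** Hypothetical stand-in for a streaming context, so the sketch runs without Spark. */
interface StreamingJob {
    void start();

    /** Assumption: with Spark this would wrap jssc.stop(true, true). */
    void stopGracefully();
}

/** Hypothetical helper that swaps a running job for a freshly built one. */
class Relauncher {
    private final java.util.function.Supplier<StreamingJob> factory;
    private StreamingJob current;

    Relauncher(java.util.function.Supplier<StreamingJob> factory) {
        this.factory = factory;
    }

    /** Builds a new job instance from the factory and starts it. */
    void launch() {
        current = factory.get();
        current.start();
    }

    /** Lets the running job finish its work, then starts a NEW instance (no reuse). */
    void relaunch() {
        if (current != null) {
            current.stopGracefully();
        }
        launch();
    }

    StreamingJob current() {
        return current;
    }
}
```

The point of the factory is that the old instance is never restarted: after the
cluster has been resized, relaunch() drains the old job and then builds a
completely new one with whatever parameters fit the new topology.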

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov <evo.efti...@isecc.com> wrote:

> Makes sense especially if you have a cloud with “infinite” resources /
> nodes which allows you to double, triple etc in the background/parallel the
> resources of the currently running cluster
>
>
>
> I was thinking more about the scenario where you have e.g. 100 boxes and
> want to / can add e.g. 20 more
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:46 PM
> *To:* Evo Eftimov
> *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo,
>
>
>
> One of the ideas is to shadow the current cluster. This way there's no
> extra latency incurred due to shutting down of the consumers. If two sets
> of consumers are running, potentially processing the same data, that is OK.
> We phase out the older cluster and gradually flip over to the new one,
> ensuring no downtime or extra latency.  Thoughts?
>
>
>
> On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov <evo.efti...@isecc.com>
> wrote:
>
> You should monitor the vital performance / job-clogging stats of the Spark
> Streaming runtime, not the “Kafka topics”
>
>
>
> You should be able to bring new worker nodes online and make them contact
> and register with the Master without bringing down the Master (or any of
> the currently running worker nodes)
>
>
>
> Then just shut down your currently running Spark Streaming job/app and
> restart it with new params to take advantage of the larger cluster
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:14 PM
> *To:* Cody Koeninger
> *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Would it be possible to implement Spark autoscaling somewhat along these
> lines? --
>
>
>
> 1. If we sense that a new machine is needed, by watching the data load in
> Kafka topic(s), then
>
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
> and get a machine);
>
> 3. Create a "shadow"/mirror Spark master running alongside the initial
> version which talks to N machines. The new mirror version is aware of N+1
> machines (or N+M if we had decided we needed M new boxes).
>
> 4. The previous version of the Spark runtime is
> quiesced/decommissioned.  We possibly get both clusters working on the
> same data which may actually be OK (at least for our specific use-cases).
>
> 5. Now the new Spark cluster is running.
>
>
>
> Similarly, the decommissioning of M unused boxes would happen, via this
> notion of a mirror Spark runtime.  How feasible would it be for such a
> mirror-like setup to be created, especially programmatically, and
> point #3 in particular?
>
>
>
> The other idea we'd entertained was to bring in a new machine, quiesce
> all currently running workers by telling them to process their current
> batch then shut down, then restart the consumers now that Spark is aware of
> a modified cluster.  This has the drawback of a downtime that may not be
> tolerable in terms of latency, by the system's clients waiting for their
> responses in a synchronous fashion.
>
>
>
> Thanks.
>
>
>
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
> I'm not sure that points 1 and 2 really apply to the Kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
>
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or <and...@databricks.com> wrote:
>
> Hi all,
>
>
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
>
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
>
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and restart a stream the same way we kill and
> relaunch executors.
>
>
>
> (2) Records come in every batch, and when there is data to process your
> executors are not idle. If your idle timeout is less than the batch
> duration, then you'll end up having to constantly kill and restart
> executors. If your idle timeout is greater than the batch duration, then
> you'll never kill executors.
>
>
>
> Long story short, with Spark Streaming there is currently no
> straightforward way to scale the size of your cluster. I had a long
> discussion with TD (Spark streaming lead) about what needs to be done to
> provide some semblance of dynamic scaling to streaming applications, e.g.
> take into account the batch queue instead. We came up with a few ideas that
> I will not detail here, but we are looking into this and do intend to
> support it in the near future.
>
>
>
> -Andrew
>
>
>
>
>
>
>
> 2015-05-28 8:02 GMT-07:00 Evo Eftimov <evo.efti...@isecc.com>:
>
>
>
> You should probably ALWAYS keep the RDD storage policy set to
> MEMORY_AND_DISK – it will be your insurance policy against system crashes
> due to memory leaks. As long as there is free RAM, Spark Streaming (Spark)
> will NOT resort to disk. It will of course resort to disk from time to time
> (i.e. when there is no free RAM) and take a performance hit from that, BUT
> only for as long as there is no free RAM
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Thursday, May 28, 2015 2:34 PM
> *To:* Evo Eftimov
> *Cc:* Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo, good points.
>
>
>
> On the dynamic resource allocation, I'm surmising this only works within a
> particular cluster setup.  So it improves the usage of current cluster
> resources but it doesn't make the cluster itself elastic. At least, that's
> my understanding.
>
>
>
> Memory + disk would be good and hopefully it'd take a *huge* load on the
> system to start exhausting the disk space too.  I'd guess that falling back
> to disk will make things significantly slower due to the extra I/O.
>
>
>
> Perhaps we'll really want all of these elements eventually.  I think we'd
> want to start with memory only, keeping maxRate low enough not to overwhelm
> the consumers; implement the cluster autoscaling.  We might experiment with
> dynamic resource allocation before we get to implement the cluster
> autoscale.
>
>
>
>
>
>
>
> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov <evo.efti...@isecc.com>
> wrote:
>
> You can also try Dynamic Resource Allocation
>
>
>
>
> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>
>
>
> Also, re the feedback loop for automatic message consumption rate
> adjustment – there is a “dumb” solution option: simply set the storage
> policy for the DStream RDDs to MEMORY_AND_DISK. When memory gets
> exhausted, Spark Streaming will resort to keeping new RDDs on disk, which
> will prevent it from crashing and hence losing them. Then some memory will
> get freed and it will go back to RAM, and so on and so forth
>
>
>
>
>
> -------- Original message --------
>
> From: Evo Eftimov
>
> Date:2015/05/28 13:22 (GMT+00:00)
>
> To: Dmitry Goldenberg
>
> Cc: Gerard Maas, spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> You can always spin up new boxes in the background and bring them into the
> cluster fold when fully operational, and time that with the job relaunch
> and param change
>
>
>
> Kafka offsets are managed automatically for you by the Kafka clients, which
> keep them in ZooKeeper. Don't worry about that as long as you shut down your
> job gracefully. Besides, managing the offsets explicitly is not a big deal
> if necessary
>
>
>
>
>
>
>
> -------- Original message --------
>
> From: Dmitry Goldenberg
>
> Date:2015/05/28 13:16 (GMT+00:00)
>
> To: Evo Eftimov
>
> Cc: Gerard Maas, spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> Thanks, Evo.  Per the last part of your comment, it sounds like we will
> need to implement a job manager which will be in control of starting the
> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
> marking them as ones to relaunch, scaling the cluster up/down by
> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>
>
>
> I suspect that relaunching the jobs may be tricky since that means keeping
> track of the starting offsets in the Kafka topic(s) from which the jobs
> started working.
>
>
>
> Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
> of jobs, coupled with the wait for the new machines to come online, may turn
> out to be quite time-consuming, which will make for lengthy request times, and
> our requests are not asynchronous.  Ideally, the currently running jobs would
> continue to run on the machines currently available in the cluster.
>
>
>
> In the scale-down case, the job manager would want to signal to Spark's
> job scheduler not to send work to the node being taken out, find out when
> the last job has finished running on the node, then take the node out.
>
>
>
> This is somewhat like changing the number of cylinders in a car engine
> while the car is running...
>
>
>
> Sounds like a great candidate for a set of enhancements in Spark...
>
>
>
> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov <evo.efti...@isecc.com>
> wrote:
>
> @DG: The key metrics should be
>
> - Scheduling delay – its ideal state is to remain constant over time and
>   be less than the micro-batch window
>
> - The average job processing time, which should remain less than the
>   micro-batch window
>
> - Number of lost jobs – even a single lost job means that you have lost
>   all messages for the DStream RDD processed by that job, due to the
>   previously described Spark Streaming memory leak condition and
>   subsequent crash (described in my previous postings)
>
>
>
> You can even go one step further and periodically issue a “get/check free
> memory” call to see whether it is decreasing relentlessly at a constant
> rate – if it touches a predetermined RAM threshold, that should be your
> fourth metric
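
A rough sketch of how the metrics listed above might feed a scale-up decision.
Everything here is an assumption for illustration: the ScalingAdvisor class, its
inputs, and the rule that a strictly growing scheduling delay counts as a
warning sign. It is not something Spark provides, and collecting the numbers is
left out.

```java
/** Hypothetical advisor that turns the streaming metrics into a scaling hint. */
class ScalingAdvisor {
    enum Decision { SCALE_UP, HOLD }

    /**
     * @param batchIntervalMs    length of the micro-batch window
     * @param schedulingDelaysMs scheduling delay of recent batches, oldest first
     * @param avgProcessingMs    average processing time per batch
     * @param lostJobs           number of lost jobs observed
     */
    static Decision decide(long batchIntervalMs, long[] schedulingDelaysMs,
                           double avgProcessingMs, int lostJobs) {
        // Any lost job means lost messages, so treat it as an immediate signal.
        if (lostJobs > 0) return Decision.SCALE_UP;
        // Processing must stay below the batch window or batches pile up.
        if (avgProcessingMs >= batchIntervalMs) return Decision.SCALE_UP;
        int n = schedulingDelaysMs.length;
        // The latest scheduling delay should stay below the batch window too.
        if (n > 0 && schedulingDelaysMs[n - 1] >= batchIntervalMs) return Decision.SCALE_UP;
        // A delay that keeps growing is not "constant over time".
        if (isStrictlyIncreasing(schedulingDelaysMs)) return Decision.SCALE_UP;
        return Decision.HOLD;
    }

    /** True when there are at least two samples and each is larger than the previous one. */
    static boolean isStrictlyIncreasing(long[] values) {
        if (values.length < 2) return false;
        for (int i = 1; i < values.length; i++) {
            if (values[i] <= values[i - 1]) return false;
        }
        return true;
    }
}
```

A free-memory threshold could be added as one more input in the same way.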
>
>
>
> Re the “back pressure” mechanism – this is a feedback loop mechanism, and
> you can implement one on your own without waiting for JIRAs and new
> features whenever they might be implemented by the Spark dev team.
> Moreover, you can avoid using slow mechanisms such as ZooKeeper and even
> incorporate some machine learning in your feedback loop to make it handle
> the message consumption rate more intelligently and benefit from ongoing
> online learning. BUT this is STILL about voluntarily sacrificing
> performance in the name of keeping your system stable; it is not about
> scaling your system/solution
>
>
>
> In terms of how to scale the Spark framework dynamically – even though
> this is not supported out of the box at the moment, I guess you can have a
> sys management framework dynamically spin up a few more boxes (Spark worker
> nodes), stop your currently running Spark Streaming job, and
> relaunch it with new params, e.g. more receivers, a larger number of
> partitions (hence tasks), more RAM per executor, etc. Obviously this will
> cause some temporary delay, in fact an interruption, in your processing, but
> if the business use case can tolerate that then go for it
>
>
>
> *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
> *Sent:* Thursday, May 28, 2015 12:36 PM
> *To:* dgoldenberg
> *Cc:* spark users
> *Subject:* Re: Autoscaling Spark cluster based on topic sizes/rate of
> growth in Kafka or Spark's metrics?
>
>
>
> Hi,
>
>
>
> tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
> streaming processes is not supported.
>
>
>
>
>
> *Longer version.*
>
>
>
> I assume that you are talking about Spark Streaming as the discussion is
> about handling Kafka streaming data.
>
>
>
> Then you have two things to consider: the Streaming receivers and the
> Spark processing cluster.
>
>
>
> Currently, the receiving topology is static. One receiver is allocated
> for each DStream instantiated and it will use 1 core in the cluster. Once
> the StreamingContext is started, this topology cannot be changed, therefore
> the number of Kafka receivers is fixed for the lifetime of your DStream.
>
> What we do is to calculate the cluster capacity and use that as a fixed
> upper bound (with a margin) for the receiver throughput.
>
>
>
> There's work in progress to add a reactive model to the receiver, where
> backpressure can be applied to handle overload conditions. See
> https://issues.apache.org/jira/browse/SPARK-7398
>
>
>
> Once the data is received, it will be processed in a 'classical' Spark
> pipeline, so previous posts on spark resource scheduling might apply.
>
>
>
> Regarding metrics, the standard metrics subsystem of spark will report
> streaming job performance. Check the driver's metrics endpoint to peruse
> the available metrics:
>
>
>
> <driver>:<ui-port>/metrics/json
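
A minimal sketch of polling that endpoint from plain Java. The DriverMetrics
class and its method names are hypothetical, the plain-http scheme is an
assumption, and the host and port are whatever the driver UI is actually
listening on (4040 is the usual default). Parsing the JSON is left to whichever
library is at hand.

```java
/** Hypothetical helper for reading the driver's JSON metrics endpoint. */
class DriverMetrics {
    /** Builds the endpoint URL; assumes the UI is served over plain http. */
    static String metricsUrl(String driverHost, int uiPort) {
        return "http://" + driverHost + ":" + uiPort + "/metrics/json";
    }

    /** Fetches the raw JSON body as a string, with short timeouts. */
    static String fetch(String driverHost, int uiPort) throws java.io.IOException {
        java.net.URL url = java.net.URI.create(metricsUrl(driverHost, uiPort)).toURL();
        java.net.HttpURLConnection conn = (java.net.HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try (java.io.BufferedReader in = new java.io.BufferedReader(
                new java.io.InputStreamReader(conn.getInputStream(),
                        java.nio.charset.StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line).append('\n');
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }
}
```

Something like DriverMetrics.fetch("driver-host", 4040) on a timer would then
give a monitoring process the numbers to act on.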
>
>
>
> -kr, Gerard.
>
>
>
>
>
> (*) Spark is a project that moves so fast that statements might be
> invalidated by new work every minute.
>
>
>
> On Thu, May 28, 2015 at 1:21 AM, dgoldenberg <dgoldenberg...@gmail.com>
> wrote:
>
> Hi,
>
> I'm trying to understand if there are design patterns for autoscaling Spark
> (add/remove slave machines to the cluster) based on the throughput.
>
> Assuming we can throttle Spark consumers, the respective Kafka topics we
> stream data from would start growing.  What are some of the ways to
> generate the metrics on the number of new messages and the rate they are
> piling up?  This perhaps is more of a Kafka question; I see a pretty sparse
> javadoc with the Metric interface and not much else...
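
One way to think about the "piling up" numbers, as a sketch only: if the latest
offset and the last consumed offset are known per partition, the backlog is
their difference, and the growth rate is the change in backlog between two
samples. How the offsets are obtained is deliberately left out; BacklogEstimator
and its inputs are hypothetical names, not a Kafka or Spark API.

```java
/** Hypothetical helper for estimating how fast unconsumed messages pile up. */
class BacklogEstimator {
    /** Total backlog: sum over partitions of (latest offset - last consumed offset). */
    static long backlog(long[] latestOffsets, long[] consumedOffsets) {
        if (latestOffsets.length != consumedOffsets.length) {
            throw new IllegalArgumentException("offset arrays must cover the same partitions");
        }
        long total = 0;
        for (int i = 0; i < latestOffsets.length; i++) {
            // Guard against a consumed offset that is momentarily ahead of a stale sample.
            total += Math.max(0, latestOffsets[i] - consumedOffsets[i]);
        }
        return total;
    }

    /** Messages per second by which the backlog grew (negative means it is shrinking). */
    static double growthPerSecond(long backlogBefore, long backlogAfter, long elapsedMs) {
        if (elapsedMs <= 0) {
            throw new IllegalArgumentException("elapsedMs must be positive");
        }
        return (backlogAfter - backlogBefore) * 1000.0 / elapsedMs;
    }
}
```

A sustained positive growth rate would be the signal to add capacity, and a
sustained negative one with a near-zero backlog the signal to remove it.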
>
> What are some of the ways to expand/contract the Spark cluster? Someone has
> mentioned Mesos...
>
> I see some info on Spark metrics in the Spark monitoring guide
> <https://spark.apache.org/docs/latest/monitoring.html>.  Do we want to
> perhaps implement a custom sink that would help us autoscale up or down
> based on the throughput?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
