Yes, Tathagata, thank you.

For #1, the 'need detection', one idea we're entertaining is timestamping
the messages coming into the Kafka topics. The consumers would check the
interval between a message's origination timestamp and the time they
receive it. As the Kafka topics fill up, we would presumably see longer and
longer delays before messages get processed by the consumers. The consumers
would then start firing off critical events into an Event
Analyzer/Aggregator, which would decide that more resources are needed and
ask the Provisioning Component to allocate N new machines.
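
To make the timestamp idea concrete, here is a rough sketch of the
consumer-side check. The class name, window size and threshold are all
made up for illustration, and it assumes producer and consumer clocks are
reasonably in sync (clock skew would distort the lag):

```python
from collections import deque


class LagMonitor:
    """Tracks lag = (time received) - (origination timestamp), in seconds."""

    def __init__(self, window=100, threshold_s=5.0):
        # Keep only the most recent `window` lag samples.
        self.lags = deque(maxlen=window)
        self.threshold_s = threshold_s

    def record(self, origin_ts, received_ts):
        # Clamp at zero so small clock skew cannot produce a negative lag.
        lag = max(0.0, received_ts - origin_ts)
        self.lags.append(lag)
        return lag

    def mean_lag(self):
        return sum(self.lags) / len(self.lags) if self.lags else 0.0

    def is_critical(self):
        # Only raise an event once the window is full, so that a single
        # slow message does not trigger provisioning.
        window_full = len(self.lags) == self.lags.maxlen
        return window_full and self.mean_lag() > self.threshold_s
```

A consumer would call record() per message (or per sample of messages) and
fire a critical event to the Analyzer whenever is_critical() returns True.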

We do want to set maxRatePerPartition so as not to overwhelm the consumers.
Machine provisioning may take a while, and without a maxRate guard our
consumers could run out of memory in the meantime.
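
As a back-of-the-envelope check: with the direct stream,
spark.streaming.kafka.maxRatePerPartition caps the records read per second
from each partition, so the most a single batch can pull in is roughly
rate x partitions x batch interval. The numbers below (rate, partition
count, batch interval, average message size) are made-up assumptions, not
our actual settings:

```python
def max_records_per_batch(max_rate_per_partition, num_partitions, batch_interval_s):
    """Upper bound on records in one batch under a per-partition rate cap."""
    return max_rate_per_partition * num_partitions * batch_interval_s


def approx_batch_mib(max_records, avg_message_bytes):
    """Very rough payload size of a full batch, in MiB (ignores overhead)."""
    return max_records * avg_message_bytes / (1024 * 1024)


# Hypothetical settings: 1000 records/sec/partition, 8 partitions, 5 s batches.
cap = max_records_per_batch(1000, 8, 5)
print("max records per batch:", cap)
print("approx MiB per batch at 2 KiB/message:", approx_batch_mib(cap, 2048))
```

This only bounds the raw payload; the actual memory footprint depends on
deserialization and on whatever the job does with the data.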

"Since there are no receivers, if the cluster gets a new executor, it will
automatically start getting used to run tasks... no need to do anything
further."  This is great, actually. We were wondering whether we'd need to
restart the consumers once the new machines have been added. Tathagata's
point implies, as I read it, that no further orchestration is needed: the
load will start getting redistributed automatically. This makes
implementation of autoscaling a lot simpler, as far as #3 goes.

One issue that's not yet been covered much is the scenario when *fewer*
cluster resources become required (a system load valley rather than a
peak). To detect a low volume, we'd need to measure the throughput in
messages per second over time.  Sustained low volumes would cause events to
be fired, signaling to the Analyzer that machines could be decommissioned.
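
A sketch of what the low-volume detection could look like, using a sliding
time window. The class name, window length and rate threshold are
hypothetical; the point is only that the low rate must hold for a full
window before anything is signaled:

```python
from collections import deque


class ThroughputWindow:
    """Sliding-window message rate with a 'sustained low' check."""

    def __init__(self, window_s=300.0, low_rate=10.0):
        self.window_s = window_s
        self.low_rate = low_rate      # messages/sec considered "low"
        self.samples = deque()        # (timestamp, message_count) pairs
        self.started_at = None

    def record(self, ts, count):
        if self.started_at is None:
            self.started_at = ts
        self.samples.append((ts, count))

    def rate(self, now):
        # Drop samples that have fallen out of the window, then average.
        while self.samples and self.samples[0][0] <= now - self.window_s:
            self.samples.popleft()
        return sum(c for _, c in self.samples) / self.window_s

    def is_sustained_low(self, now):
        # Require a full window of observation before declaring a valley.
        if self.started_at is None or now - self.started_at < self.window_s:
            return False
        return self.rate(now) < self.low_rate
```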

If machines are being decommissioned, it would seem that the consumers
would need to be quiesced (allowed to finish processing any current batch,
then shut down), after which they would restart themselves or be restarted.
Thoughts on this?

There is also a hefty #4 here, which is the "hysteresis" of this: the
system operates adaptively and learns over time, remembering the history of
cluster expansions and contractions and allowing a certain slack for
letting things cool down or heat up more gradually, while also not
contracting or expanding too frequently.  PID controllers and
thermostat-style design patterns have been mentioned before in this
discussion.
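
The simplest thermostat-style version of this would be two thresholds with
a dead band in between, plus a cooldown after each scaling action. This is
only a sketch: the names and numbers are invented, it is not a PID
controller, and it does no learning from history:

```python
class ScalingThermostat:
    """Two-threshold controller with a cooldown to avoid flapping."""

    def __init__(self, high_s=10.0, low_s=2.0, cooldown_s=600.0):
        if low_s >= high_s:
            raise ValueError("low_s must be below high_s to leave a dead band")
        self.high_s = high_s          # delay above this: scale up
        self.low_s = low_s            # delay below this: scale down
        self.cooldown_s = cooldown_s  # minimum time between actions
        self.last_action_at = None

    def decide(self, delay_s, now):
        # Right after an action, hold still and let the cluster settle.
        if (self.last_action_at is not None
                and now - self.last_action_at < self.cooldown_s):
            return "hold"
        if delay_s > self.high_s:
            action = "scale_up"
        elif delay_s < self.low_s:
            action = "scale_down"
        else:
            return "hold"             # inside the dead band
        self.last_action_at = now
        return action
```

The gap between low_s and high_s is what provides the hysteresis: a delay
hovering around a single threshold would otherwise cause the cluster to
expand and contract repeatedly.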



On Thu, Jun 11, 2015 at 11:08 PM, Tathagata Das <t...@databricks.com> wrote:

> Let me try to add some clarity to the different thought directions that
> are going on in this thread.
>
> 1. HOW TO DETECT THE NEED FOR MORE CLUSTER RESOURCES?
>
> If there are no rate limits set up, the most reliable way to detect
> whether the current Spark cluster is insufficient to handle the data
> load is to use the StreamingListener interface, which gives all the
> information about when batches start and end. See the internal
> implementation of the StreamingListener called
> StreamingJobProgressListener. This is the one that drives the streaming UI.
> You can get the scheduling delay (time taken for a batch to start
> processing) from it and use that as a reliable indicator that Spark
> Streaming is not able to process as fast as data is being received.
>
> But if you have already set rate limits based on the max load that the
> cluster can handle, then you will probably never detect that the actual
> input rate into Kafka has gone up and data is getting buffered inside
> Kafka. In that case, you have to monitor Kafka load to correctly detect the
> high load. You may want to use a combination of both techniques for a
> robust and safe elastic solution: have rate limits set, use
> StreamingListener for early detection that processing load is increasing
> (it can increase without an actual increase in data rate), and also make
> sure from Kafka monitoring that the whole end-to-end system is keeping up.
>
>
> 2. HOW TO GET MORE CLUSTER RESOURCES?
>
> Currently for YARN, you can use the developer API of dynamic allocation
> that Andrew Or has introduced to ask for more executors from YARN. Note
> that the existing dynamic allocation solution is unlikely to work for
> streaming, and should not be used. Rather I recommend building your own
> logic that sees the streaming scheduling delay, and accordingly uses the
> low level developer API to directly ask for more executors
> (sparkContext.requestExecutors). In other approaches, the Provisioning
> Component idea can also work.
>
>
> 3. HOW TO TAKE ADVANTAGE OF MORE CLUSTER RESOURCES?
>
> There are two approaches depending on receiver vs Kafka direct. I am
> assuming the number of topic partitions is pre-determined to be large
> enough to handle peak load.
>
> (a) Kafka Direct: This is the simpler scenario.  Since there are no
> receivers, if the cluster gets a new executor, it will automatically start
> getting used to run tasks, including reading from Kafka (remember, Kafka
> direct approach reads from Kafka like a file system, from any node that
> runs the task). So it will immediately start using the extra resources, no
> need to do anything further.
>
> (b) Receiver: This is definitely tricky. If you don't need to increase the
> number of receivers, then a new executor will start getting used for
> computations (shuffles, writing out, etc.), but the parallelism in
> receiving will not increase. If you need to increase that, then it's best
> to shut down the context gracefully (so that no data is lost), and a new
> StreamingContext can be started with more receivers (# receivers <= #
> executors), and maybe more #partitions for shuffles. You have to call stop
> on the currently running streaming context to start a new one. If a context
> is stopped, any thread stuck in awaitTermination will get unblocked.
>
> Does that clarify things?
>
>
>
>
>
>
>
> On Thu, Jun 11, 2015 at 7:30 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Depends on what you're reusing multiple times (if anything).
>>
>> Read
>> http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
>>
>> On Wed, Jun 10, 2015 at 12:18 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> At which point would I call cache()?  I just want the runtime to spill
>>> to disk when necessary without me having to know when the "necessary" is.
>>>
>>>
>>> On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> direct stream isn't a receiver, it isn't required to cache data
>>>> anywhere unless you want it to.
>>>>
>>>> If you want it, just call cache.
>>>>
>>>> On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> "set the storage policy for the DStream RDDs to MEMORY AND DISK" - it
>>>>> appears the storage level can be specified in the createStream methods but
>>>>> not createDirectStream...
>>>>>
>>>>>
>>>>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov <evo.efti...@isecc.com>
>>>>> wrote:
>>>>>
>>>>>> You can also try Dynamic Resource Allocation
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>>>>>
>>>>>>
>>>>>>
>>>>>> Also re the Feedback Loop for automatic message consumption rate
>>>>>> adjustment – there is a “dumb” solution option – simply set the storage
>>>>>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>>>>>> exhausted Spark Streaming will resort to keeping new RDDs on disk, which
>>>>>> will prevent it from crashing and hence losing them. Then some memory
>>>>>> will get freed and it will resort back to RAM, and so on and so forth
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> -------- Original message --------
>>>>>>
>>>>>> From: Evo Eftimov
>>>>>>
>>>>>> Date:2015/05/28 13:22 (GMT+00:00)
>>>>>>
>>>>>> To: Dmitry Goldenberg
>>>>>>
>>>>>> Cc: Gerard Maas ,spark users
>>>>>>
>>>>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>>>>> growth in Kafka or Spark's metrics?
>>>>>>
>>>>>>
>>>>>>
>>>>>> You can always spin up new boxes in the background and bring them into
>>>>>> the cluster fold when fully operational, and time that with a job
>>>>>> relaunch and param change
>>>>>>
>>>>>>
>>>>>>
>>>>>> Kafka offsets are managed automatically for you by the Kafka clients,
>>>>>> which keep them in ZooKeeper; don't worry about that as long as you shut
>>>>>> down your job gracefully. Besides, managing the offsets explicitly is
>>>>>> not a big deal if necessary
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> -------- Original message --------
>>>>>>
>>>>>> From: Dmitry Goldenberg
>>>>>>
>>>>>> Date:2015/05/28 13:16 (GMT+00:00)
>>>>>>
>>>>>> To: Evo Eftimov
>>>>>>
>>>>>> Cc: Gerard Maas ,spark users
>>>>>>
>>>>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>>>>> growth in Kafka or Spark's metrics?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks, Evo.  Per the last part of your comment, it sounds like we
>>>>>> will need to implement a job manager which will be in control of starting
>>>>>> the jobs, monitoring the status of the Kafka topic(s), shutting jobs down
>>>>>> and marking them as ones to relaunch, scaling the cluster up/down by
>>>>>> adding/removing machines, and relaunching the 'suspended' (shut down) 
>>>>>> jobs.
>>>>>>
>>>>>>
>>>>>>
>>>>>> I suspect that relaunching the jobs may be tricky since that means
>>>>>> keeping track of the starting offsets in the Kafka topic(s) from which
>>>>>> the jobs started working.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Ideally, we'd want to avoid a re-launch.  The 'suspension' and
>>>>>> relaunching of jobs, coupled with the wait for the new machines to come
>>>>>> online may turn out quite time-consuming which will make for lengthy
>>>>>> request times, and our requests are not asynchronous.  Ideally, the
>>>>>> currently running jobs would continue to run on the machines currently
>>>>>> available in the cluster.
>>>>>>
>>>>>>
>>>>>>
>>>>>> In the scale-down case, the job manager would want to signal to
>>>>>> Spark's job scheduler not to send work to the node being taken out, find
>>>>>> out when the last job has finished running on the node, then take the
>>>>>> node out.
>>>>>>
>>>>>>
>>>>>>
>>>>>> This is somewhat like changing the number of cylinders in a car
>>>>>> engine while the car is running...
>>>>>>
>>>>>>
>>>>>>
>>>>>> Sounds like a great candidate for a set of enhancements in Spark...
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov <evo.efti...@isecc.com>
>>>>>> wrote:
>>>>>>
>>>>>> @DG; The key metrics should be
>>>>>>
>>>>>>
>>>>>>
>>>>>> -          Scheduling delay – its ideal state is to remain constant
>>>>>> over time and ideally be less than the time of the microbatch window
>>>>>>
>>>>>> -          The average job processing time should remain less than
>>>>>> the micro-batch window
>>>>>>
>>>>>> -          Number of Lost Jobs – even if there is a single Job lost
>>>>>> that means that you have lost all messages for the DStream RDD processed
>>>>>> by that job due to the previously described spark streaming memory leak
>>>>>> condition and subsequent crash – described in previous postings
>>>>>> submitted by me
>>>>>>
>>>>>>
>>>>>>
>>>>>> You can even go one step further and periodically issue “get/check
>>>>>> free memory” to see whether it is decreasing relentlessly at a constant
>>>>>> rate – if it touches a predetermined RAM threshold that should be your
>>>>>> third metric
>>>>>>
>>>>>>
>>>>>>
>>>>>> Re the “back pressure” mechanism – this is a Feedback Loop mechanism
>>>>>> and you can implement one on your own without waiting for Jiras and new
>>>>>> features whenever they might be implemented by the Spark dev team –
>>>>>> moreover you can avoid using slow mechanisms such as ZooKeeper and even
>>>>>> incorporate some Machine Learning in your Feedback Loop to make it handle
>>>>>> the message consumption rate more intelligently and benefit from ongoing
>>>>>> online learning – BUT this is STILL about voluntarily sacrificing your
>>>>>> performance in the name of keeping your system stable – it is not about
>>>>>> scaling your system/solution
>>>>>>
>>>>>>
>>>>>>
>>>>>> In terms of how to scale the Spark Framework Dynamically – even
>>>>>> though this is not supported at the moment out of the box I guess you can
>>>>>> have a sys management framework dynamically spin up a few more boxes
>>>>>> (spark worker nodes), dynamically stop your currently running Spark
>>>>>> Streaming Job, and relaunch it with new params, e.g. more Receivers, a
>>>>>> larger number of Partitions (hence tasks), more RAM per executor etc.
>>>>>> Obviously this will cause some temporary delay, in fact an interruption,
>>>>>> in your processing, but if the business use case can tolerate that then
>>>>>> go for it
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
>>>>>> *Sent:* Thursday, May 28, 2015 12:36 PM
>>>>>> *To:* dgoldenberg
>>>>>> *Cc:* spark users
>>>>>> *Subject:* Re: Autoscaling Spark cluster based on topic sizes/rate
>>>>>> of growth in Kafka or Spark's metrics?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>>
>>>>>> tl;dr At the moment (with a BIG disclaimer *) elastic scaling of
>>>>>> spark streaming processes is not supported.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Longer version.*
>>>>>>
>>>>>>
>>>>>>
>>>>>> I assume that you are talking about Spark Streaming as the discussion
>>>>>> is about handling Kafka streaming data.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Then you have two things to consider: the Streaming receivers and the
>>>>>> Spark processing cluster.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Currently, the receiving topology is static. One receiver is
>>>>>> allocated with each DStream instantiated and it will use 1 core in the
>>>>>> cluster. Once the StreamingContext is started, this topology cannot be
>>>>>> changed, therefore the number of Kafka receivers is fixed for the
>>>>>> lifetime of your DStream.
>>>>>>
>>>>>> What we do is to calculate the cluster capacity and use that as a
>>>>>> fixed upper bound (with a margin) for the receiver throughput.
>>>>>>
>>>>>>
>>>>>>
>>>>>> There's work in progress to add a reactive model to the receiver,
>>>>>> where backpressure can be applied to handle overload conditions. See
>>>>>> https://issues.apache.org/jira/browse/SPARK-7398
>>>>>>
>>>>>>
>>>>>>
>>>>>> Once the data is received, it will be processed in a 'classical'
>>>>>> Spark pipeline, so previous posts on spark resource scheduling might
>>>>>> apply.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Regarding metrics, the standard metrics subsystem of spark will
>>>>>> report streaming job performance. Check the driver's metrics endpoint to
>>>>>> peruse the available metrics:
>>>>>>
>>>>>>
>>>>>>
>>>>>> <driver>:<ui-port>/metrics/json
>>>>>>
>>>>>>
>>>>>>
>>>>>> -kr, Gerard.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> (*) Spark is a project that moves so fast that statements might be
>>>>>> invalidated by new work every minute.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, May 28, 2015 at 1:21 AM, dgoldenberg <
>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm trying to understand if there are design patterns for autoscaling
>>>>>> Spark (add/remove slave machines to the cluster) based on the throughput.
>>>>>>
>>>>>> Assuming we can throttle Spark consumers, the respective Kafka topics we
>>>>>> stream data from would start growing.  What are some of the ways to
>>>>>> generate the metrics on the number of new messages and the rate they are
>>>>>> piling up? This perhaps is more of a Kafka question; I see a pretty
>>>>>> sparse javadoc with the Metric interface and not much else...
>>>>>>
>>>>>> What are some of the ways to expand/contract the Spark cluster? Someone
>>>>>> has mentioned Mesos...
>>>>>>
>>>>>> I see some info on Spark metrics in the Spark monitoring guide
>>>>>> <https://spark.apache.org/docs/latest/monitoring.html>.  Do we want to
>>>>>> perhaps implement a custom sink that would help us autoscale up or down
>>>>>> based on the throughput?
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
