Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-12 Thread Steve Loughran
These are both really good posts: you should try and get them into the 
documentation.

With anything implementing dynamic behaviour, there are some fun problems:

(a) detecting the delays in the workflow. There are some good ideas here.
(b) deciding where to address it. That means you need to monitor the entire 
pipeline —which you should be doing in production anyway.
(c) choosing the action. More nodes, more memory & CPU (not that useful for 
Java code, even when YARN adds support for dynamic container resize)
(d) choosing the size of the action. In a shared cluster, extra resources for 
one app come at the expense of others. If you have pre-emption turned on in 
YARN, the scheduler can take containers off lower-priority work, which 
automates a lot of this decision making. That will lose other work though, so 
to justify it you'd better hang on to those containers.
(e) deciding if/when to hand things back. Scaling things down can be very 
expensive if lots of state has to get rebuilt elsewhere.

I think Apache Helix from LinkedIn has done some good work here - worth looking 
at to see what lessons & code to lift. And as you'd expect, it sits right 
behind Kafka in production. I think it gets away with low delays to scale 
up/down by relying on low rebuild costs. [In the work I've been doing with 
colleagues on dynamic HBase and Accumulo clusters, we've not attempted any 
autoscaling, because scale-down is an expensive decision... we're focusing on 
liveness detection and reaction, then publishing the metrics needed to allow 
people or cross-application tools to make the decision.]

On 12 Jun 2015, at 04:38, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

Yes, Tathagata, thank you.

For #1, the 'need detection', one idea we're entertaining is timestamping the 
messages coming into the Kafka topics. The consumers would check the interval 
between the time they get the message and that message origination timestamp. 
As Kafka topics start to fill up more, we would presumably see longer and 
longer wait times (delays) before messages get processed by the 
consumers.  The consumers would then start firing off critical events into an 
Event Analyzer/Aggregator which would decide that more resources are needed, 
then ask the Provisioning Component to allocate N new machines.
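
To make that concrete, here is a rough Java sketch of the consumer-side check. 
It assumes each Kafka message value is prefixed with its origin timestamp in 
epoch millis plus a '|' delimiter, and the 60-second threshold is purely 
illustrative; the actual call into the Event Analyzer/Aggregator is only hinted 
at in a comment:

    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;

    // 'messages' is the direct Kafka stream (JavaPairInputDStream<String, String>).
    JavaDStream<Long> lagMillis = messages.map(
        kv -> System.currentTimeMillis() - Long.parseLong(kv._2().split("\\|", 2)[0]));

    // Track the worst lag in each batch; past a threshold, fire a critical event.
    lagMillis.reduce(Math::max).foreachRDD(rdd -> {
      if (!rdd.isEmpty() && rdd.first() > 60_000L) {
        // here we'd fire a critical event at the Event Analyzer/Aggregator
        System.err.println("Consumer lag critical: " + rdd.first() + " ms");
      }
      return null;  // Spark 1.x's Java foreachRDD takes a Function<..., Void>
    });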

We do want to set maxRatePerPartition so as not to overwhelm the consumers 
and run them out of memory.  Machine provisioning may take a while, and if left 
with no maxRate guard, our consumers could run out of memory.
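
For reference, that guard is just a configuration property on the direct 
stream; a minimal sketch, where the app name and the 1000 records/sec/partition 
figure are purely illustrative, not recommendations:

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        .setAppName("kafka-streaming-consumer")  // illustrative name
        .set("spark.streaming.kafka.maxRatePerPartition", "1000");  // max records/sec per partition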

"Since there are no receivers, if the cluster gets a new executor, it will 
automatically start getting used to run tasks... no need to do anything 
further."  This is great, actually. We were wondering whether we'd need to 
restart the consumers once the new machines have been added. Tathagata's point 
implies, as I read it, that no further orchestration is needed; the load will 
start getting redistributed automatically. This makes the implementation of 
autoscaling a lot simpler, as far as #3 goes.

One issue that's not yet been covered much is the scenario when *fewer* cluster 
resources become required (a system load valley rather than a peak). To detect 
a low volume, we'd need to measure the throughput in messages per second over 
time.  Sustained low volumes would trigger critical events signaling to 
the Analyzer that machines could be decommissioned.
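
A sketch of one way to watch for that valley, assuming a 5-minute window 
sliding every minute over the direct stream (the window sizes and the 
1,000-message threshold are illustrative, and the batch interval is assumed to 
divide evenly into both durations):

    import org.apache.spark.streaming.Durations;

    // 'messages' is the direct Kafka stream.
    messages.window(Durations.minutes(5), Durations.minutes(1))
        .count()
        .foreachRDD(rdd -> {
          long windowCount = rdd.isEmpty() ? 0L : rdd.first();
          if (windowCount < 1_000L) {
            // signal the Analyzer that machines could be decommissioned
            System.err.println("Low volume: " + windowCount + " messages in the last 5 minutes");
          }
          return null;  // Spark 1.x's Java foreachRDD takes a Function<..., Void>
        });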

If machines are being decommissioned, it would seem that the consumers would 
need to get acquiesced (allowed to process any current batch, then shut down), 
then they would restart themselves or be restarted. Thoughts on this?
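
As a straw man, that drain-then-restart flow might look roughly like this in 
Java. The two-argument stop() is the graceful-shutdown call; the 10-second 
batch interval and the buildConfForUpdatedCluster() helper are assumptions made 
purely for illustration:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // Quiesce: let in-flight batches finish, then stop streaming and the SparkContext.
    jssc.stop(true /* stopSparkContext */, true /* stopGracefully */);

    // Rebuild against the updated cluster. buildConfForUpdatedCluster() is a
    // hypothetical helper that points the SparkConf at the new master/resources.
    SparkConf newConf = buildConfForUpdatedCluster();
    JavaStreamingContext newJssc = new JavaStreamingContext(newConf, Durations.seconds(10));
    // ... re-create the direct Kafka stream(s) and output operations here ...
    newJssc.start();
    newJssc.awaitTermination();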

There is also a hefty #4 here which is the "hysteresis" of this, where the 
system operates adaptively and learns over time, remembering the history of 
cluster expansions and contractions and allowing a certain slack for letting 
things cool down or heat up more gradually, and not contracting or expanding 
too frequently.  PID controllers and thermostat-style design patterns have 
been mentioned earlier in this discussion.


If you look at the big cloud apps, they dynamically reallocate VM images based 
on load history, with Netflix being the poster user: Hadoop work in the quiet 
hours, user interaction evenings and weekends. Excluding special events 
(including holidays), there's a lot of regularity over time, which lets you 
predict workload in advance.  It's like your thermostat knowing Fridays are 
cold and that it should crank up the heating in advance.






On Thu, Jun 11, 2015 at 11:08 PM, Tathagata Das <t...@databricks.com> wrote:
Let me try to add some clarity to the different thought directions going on 
in this thread.

1. HOW TO DETECT THE NEED FOR MORE CLUSTER RESOURCES?

If there are no rate limits set up, the most reliable way to detect whether 
the current Spark cluster is insufficient to handle the data load is to use 
the StreamingListener interface, which gives all the information about when 
batches start and complete and how long they take to process.

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-11 Thread Dmitry Goldenberg
; getting used to run tasks, including reading from Kafka (remember, Kafka
> direct approach reads from Kafka like a file system, from any node that
> runs the task). So it will immediately start using the extra resources, no
> need to do anything further.
>
> (b) Receiver: This is definitely tricky. If you dont need to increase the
> number of receivers, then a new executor will start getting used for
> computations (shuffles, writing out, etc.), but the parallelism in
> receiving will not increase. If you need to increase that, then it's best to
> shutdown the context gracefully (so that no data is lost), and a new
> StreamingContext can be started with more receivers (# receivers <= #
> executors), and maybe more #partitions for shuffles. You have to call stop on
> the currently running streaming context to start a new one. If a context is
> stopped, any thread stuck in awaitTermination will get unblocked.
>
> Does that clarify things?
>
>
>
>
>
>
>
> On Thu, Jun 11, 2015 at 7:30 AM, Cody Koeninger 
> wrote:
>
>> Depends on what you're reusing multiple times (if anything).
>>
>> Read
>> http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
>>
>> On Wed, Jun 10, 2015 at 12:18 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> At which point would I call cache()?  I just want the runtime to spill
>>> to disk when necessary without me having to know when the "necessary" is.
>>>
>>>
>>> On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger 
>>> wrote:
>>>
>>>> direct stream isn't a receiver, it isn't required to cache data
>>>> anywhere unless you want it to.
>>>>
>>>> If you want it, just call cache.
>>>>
>>>> On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> "set the storage policy for the DStream RDDs to MEMORY AND DISK" - it
>>>>> appears the storage level can be specified in the createStream methods but
>>>>> not createDirectStream...
>>>>>
>>>>>
>>>>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>>>>> wrote:
>>>>>
>>>>>> You can also try Dynamic Resource Allocation
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>>>>>
>>>>>>
>>>>>>
>>>>>> Also re the Feedback Loop for automatic message consumption rate
>>>>>> adjustment – there is a “dumb” solution option – simply set the storage
>>>>>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>>>>>> exhausted spark streaming will resort to keeping new RDDs on disk which
>>>>>> will prevent it from crashing and hence losing them. Then some memory 
>>>>>> will
>>>>>> get freed and it will resort back to RAM and so on and so forth
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Sent from Samsung Mobile
>>>>>>
>>>>>>  Original message 
>>>>>>
>>>>>> From: Evo Eftimov
>>>>>>
>>>>>> Date:2015/05/28 13:22 (GMT+00:00)
>>>>>>
>>>>>> To: Dmitry Goldenberg
>>>>>>
>>>>>> Cc: Gerard Maas ,spark users
>>>>>>
>>>>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>>>>> growth in Kafka or Spark's metrics?
>>>>>>
>>>>>>
>>>>>>
>>>>>> You can always spin new boxes in the background and bring them into
>>>>>> the cluster fold when fully operational and time that with job relaunch 
>>>>>> and
>>>>>> param change
>>>>>>
>>>>>>
>>>>>>
>>>>>> Kafka offsets are managed automatically for you by the kafka clients
>>>>>> which keep them in ZooKeeper; don't worry about that as long as you shut 
>>>>>> down your job gracefully. Besides, managing the offsets explicitly is not a big
>>>>>> deal if necessary
>>>>>>
>>>>>>
&

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-11 Thread Tathagata Das
” solution option – simply set the storage
>>>>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>>>>> exhausted spark streaming will resort to keeping new RDDs on disk which
>>>>> will prevent it from crashing and hence losing them. Then some memory 
>>>>> will
>>>>> get freed and it will resort back to RAM and so on and so forth
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Sent from Samsung Mobile
>>>>>
>>>>>  Original message 
>>>>>
>>>>> From: Evo Eftimov
>>>>>
>>>>> Date:2015/05/28 13:22 (GMT+00:00)
>>>>>
>>>>> To: Dmitry Goldenberg
>>>>>
>>>>> Cc: Gerard Maas ,spark users
>>>>>
>>>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>>>> growth in Kafka or Spark's metrics?
>>>>>
>>>>>
>>>>>
>>>>> You can always spin new boxes in the background and bring them into
>>>>> the cluster fold when fully operational and time that with job relaunch 
>>>>> and
>>>>> param change
>>>>>
>>>>>
>>>>>
>>>>> Kafka offsets are managed automatically for you by the kafka clients
>>>>> which keep them in ZooKeeper; don't worry about that as long as you shut 
>>>>> down your job gracefully. Besides, managing the offsets explicitly is not a big
>>>>> deal if necessary
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Sent from Samsung Mobile
>>>>>
>>>>>
>>>>>
>>>>>  Original message 
>>>>>
>>>>> From: Dmitry Goldenberg
>>>>>
>>>>> Date:2015/05/28 13:16 (GMT+00:00)
>>>>>
>>>>> To: Evo Eftimov
>>>>>
>>>>> Cc: Gerard Maas ,spark users
>>>>>
>>>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>>>> growth in Kafka or Spark's metrics?
>>>>>
>>>>>
>>>>>
>>>>> Thanks, Evo.  Per the last part of your comment, it sounds like we
>>>>> will need to implement a job manager which will be in control of starting
>>>>> the jobs, monitoring the status of the Kafka topic(s), shutting jobs down
>>>>> and marking them as ones to relaunch, scaling the cluster up/down by
>>>>> adding/removing machines, and relaunching the 'suspended' (shut down) 
>>>>> jobs.
>>>>>
>>>>>
>>>>>
>>>>> I suspect that relaunching the jobs may be tricky since that means
>>>>> keeping track of the starter offsets in Kafka topic(s) from which the jobs
>>>>> started working on.
>>>>>
>>>>>
>>>>>
>>>>> Ideally, we'd want to avoid a re-launch.  The 'suspension' and
>>>>> relaunching of jobs, coupled with the wait for the new machines to come
>>>>> online may turn out quite time-consuming which will make for lengthy
>>>>> request times, and our requests are not asynchronous.  Ideally, the
>>>>> currently running jobs would continue to run on the machines currently
>>>>> available in the cluster.
>>>>>
>>>>>
>>>>>
>>>>> In the scale-down case, the job manager would want to signal to
>>>>> Spark's job scheduler not to send work to the node being taken out, find
>>>>> out when the last job has finished running on the node, then take the node
>>>>> out.
>>>>>
>>>>>
>>>>>
>>>>> This is somewhat like changing the number of cylinders in a car engine
>>>>> while the car is running...
>>>>>
>>>>>
>>>>>
>>>>> Sounds like a great candidate for a set of enhancements in Spark...
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov 
>>>>> wrote:
>>>>>
>>>>> @DG; The key metrics should be
>>>>>
>>>>>
>>>>>
>>>>> -  Sche

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-11 Thread Cody Koeninger
Depends on what you're reusing multiple times (if anything).

Read
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

On Wed, Jun 10, 2015 at 12:18 AM, Dmitry Goldenberg <
dgoldenberg...@gmail.com> wrote:

> At which point would I call cache()?  I just want the runtime to spill to
> disk when necessary without me having to know when the "necessary" is.
>
>
> On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger  wrote:
>
>> direct stream isn't a receiver, it isn't required to cache data anywhere
>> unless you want it to.
>>
>> If you want it, just call cache.
>>
>> On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> "set the storage policy for the DStream RDDs to MEMORY AND DISK" - it
>>> appears the storage level can be specified in the createStream methods but
>>> not createDirectStream...
>>>
>>>
>>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>>> wrote:
>>>
>>>> You can also try Dynamic Resource Allocation
>>>>
>>>>
>>>>
>>>>
>>>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>>>
>>>>
>>>>
>>>> Also re the Feedback Loop for automatic message consumption rate
>>>> adjustment – there is a “dumb” solution option – simply set the storage
>>>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>>>> exhausted spark streaming will resort to keeping new RDDs on disk which
>>>> will prevent it from crashing and hence losing them. Then some memory will
>>>> get freed and it will resort back to RAM and so on and so forth
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Sent from Samsung Mobile
>>>>
>>>>  Original message 
>>>>
>>>> From: Evo Eftimov
>>>>
>>>> Date:2015/05/28 13:22 (GMT+00:00)
>>>>
>>>> To: Dmitry Goldenberg
>>>>
>>>> Cc: Gerard Maas ,spark users
>>>>
>>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>>> growth in Kafka or Spark's metrics?
>>>>
>>>>
>>>>
>>>> You can always spin new boxes in the background and bring them into the
>>>> cluster fold when fully operational and time that with job relaunch and
>>>> param change
>>>>
>>>>
>>>>
>>>> Kafka offsets are managed automatically for you by the kafka clients
>>>> which keep them in ZooKeeper; don't worry about that as long as you shut down
>>>> your job gracefully. Besides, managing the offsets explicitly is not a big
>>>> deal if necessary
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Sent from Samsung Mobile
>>>>
>>>>
>>>>
>>>>  Original message 
>>>>
>>>> From: Dmitry Goldenberg
>>>>
>>>> Date:2015/05/28 13:16 (GMT+00:00)
>>>>
>>>> To: Evo Eftimov
>>>>
>>>> Cc: Gerard Maas ,spark users
>>>>
>>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>>> growth in Kafka or Spark's metrics?
>>>>
>>>>
>>>>
>>>> Thanks, Evo.  Per the last part of your comment, it sounds like we will
>>>> need to implement a job manager which will be in control of starting the
>>>> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
>>>> marking them as ones to relaunch, scaling the cluster up/down by
>>>> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>>>>
>>>>
>>>>
>>>> I suspect that relaunching the jobs may be tricky since that means
>>>> keeping track of the starter offsets in Kafka topic(s) from which the jobs
>>>> started working on.
>>>>
>>>>
>>>>
>>>> Ideally, we'd want to avoid a re-launch.  The 'suspension' and
>>>> relaunching of jobs, coupled with the wait for the new machines to come
>>>> online may turn out quite time-consuming which will make for lengthy
>>>> request times, and our requests are not asynchronous.  Ideally, the
>>>> currently running jobs would continue t

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-11 Thread Dmitry Goldenberg
If I want to restart my consumers into an updated cluster topology after
the cluster has been expanded or contracted, would I need to call stop() on
them, then call start() on them, or would I need to instantiate and start
new context objects (new JavaStreamingContext(...)) ?  I'm thinking of
actually acquiescing these streaming consumers but letting them finish
their current batch first.

Right now I'm doing

jssc.start();
jssc.awaitTermination();

Must jssc.close() be called as well, after awaitTermination(), to avoid
potentially leaking contexts?  I don't see that in examples
like JavaDirectKafkaWordCount, but I'm wondering if it's needed.
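
For what it's worth, the ordering I've been assuming is stop-then-close, with 
the graceful stop issued from outside the awaitTermination() call (e.g. a JVM 
shutdown hook). A sketch under those assumptions, not a confirmed requirement; 
the 10-second batch interval is illustrative:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // 'conf' is the application's SparkConf.
    final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    // ... set up the direct Kafka stream(s) and output operations ...

    // awaitTermination() blocks, so the graceful stop has to come from another
    // thread; a JVM shutdown hook is one simple option.
    Runtime.getRuntime().addShutdownHook(
        new Thread(() -> jssc.stop(true /* stopSparkContext */, true /* stopGracefully */)));

    jssc.start();
    jssc.awaitTermination();
    // My understanding is that close() is just the Closeable wrapper around stop(),
    // so calling it here (or using try-with-resources) shouldn't hurt.
    jssc.close();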

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov  wrote:

> Makes sense especially if you have a cloud with “infinite” resources /
> nodes which allows you to double, triple etc in the background/parallel the
> resources of the currently running cluster
>
>
>
> I was thinking more about the scenario where you have e.g. 100 boxes and
> want to / can add e.g. 20 more
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:46 PM
> *To:* Evo Eftimov
> *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo,
>
>
>
> One of the ideas is to shadow the current cluster. This way there's no
> extra latency incurred due to shutting down of the consumers. If two sets
> of consumers are running, potentially processing the same data, that is OK.
> We phase out the older cluster and gradually flip over to the new one,
> ensuring no downtime or extra latency.  Thoughts?
>
>
>
> On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov 
> wrote:
>
> You should monitor vital performance / job clogging stats of the Spark
> Streaming Runtime not “kafka topics”
>
>
>
> You should be able to bring new worker nodes online and make them contact
> and register with the Master without bringing down the Master (or any of
> the currently running worker nodes)
>
>
>
> Then just shutdown your currently running spark streaming job/app and
> restart it with new params to take advantage of the larger cluster
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:14 PM
> *To:* Cody Koeninger
> *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Would it be possible to implement Spark autoscaling somewhat along these
> lines? --
>
>
>
> 1. If we sense that a new machine is needed, by watching the data load in
> Kafka topic(s), then
>
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
> and get a machine);
>
> 3. Create a "shadow"/mirror Spark master running alongside the initial
> version which talks to N machines. The new mirror version is aware of N+1
> machines (or N+M if we had decided we needed M new boxes).
>
> 4. The previous version of the Spark runtime is
> acquiesced/decommissioned.  We possibly get both clusters working on the
> same data which may actually be OK (at least for our specific use-cases).
>
> 5. Now the new Spark cluster is running.
>
>
>
> Similarly, the decommissioning of M unused boxes would happen, via this
> notion of a mirror Spark runtime.  How feasible would it be for such a
> mirrorlike setup to be created, especially created programmatically?
> Especially point #3.
>
>
>
> The other idea we'd entertained was to bring in a new machine, acquiesce
> down all currently running workers by telling them to process their current
> batch then shut down, then restart the consumers now that Spark is aware of
> a modified cluster.  This has the drawback of a downtime that may not be
> tolerable in terms of latency, by the system's clients waiting for their
> responses in a synchronous fashion.
>
>
>
> Thanks.
>
>
>
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger 
> wrote:
>
> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
>
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
> Hi all,
>
>
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
>
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-09 Thread Dmitry Goldenberg
At which point would I call cache()?  I just want the runtime to spill to
disk when necessary without me having to know when the "necessary" is.


On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger  wrote:

> direct stream isn't a receiver, it isn't required to cache data anywhere
> unless you want it to.
>
> If you want it, just call cache.
>
> On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> "set the storage policy for the DStream RDDs to MEMORY AND DISK" - it
>> appears the storage level can be specified in the createStream methods but
>> not createDirectStream...
>>
>>
>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>> wrote:
>>
>>> You can also try Dynamic Resource Allocation
>>>
>>>
>>>
>>>
>>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>>
>>>
>>>
>>> Also re the Feedback Loop for automatic message consumption rate
>>> adjustment – there is a “dumb” solution option – simply set the storage
>>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>>> exhausted spark streaming will resort to keeping new RDDs on disk which
>>> will prevent it from crashing and hence losing them. Then some memory will
>>> get freed and it will resort back to RAM and so on and so forth
>>>
>>>
>>>
>>>
>>>
>>> Sent from Samsung Mobile
>>>
>>>  Original message 
>>>
>>> From: Evo Eftimov
>>>
>>> Date:2015/05/28 13:22 (GMT+00:00)
>>>
>>> To: Dmitry Goldenberg
>>>
>>> Cc: Gerard Maas ,spark users
>>>
>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>> growth in Kafka or Spark's metrics?
>>>
>>>
>>>
>>> You can always spin new boxes in the background and bring them into the
>>> cluster fold when fully operational and time that with job relaunch and
>>> param change
>>>
>>>
>>>
>>> Kafka offsets are managed automatically for you by the kafka clients
>>> which keep them in ZooKeeper; don't worry about that as long as you shut down
>>> your job gracefully. Besides, managing the offsets explicitly is not a big
>>> deal if necessary
>>>
>>>
>>>
>>>
>>>
>>> Sent from Samsung Mobile
>>>
>>>
>>>
>>>  Original message 
>>>
>>> From: Dmitry Goldenberg
>>>
>>> Date:2015/05/28 13:16 (GMT+00:00)
>>>
>>> To: Evo Eftimov
>>>
>>> Cc: Gerard Maas ,spark users
>>>
>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>> growth in Kafka or Spark's metrics?
>>>
>>>
>>>
>>> Thanks, Evo.  Per the last part of your comment, it sounds like we will
>>> need to implement a job manager which will be in control of starting the
>>> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
>>> marking them as ones to relaunch, scaling the cluster up/down by
>>> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>>>
>>>
>>>
>>> I suspect that relaunching the jobs may be tricky since that means
>>> keeping track of the starter offsets in Kafka topic(s) from which the jobs
>>> started working on.
>>>
>>>
>>>
>>> Ideally, we'd want to avoid a re-launch.  The 'suspension' and
>>> relaunching of jobs, coupled with the wait for the new machines to come
>>> online may turn out quite time-consuming which will make for lengthy
>>> request times, and our requests are not asynchronous.  Ideally, the
>>> currently running jobs would continue to run on the machines currently
>>> available in the cluster.
>>>
>>>
>>>
>>> In the scale-down case, the job manager would want to signal to Spark's
>>> job scheduler not to send work to the node being taken out, find out when
>>> the last job has finished running on the node, then take the node out.
>>>
>>>
>>>
>>> This is somewhat like changing the number of cylinders in a car engine
>>> while the car is running...
>>>
>>>
>>>
>>> Sounds like a great candidate for a set of enhancements in Spark...
>>>
>>>
>>

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-04 Thread Cody Koeninger
The direct stream isn't a receiver; it isn't required to cache data anywhere
unless you want it to.

If you want it, just call cache.
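
For reference, a minimal sketch against the Spark 1.3 Java API of what "just 
call cache" (or an explicit spill-to-disk level) looks like for the direct 
stream; the broker address and topic name are placeholders:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "broker1:9092");   // placeholder
    Set<String> topics = Collections.singleton("my-topic");     // placeholder

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics);

    // No receiver means no storage level at creation time; if the stream's RDDs
    // are reused, ask for persistence explicitly and let Spark spill to disk.
    messages.persist(StorageLevel.MEMORY_AND_DISK());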

On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg 
wrote:

> "set the storage policy for the DStream RDDs to MEMORY AND DISK" - it
> appears the storage level can be specified in the createStream methods but
> not createDirectStream...
>
>
> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
> wrote:
>
>> You can also try Dynamic Resource Allocation
>>
>>
>>
>>
>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>
>>
>>
>> Also re the Feedback Loop for automatic message consumption rate
>> adjustment – there is a “dumb” solution option – simply set the storage
>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>> exhausted spark streaming will resort to keeping new RDDs on disk which
>> will prevent it from crashing and hence losing them. Then some memory will
>> get freed and it will resort back to RAM and so on and so forth
>>
>>
>>
>>
>>
>> Sent from Samsung Mobile
>>
>>  Original message ----
>>
>> From: Evo Eftimov
>>
>> Date:2015/05/28 13:22 (GMT+00:00)
>>
>> To: Dmitry Goldenberg
>>
>> Cc: Gerard Maas ,spark users
>>
>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>> growth in Kafka or Spark's metrics?
>>
>>
>>
>> You can always spin new boxes in the background and bring them into the
>> cluster fold when fully operational and time that with job relaunch and
>> param change
>>
>>
>>
>> Kafka offsets are managed automatically for you by the kafka clients
>> which keep them in ZooKeeper; don't worry about that as long as you shut down
>> your job gracefully. Besides, managing the offsets explicitly is not a big
>> deal if necessary
>>
>>
>>
>>
>>
>> Sent from Samsung Mobile
>>
>>
>>
>>  Original message 
>>
>> From: Dmitry Goldenberg
>>
>> Date:2015/05/28 13:16 (GMT+00:00)
>>
>> To: Evo Eftimov
>>
>> Cc: Gerard Maas ,spark users
>>
>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>> growth in Kafka or Spark's metrics?
>>
>>
>>
>> Thanks, Evo.  Per the last part of your comment, it sounds like we will
>> need to implement a job manager which will be in control of starting the
>> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
>> marking them as ones to relaunch, scaling the cluster up/down by
>> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>>
>>
>>
>> I suspect that relaunching the jobs may be tricky since that means
>> keeping track of the starter offsets in Kafka topic(s) from which the jobs
>> started working on.
>>
>>
>>
>> Ideally, we'd want to avoid a re-launch.  The 'suspension' and
>> relaunching of jobs, coupled with the wait for the new machines to come
>> online may turn out quite time-consuming which will make for lengthy
>> request times, and our requests are not asynchronous.  Ideally, the
>> currently running jobs would continue to run on the machines currently
>> available in the cluster.
>>
>>
>>
>> In the scale-down case, the job manager would want to signal to Spark's
>> job scheduler not to send work to the node being taken out, find out when
>> the last job has finished running on the node, then take the node out.
>>
>>
>>
>> This is somewhat like changing the number of cylinders in a car engine
>> while the car is running...
>>
>>
>>
>> Sounds like a great candidate for a set of enhancements in Spark...
>>
>>
>>
>> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov 
>> wrote:
>>
>> @DG; The key metrics should be
>>
>>
>>
>> -  Scheduling delay – its ideal state is to remain constant over
>> time and ideally be less than the time of the microbatch window
>>
>> -  The average job processing time should remain less than the
>> micro-batch window
>>
>> -  Number of Lost Jobs – even if there is a single Job lost that
>> means that you have lost all messages for the DStream RDD processed by that
>> job due to the previously described spark streaming memory leak condition
>> and subsequent crash – described in previous postings submitted 

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-04 Thread Dmitry Goldenberg
"set the storage policy for the DStream RDDs to MEMORY AND DISK" - it
appears the storage level can be specified in the createStream methods but
not createDirectStream...


On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov  wrote:

> You can also try Dynamic Resource Allocation
>
>
>
>
> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>
>
>
> Also re the Feedback Loop for automatic message consumption rate
> adjustment – there is a “dumb” solution option – simply set the storage
> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
> exhausted spark streaming will resort to keeping new RDDs on disk which
> will prevent it from crashing and hence losing them. Then some memory will
> get freed and it will resort back to RAM and so on and so forth
>
>
>
>
>
> Sent from Samsung Mobile
>
>  Original message 
>
> From: Evo Eftimov
>
> Date:2015/05/28 13:22 (GMT+00:00)
>
> To: Dmitry Goldenberg
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> You can always spin new boxes in the background and bring them into the
> cluster fold when fully operational and time that with job relaunch and
> param change
>
>
>
> Kafka offsets are managed automatically for you by the kafka clients which
> keep them in ZooKeeper; don't worry about that as long as you shut down your
> job gracefully. Besides, managing the offsets explicitly is not a big deal if
> necessary
>
>
>
>
>
> Sent from Samsung Mobile
>
>
>
> ---- Original message ----
>
> From: Dmitry Goldenberg
>
> Date:2015/05/28 13:16 (GMT+00:00)
>
> To: Evo Eftimov
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> Thanks, Evo.  Per the last part of your comment, it sounds like we will
> need to implement a job manager which will be in control of starting the
> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
> marking them as ones to relaunch, scaling the cluster up/down by
> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>
>
>
> I suspect that relaunching the jobs may be tricky since that means keeping
> track of the starter offsets in Kafka topic(s) from which the jobs started
> working on.
>
>
>
> Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
> of jobs, coupled with the wait for the new machines to come online may turn
> out quite time-consuming which will make for lengthy request times, and our
> requests are not asynchronous.  Ideally, the currently running jobs would
> continue to run on the machines currently available in the cluster.
>
>
>
> In the scale-down case, the job manager would want to signal to Spark's
> job scheduler not to send work to the node being taken out, find out when
> the last job has finished running on the node, then take the node out.
>
>
>
> This is somewhat like changing the number of cylinders in a car engine
> while the car is running...
>
>
>
> Sounds like a great candidate for a set of enhancements in Spark...
>
>
>
> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov 
> wrote:
>
> @DG; The key metrics should be
>
>
>
> -  Scheduling delay – its ideal state is to remain constant over
> time and ideally be less than the time of the microbatch window
>
> -  The average job processing time should remain less than the
> micro-batch window
>
> -  Number of Lost Jobs – even if there is a single Job lost that
> means that you have lost all messages for the DStream RDD processed by that
> job due to the previously described spark streaming memory leak condition
> and subsequent crash – described in previous postings submitted by me
>
>
>
> You can even go one step further and periodically issue “get/check free
> memory” to see whether it is decreasing relentlessly at a constant rate –
> if it touches a predetermined RAM threshold that should be your third
> metric
>
>
>
> Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
> you can implement one on your own without waiting for Jiras and new
> features whenever they might be implemented by the Spark dev team –
> moreover you can avoid using slow mechanisms such as ZooKeeper and even
> incorporate some Machine Learning in your Feedback Loop to make it handle
> the message consumption rate more intelligently and benefit from ongoing
>

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
I think what we'd want to do is track the ingestion rate in the consumer(s)
via Spark's aggregation functions and such. If we're at a critical level
(load too high / load too low) then we issue a request into our
Provisioning Component to add/remove machines. Once it comes back with an
"OK", each consumer can finish its current batch, then terminate itself,
and restart with a new context.  The new context would be aware of the
updated cluster - correct?  Therefore the refreshed consumer would restart
on the updated cluster.

Could we even terminate the consumer immediately upon sensing a critical
event?  When it restarts, could it resume right where it left off?
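
On the "resume right where it left off" part: my understanding is that with 
checkpointing enabled, the direct stream's offsets are part of the checkpoint, 
so a restarted driver picks up from them. A sketch of the standard getOrCreate 
pattern, where the checkpoint path and batch interval are placeholders and 
'conf' is the application's SparkConf:

    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

    final String checkpointDir = "hdfs:///checkpoints/consumer";   // placeholder

    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        // ... create the direct Kafka stream(s) and output operations here ...
        jssc.checkpoint(checkpointDir);
        return jssc;
      }
    };

    // Rebuilds the context (including Kafka offsets) from the checkpoint if one
    // exists; otherwise creates a fresh context via the factory.
    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, factory);
    jssc.start();
    jssc.awaitTermination();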

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov  wrote:

> Makes sense especially if you have a cloud with “infinite” resources /
> nodes which allows you to double, triple etc in the background/parallel the
> resources of the currently running cluster
>
>
>
> I was thinking more about the scenario where you have e.g. 100 boxes and
> want to / can add e.g. 20 more
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:46 PM
> *To:* Evo Eftimov
> *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo,
>
>
>
> One of the ideas is to shadow the current cluster. This way there's no
> extra latency incurred due to shutting down of the consumers. If two sets
> of consumers are running, potentially processing the same data, that is OK.
> We phase out the older cluster and gradually flip over to the new one,
> ensuring no downtime or extra latency.  Thoughts?
>
>
>
> On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov 
> wrote:
>
> You should monitor vital performance / job clogging stats of the Spark
> Streaming Runtime not “kafka topics”
>
>
>
> You should be able to bring new worker nodes online and make them contact
> and register with the Master without bringing down the Master (or any of
> the currently running worker nodes)
>
>
>
> Then just shutdown your currently running spark streaming job/app and
> restart it with new params to take advantage of the larger cluster
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:14 PM
> *To:* Cody Koeninger
> *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Would it be possible to implement Spark autoscaling somewhat along these
> lines? --
>
>
>
> 1. If we sense that a new machine is needed, by watching the data load in
> Kafka topic(s), then
>
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
> and get a machine);
>
> 3. Create a "shadow"/mirror Spark master running alongside the initial
> version which talks to N machines. The new mirror version is aware of N+1
> machines (or N+M if we had decided we needed M new boxes).
>
> 4. The previous version of the Spark runtime is
> acquiesced/decommissioned.  We possibly get both clusters working on the
> same data which may actually be OK (at least for our specific use-cases).
>
> 5. Now the new Spark cluster is running.
>
>
>
> Similarly, the decommissioning of M unused boxes would happen, via this
> notion of a mirror Spark runtime.  How feasible would it be for such a
> mirrorlike setup to be created, especially created programmatically?
> Especially point #3.
>
>
>
> The other idea we'd entertained was to bring in a new machine, acquiesce
> down all currently running workers by telling them to process their current
> batch then shut down, then restart the consumers now that Spark is aware of
> a modified cluster.  This has the drawback of a downtime that may not be
> tolerable in terms of latency, by the system's clients waiting for their
> responses in a synchronous fashion.
>
>
>
> Thanks.
>
>
>
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger 
> wrote:
>
> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
>
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
> Hi all,
>
>
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
>
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> Thi

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
If we have a hand-off between the older consumer and the newer consumer, I
wonder if we need to manually manage the offsets in Kafka so as not to miss
some messages as the hand-off is happening.

Or, if we let the new consumer run for a bit and then let the old consumer
know the 'new guy is in town', the old consumer can be shut off.  Some
overlap is OK...
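
If we do end up managing offsets ourselves for the hand-off, the direct stream 
exposes the exact offset ranges of each batch (the documented HasOffsetRanges 
hook). A sketch, with the durable store for the offsets left open:

    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.OffsetRange;

    // 'messages' is the direct Kafka stream.
    messages.foreachRDD(rdd -> {
      // Each batch RDD of the direct stream knows exactly which offsets it covers.
      OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
      for (OffsetRange r : ranges) {
        // Persist topic/partition/untilOffset somewhere durable so the new
        // consumer can start from these offsets after the hand-off.
        System.out.println(r.topic() + "/" + r.partition() + " -> " + r.untilOffset());
      }
      return null;  // Spark 1.x's Java foreachRDD takes a Function<..., Void>
    });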

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov  wrote:

> Makes sense especially if you have a cloud with “infinite” resources /
> nodes which allows you to double, triple etc in the background/parallel the
> resources of the currently running cluster
>
>
>
> I was thinking more about the scenario where you have e.g. 100 boxes and
> want to / can add e.g. 20 more
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:46 PM
> *To:* Evo Eftimov
> *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo,
>
>
>
> One of the ideas is to shadow the current cluster. This way there's no
> extra latency incurred due to shutting down of the consumers. If two sets
> of consumers are running, potentially processing the same data, that is OK.
> We phase out the older cluster and gradually flip over to the new one,
> ensuring no downtime or extra latency.  Thoughts?
>
>
>
> On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov 
> wrote:
>
> You should monitor vital performance / job clogging stats of the Spark
> Streaming Runtime not “kafka topics”
>
>
>
> You should be able to bring new worker nodes online and make them contact
> and register with the Master without bringing down the Master (or any of
> the currently running worker nodes)
>
>
>
> Then just shutdown your currently running spark streaming job/app and
> restart it with new params to take advantage of the larger cluster
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:14 PM
> *To:* Cody Koeninger
> *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Would it be possible to implement Spark autoscaling somewhat along these
> lines? --
>
>
>
> 1. If we sense that a new machine is needed, by watching the data load in
> Kafka topic(s), then
>
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
> and get a machine);
>
> 3. Create a "shadow"/mirror Spark master running alongside the initial
> version which talks to N machines. The new mirror version is aware of N+1
> machines (or N+M if we had decided we needed M new boxes).
>
> 4. The previous version of the Spark runtime is
> acquiesced/decommissioned.  We possibly get both clusters working on the
> same data which may actually be OK (at least for our specific use-cases).
>
> 5. Now the new Spark cluster is running.
>
>
>
> Similarly, the decommissioning of M unused boxes would happen, via this
> notion of a mirror Spark runtime.  How feasible would it be for such a
> mirrorlike setup to be created, especially created programmatically?
> Especially point #3.
>
>
>
> The other idea we'd entertained was to bring in a new machine, acquiesce
> down all currently running workers by telling them to process their current
> batch then shut down, then restart the consumers now that Spark is aware of
> a modified cluster.  This has the drawback of a downtime that may not be
> tolerable in terms of latency, by the system's clients waiting for their
> responses in a synchronous fashion.
>
>
>
> Thanks.
>
>
>
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger 
> wrote:
>
> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
>
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
> Hi all,
>
>
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
>
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
>
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and rest

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
Great.

"You should monitor vital performance / job clogging stats of the Spark
Streaming Runtime not “kafka topics” -- anything specific you were thinking
of?

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov  wrote:

> Makes sense especially if you have a cloud with “infinite” resources /
> nodes which allows you to double, triple etc in the background/parallel the
> resources of the currently running cluster
>
>
>
> I was thinking more about the scenario where you have e.g. 100 boxes and
> want to / can add e.g. 20 more
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:46 PM
> *To:* Evo Eftimov
> *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo,
>
>
>
> One of the ideas is to shadow the current cluster. This way there's no
> extra latency incurred due to shutting down of the consumers. If two sets
> of consumers are running, potentially processing the same data, that is OK.
> We phase out the older cluster and gradually flip over to the new one,
> ensuring no downtime or extra latency.  Thoughts?
>
>
>
> On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov 
> wrote:
>
> You should monitor vital performance / job clogging stats of the Spark
> Streaming Runtime not “kafka topics”
>
>
>
> You should be able to bring new worker nodes online and make them contact
> and register with the Master without bringing down the Master (or any of
> the currently running worker nodes)
>
>
>
> Then just shutdown your currently running spark streaming job/app and
> restart it with new params to take advantage of the larger cluster
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:14 PM
> *To:* Cody Koeninger
> *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Would it be possible to implement Spark autoscaling somewhat along these
> lines? --
>
>
>
> 1. If we sense that a new machine is needed, by watching the data load in
> Kafka topic(s), then
>
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
> and get a machine);
>
> 3. Create a "shadow"/mirror Spark master running alongside the initial
> version which talks to N machines. The new mirror version is aware of N+1
> machines (or N+M if we had decided we needed M new boxes).
>
> 4. The previous version of the Spark runtime is
> acquiesced/decommissioned.  We possibly get both clusters working on the
> same data which may actually be OK (at least for our specific use-cases).
>
> 5. Now the new Spark cluster is running.
>
>
>
> Similarly, the decommissioning of M unused boxes would happen, via this
> notion of a mirror Spark runtime.  How feasible would it be for such a
> mirrorlike setup to be created, especially created programmatically?
> Especially point #3.
>
>
>
> The other idea we'd entertained was to bring in a new machine, acquiesce
> down all currently running workers by telling them to process their current
> batch then shut down, then restart the consumers now that Spark is aware of
> a modified cluster.  This has the drawback of a downtime that may not be
> tolerable in terms of latency, by the system's clients waiting for their
> responses in a synchronous fashion.
>
>
>
> Thanks.
>
>
>
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger 
> wrote:
>
> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
>
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
> Hi all,
>
>
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
>
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
>
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and restart a stream the same way we kill and
> relaunch executors.
>
>
>
> (2) Records come in every batch, and when there is data to process your
> executors are not idle. If your idle timeout is less than th

RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Evo Eftimov
Makes sense, especially if you have a cloud with “infinite” resources / nodes, 
which allows you to double, triple, etc. the resources of the currently running 
cluster in the background / in parallel.

 

I was thinking more about the scenario where you have e.g. 100 boxes and want 
to / can add e.g. 20 more 

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 4:46 PM
To: Evo Eftimov
Cc: Cody Koeninger; Andrew Or; Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Evo,

 

One of the ideas is to shadow the current cluster. This way there's no extra 
latency incurred due to shutting down of the consumers. If two sets of 
consumers are running, potentially processing the same data, that is OK. We 
phase out the older cluster and gradually flip over to the new one, ensuring no 
downtime or extra latency.  Thoughts?

 

On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov  wrote:

You should monitor vital performance / job clogging stats of the Spark 
Streaming Runtime not “kafka topics”

 

You should be able to bring new worker nodes online and make them contact and 
register with the Master without bringing down the Master (or any of the 
currently running worker nodes) 

 

Then just shutdown your currently running spark streaming job/app and restart 
it with new params to take advantage of the larger cluster 

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 4:14 PM
To: Cody Koeninger
Cc: Andrew Or; Evo Eftimov; Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Would it be possible to implement Spark autoscaling somewhat along these lines? 
--

 

1. If we sense that a new machine is needed, by watching the data load in Kafka 
topic(s), then

2. Provision a new machine via a Provisioner interface (e.g. talk to AWS and 
get a machine);

3. Create a "shadow"/mirror Spark master running alongside the initial version 
which talks to N machines. The new mirror version is aware of N+1 machines (or 
N+M if we had decided we needed M new boxes).

4. The previous version of the Spark runtime is acquiesced/decommissioned.  We 
possibly get both clusters working on the same data which may actually be OK 
(at least for our specific use-cases).

5. Now the new Spark cluster is running.

 

Similarly, the decommissioning of M unused boxes would happen, via this notion 
of a mirror Spark runtime.  How feasible would it be for such a mirrorlike 
setup to be created, especially created programmatically?  Especially point #3.

 

The other idea we'd entertained was to bring in a new machine, acquiesce down 
all currently running workers by telling them to process their current batch 
then shut down, then restart the consumers now that Spark is aware of a 
modified cluster.  This has the drawback of a downtime that may not be 
tolerable in terms of latency, by the system's clients waiting for their 
responses in a synchronous fashion.

 

Thanks.

 

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger  wrote:

I'm not sure that points 1 and 2 really apply to the kafka direct stream.  
There are no receivers, and you know at the driver how big each of your batches 
is.

 

On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:

Hi all,

 

As the author of the dynamic allocation feature I can offer a few insights here.

 

Gerard's explanation was both correct and concise: dynamic allocation is not 
intended to be used in Spark streaming at the moment (1.4 or before). This is 
because of two things:

 

(1) Number of receivers is necessarily fixed, and these are started in 
executors. Since we need a receiver for each InputDStream, if we kill these 
receivers we essentially stop the stream, which is not what we want. It makes 
little sense to close and restart a stream the same way we kill and relaunch 
executors.

 

(2) Records come in every batch, and when there is data to process your 
executors are not idle. If your idle timeout is less than the batch duration, 
then you'll end up having to constantly kill and restart executors. If your 
idle timeout is greater than the batch duration, then you'll never kill 
executors.

 

Long answer short, with Spark streaming there is currently no straightforward 
way to scale the size of your cluster. I had a long discussion with TD (Spark 
streaming lead) about what needs to be done to provide some semblance of 
dynamic scaling to streaming applications, e.g. take into account the batch 
queue instead. We came up with a few ideas that I will not detail here, but we 
are looking into this and do intend to support it in the near future.

 

-Andrew

 

 

 

2015-05-28 8:02 GMT-07:00 Evo Eftimov :

 

Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK – it 
will be 

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
Evo,

One of the ideas is to shadow the current cluster. This way there's no
extra latency incurred due to shutting down of the consumers. If two sets
of consumers are running, potentially processing the same data, that is OK.
We phase out the older cluster and gradually flip over to the new one,
ensuring no downtime or extra latency.  Thoughts?

On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov  wrote:

> You should monitor vital performance / job clogging stats of the Spark
> Streaming Runtime not “kafka topics”
>
>
>
> You should be able to bring new worker nodes online and make them contact
> and register with the Master without bringing down the Master (or any of
> the currently running worker nodes)
>
>
>
> Then just shutdown your currently running spark streaming job/app and
> restart it with new params to take advantage of the larger cluster
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:14 PM
> *To:* Cody Koeninger
> *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Would it be possible to implement Spark autoscaling somewhat along these
> lines? --
>
>
>
> 1. If we sense that a new machine is needed, by watching the data load in
> Kafka topic(s), then
>
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
> and get a machine);
>
> 3. Create a "shadow"/mirror Spark master running alongside the initial
> version which talks to N machines. The new mirror version is aware of N+1
> machines (or N+M if we had decided we needed M new boxes).
>
> 4. The previous version of the Spark runtime is
> acquiesced/decommissioned.  We possibly get both clusters working on the
> same data which may actually be OK (at least for our specific use-cases).
>
> 5. Now the new Spark cluster is running.
>
>
>
> Similarly, the decommissioning of M unused boxes would happen, via this
> notion of a mirror Spark runtime.  How feasible would it be for such a
> mirrorlike setup to be created, especially created programmatically?
> Especially point #3.
>
>
>
> The other idea we'd entertained was to bring in a new machine, acquiesce
> down all currently running workers by telling them to process their current
> batch then shut down, then restart the consumers now that Spark is aware of
> a modified cluster.  This has the drawback of a downtime that may not be
> tolerable in terms of latency, by the system's clients waiting for their
> responses in a synchronous fashion.
>
>
>
> Thanks.
>
>
>
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger 
> wrote:
>
> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
>
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
> Hi all,
>
>
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
>
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
>
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and restart a stream the same way we kill and
> relaunch executors.
>
>
>
> (2) Records come in every batch, and when there is data to process your
> executors are not idle. If your idle timeout is less than the batch
> duration, then you'll end up having to constantly kill and restart
> executors. If your idle timeout is greater than the batch duration, then
> you'll never kill executors.
>
>
>
> Long answer short, with Spark streaming there is currently no
> straightforward way to scale the size of your cluster. I had a long
> discussion with TD (Spark streaming lead) about what needs to be done to
> provide some semblance of dynamic scaling to streaming applications, e.g.
> take into account the batch queue instead. We came up with a few ideas that
> I will not detail here, but we are looking into this and do intend to
> support it in the near future.
>
>
>
> -Andrew
>
>
>
>
>
>
>
> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>
>
>
> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
> – it will be your insurance policy against sys crashes due to memory leaks.

RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Evo Eftimov
You should monitor the vital performance / job-clogging stats of the Spark 
Streaming runtime, not the Kafka topics

 

You should be able to bring new worker nodes online and make them contact and 
register with the Master without bringing down the Master (or any of the 
currently running worker nodes) 

 

Then just shut down your currently running Spark Streaming job/app and restart 
it with new params to take advantage of the larger cluster 

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 4:14 PM
To: Cody Koeninger
Cc: Andrew Or; Evo Eftimov; Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Would it be possible to implement Spark autoscaling somewhat along these lines? 
--

 

1. If we sense that a new machine is needed, by watching the data load in Kafka 
topic(s), then

2. Provision a new machine via a Provisioner interface (e.g. talk to AWS and 
get a machine);

3. Create a "shadow"/mirror Spark master running alongside the initial version 
which talks to N machines. The new mirror version is aware of N+1 machines (or 
N+M if we had decided we needed M new boxes).

4. The previous version of the Spark runtime is quiesced/decommissioned.  We 
possibly get both clusters working on the same data which may actually be OK 
(at least for our specific use-cases).

5. Now the new Spark cluster is running.

 

Similarly, the decommissioning of M unused boxes would happen, via this notion 
of a mirror Spark runtime.  How feasible would it be for such a mirrorlike 
setup to be created, especially created programmatically?  Especially point #3.

 

The other idea we'd entertained was to bring in a new machine, quiesce 
all currently running workers by telling them to process their current batch 
then shut down, then restart the consumers now that Spark is aware of a 
modified cluster.  This has the drawback of a downtime that may not be 
tolerable in terms of latency, by the system's clients waiting for their 
responses in a synchronous fashion.

 

Thanks.

 

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger  wrote:

I'm not sure that points 1 and 2 really apply to the kafka direct stream.  
There are no receivers, and you know at the driver how big each of your batches 
is.

 

On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:

Hi all,

 

As the author of the dynamic allocation feature I can offer a few insights here.

 

Gerard's explanation was both correct and concise: dynamic allocation is not 
intended to be used in Spark streaming at the moment (1.4 or before). This is 
because of two things:

 

(1) Number of receivers is necessarily fixed, and these are started in 
executors. Since we need a receiver for each InputDStream, if we kill these 
receivers we essentially stop the stream, which is not what we want. It makes 
little sense to close and restart a stream the same way we kill and relaunch 
executors.

 

(2) Records come in every batch, and when there is data to process your 
executors are not idle. If your idle timeout is less than the batch duration, 
then you'll end up having to constantly kill and restart executors. If your 
idle timeout is greater than the batch duration, then you'll never kill 
executors.

 

Long answer short, with Spark streaming there is currently no straightforward 
way to scale the size of your cluster. I had a long discussion with TD (Spark 
streaming lead) about what needs to be done to provide some semblance of 
dynamic scaling to streaming applications, e.g. take into account the batch 
queue instead. We came up with a few ideas that I will not detail here, but we 
are looking into this and do intend to support it in the near future.

 

-Andrew

 

 

 

2015-05-28 8:02 GMT-07:00 Evo Eftimov :

 

Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK – it 
will be your insurance policy against sys crashes due to memory leaks. Until 
there is free RAM, spark streaming (spark) will NOT resort to disk – and of 
course resorting to disk from time to time (ie when there is no free RAM ) and 
taking a performance hit from that, BUT only until there is no free RAM 

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Thursday, May 28, 2015 2:34 PM
To: Evo Eftimov
Cc: Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Evo, good points.

 

On the dynamic resource allocation, I'm surmising this only works within a 
particular cluster setup.  So it improves the usage of current cluster 
resources but it doesn't make the cluster itself elastic. At least, that's my 
understanding.

 

Memory + disk would be good and hopefully it'd take *huge* load on the system 
to start exhausting the disk space too.  I'd guess that falling onto disk will make things significantly slower due to the extra I/O.

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
Would it be possible to implement Spark autoscaling somewhat along these
lines? --

1. If we sense that a new machine is needed, by watching the data load in
Kafka topic(s), then
2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
and get a machine);
3. Create a "shadow"/mirror Spark master running alongside the initial
version which talks to N machines. The new mirror version is aware of N+1
machines (or N+M if we had decided we needed M new boxes).
4. The previous version of the Spark runtime is quiesced/decommissioned.
We possibly get both clusters working on the same data which may actually
be OK (at least for our specific use-cases).
5. Now the new Spark cluster is running.

Similarly, the decommissioning of M unused boxes would happen, via this
notion of a mirror Spark runtime.  How feasible would it be for such a
mirrorlike setup to be created, especially created programmatically?
Especially point #3.

The other idea we'd entertained was to bring in a new machine, quiesce
all currently running workers by telling them to process their current
batch then shut down, then restart the consumers now that Spark is aware of
a modified cluster.  This has the drawback of a downtime that may not be
tolerable in terms of latency, by the system's clients waiting for their
responses in a synchronous fashion.

Thanks.

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger  wrote:

> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
>> Hi all,
>>
>> As the author of the dynamic allocation feature I can offer a few
>> insights here.
>>
>> Gerard's explanation was both correct and concise: dynamic allocation is
>> not intended to be used in Spark streaming at the moment (1.4 or before).
>> This is because of two things:
>>
>> (1) Number of receivers is necessarily fixed, and these are started in
>> executors. Since we need a receiver for each InputDStream, if we kill these
>> receivers we essentially stop the stream, which is not what we want. It
>> makes little sense to close and restart a stream the same way we kill and
>> relaunch executors.
>>
>> (2) Records come in every batch, and when there is data to process your
>> executors are not idle. If your idle timeout is less than the batch
>> duration, then you'll end up having to constantly kill and restart
>> executors. If your idle timeout is greater than the batch duration, then
>> you'll never kill executors.
>>
>> Long answer short, with Spark streaming there is currently no
>> straightforward way to scale the size of your cluster. I had a long
>> discussion with TD (Spark streaming lead) about what needs to be done to
>> provide some semblance of dynamic scaling to streaming applications, e.g.
>> take into account the batch queue instead. We came up with a few ideas that
>> I will not detail here, but we are looking into this and do intend to
>> support it in the near future.
>>
>> -Andrew
>>
>>
>>
>> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>>
>> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
>>> – it will be your insurance policy against sys crashes due to memory leaks.
>>> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
>>> and of course resorting to disk from time to time (ie when there is no free
>>> RAM ) and taking a performance hit from that, BUT only until there is no
>>> free RAM
>>>
>>>
>>>
>>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>>> *Sent:* Thursday, May 28, 2015 2:34 PM
>>> *To:* Evo Eftimov
>>> *Cc:* Gerard Maas; spark users
>>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>>> sizes/rate of growth in Kafka or Spark's metrics?
>>>
>>>
>>>
>>> Evo, good points.
>>>
>>>
>>>
>>> On the dynamic resource allocation, I'm surmising this only works within
>>> a particular cluster setup.  So it improves the usage of current cluster
>>> resources but it doesn't make the cluster itself elastic. At least, that's
>>> my understanding.
>>>
>>>
>>>
>>> Memory + disk would be good and hopefully it'd take *huge* load on the
>>> system to start exhausting the disk space too.  I'd guess that falling onto
>>> disk will make things significantly slower due to the extra I/O.
>>>
>>>

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Which would imply that if there was a load manager type of service, it
could signal to the driver(s) that they need to quiesce, i.e. process
what's at hand and terminate.  Then bring up a new machine, then restart
the driver(s)...  Same deal with removing machines from the cluster. Send a
signal for the drivers to pipe down and terminate, then restart them.
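A minimal sketch of that signalling idea, assuming an already-created StreamingContext and
using a marker file as a stand-in for whatever channel the load manager actually uses:

    import java.nio.file.{Files, Paths}
    import org.apache.spark.streaming.StreamingContext

    // Block until an external "please quiesce" marker appears, then stop gracefully so the
    // in-flight micro-batch finishes before the driver exits and can be restarted elsewhere.
    def awaitQuiesceSignal(ssc: StreamingContext, flagPath: String, pollMs: Long = 10000L): Unit = {
      while (!Files.exists(Paths.get(flagPath))) Thread.sleep(pollMs)
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }

The load manager would then resize the cluster and relaunch the driver(s).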

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger  wrote:

> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
>> Hi all,
>>
>> As the author of the dynamic allocation feature I can offer a few
>> insights here.
>>
>> Gerard's explanation was both correct and concise: dynamic allocation is
>> not intended to be used in Spark streaming at the moment (1.4 or before).
>> This is because of two things:
>>
>> (1) Number of receivers is necessarily fixed, and these are started in
>> executors. Since we need a receiver for each InputDStream, if we kill these
>> receivers we essentially stop the stream, which is not what we want. It
>> makes little sense to close and restart a stream the same way we kill and
>> relaunch executors.
>>
>> (2) Records come in every batch, and when there is data to process your
>> executors are not idle. If your idle timeout is less than the batch
>> duration, then you'll end up having to constantly kill and restart
>> executors. If your idle timeout is greater than the batch duration, then
>> you'll never kill executors.
>>
>> Long answer short, with Spark streaming there is currently no
>> straightforward way to scale the size of your cluster. I had a long
>> discussion with TD (Spark streaming lead) about what needs to be done to
>> provide some semblance of dynamic scaling to streaming applications, e.g.
>> take into account the batch queue instead. We came up with a few ideas that
>> I will not detail here, but we are looking into this and do intend to
>> support it in the near future.
>>
>> -Andrew
>>
>>
>>
>> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>>
>> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
>>> – it will be your insurance policy against sys crashes due to memory leaks.
>>> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
>>> and of course resorting to disk from time to time (ie when there is no free
>>> RAM ) and taking a performance hit from that, BUT only until there is no
>>> free RAM
>>>
>>>
>>>
>>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>>> *Sent:* Thursday, May 28, 2015 2:34 PM
>>> *To:* Evo Eftimov
>>> *Cc:* Gerard Maas; spark users
>>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>>> sizes/rate of growth in Kafka or Spark's metrics?
>>>
>>>
>>>
>>> Evo, good points.
>>>
>>>
>>>
>>> On the dynamic resource allocation, I'm surmising this only works within
>>> a particular cluster setup.  So it improves the usage of current cluster
>>> resources but it doesn't make the cluster itself elastic. At least, that's
>>> my understanding.
>>>
>>>
>>>
>>> Memory + disk would be good and hopefully it'd take *huge* load on the
>>> system to start exhausting the disk space too.  I'd guess that falling onto
>>> disk will make things significantly slower due to the extra I/O.
>>>
>>>
>>>
>>> Perhaps we'll really want all of these elements eventually.  I think
>>> we'd want to start with memory only, keeping maxRate low enough not to
>>> overwhelm the consumers; implement the cluster autoscaling.  We might
>>> experiment with dynamic resource allocation before we get to implement the
>>> cluster autoscale.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>>> wrote:
>>>
>>> You can also try Dynamic Resource Allocation
>>>
>>>
>>>
>>>
>>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>>
>>>
>>>
>>> Also re the Feedback Loop for automatic message consumption rate
>>> adjustment – there is a “dumb” solution option – simply set the storage
>> policy for the DStream RDDs to MEMORY AND DISK

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Cody Koeninger
I'm not sure that points 1 and 2 really apply to the kafka direct stream.
There are no receivers, and you know at the driver how big each of your
batches is.
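As a rough sketch of that point (assuming the Spark 1.3+ direct Kafka API; ssc, kafkaParams
and topics are placeholders), the driver can read each batch's offset ranges and derive the
batch size itself:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // The direct stream exposes the exact Kafka offsets of every micro-batch at the driver.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val recordCount = ranges.map(r => r.untilOffset - r.fromOffset).sum
      // Feed recordCount (or per-partition counts) into whatever scaling signal you use.
    }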

On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:

> Hi all,
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and restart a stream the same way we kill and
> relaunch executors.
>
> (2) Records come in every batch, and when there is data to process your
> executors are not idle. If your idle timeout is less than the batch
> duration, then you'll end up having to constantly kill and restart
> executors. If your idle timeout is greater than the batch duration, then
> you'll never kill executors.
>
> Long answer short, with Spark streaming there is currently no
> straightforward way to scale the size of your cluster. I had a long
> discussion with TD (Spark streaming lead) about what needs to be done to
> provide some semblance of dynamic scaling to streaming applications, e.g.
> take into account the batch queue instead. We came up with a few ideas that
> I will not detail here, but we are looking into this and do intend to
> support it in the near future.
>
> -Andrew
>
>
>
> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>
> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
>> – it will be your insurance policy against sys crashes due to memory leaks.
>> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
>> and of course resorting to disk from time to time (ie when there is no free
>> RAM ) and taking a performance hit from that, BUT only until there is no
>> free RAM
>>
>>
>>
>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>> *Sent:* Thursday, May 28, 2015 2:34 PM
>> *To:* Evo Eftimov
>> *Cc:* Gerard Maas; spark users
>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>> sizes/rate of growth in Kafka or Spark's metrics?
>>
>>
>>
>> Evo, good points.
>>
>>
>>
>> On the dynamic resource allocation, I'm surmising this only works within
>> a particular cluster setup.  So it improves the usage of current cluster
>> resources but it doesn't make the cluster itself elastic. At least, that's
>> my understanding.
>>
>>
>>
>> Memory + disk would be good and hopefully it'd take *huge* load on the
>> system to start exhausting the disk space too.  I'd guess that falling onto
>> disk will make things significantly slower due to the extra I/O.
>>
>>
>>
>> Perhaps we'll really want all of these elements eventually.  I think we'd
>> want to start with memory only, keeping maxRate low enough not to overwhelm
>> the consumers; implement the cluster autoscaling.  We might experiment with
>> dynamic resource allocation before we get to implement the cluster
>> autoscale.
>>
>>
>>
>>
>>
>>
>>
>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>> wrote:
>>
>> You can also try Dynamic Resource Allocation
>>
>>
>>
>>
>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>
>>
>>
>> Also re the Feedback Loop for automatic message consumption rate
>> adjustment – there is a “dumb” solution option – simply set the storage
>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>> exhausted spark streaming will resort to keeping new RDDs on disk which
>> will prevent it from crashing and hence losing them. Then some memory will
>> get freed and it will resort back to RAM and so on and so forth
>>
>>
>>
>>
>>
>> Sent from Samsung Mobile
>>
>>  Original message 
>>
>> From: Evo Eftimov
>>
>> Date:2015/05/28 13:22 (GMT+00:00)
>>
>> To: Dmitry Goldenberg
>>
>> Cc: Gerard Maas ,spark users
>>
>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>> growth in Kafka or Spark's metrics?
>>
>>
>>
>> You can always spin new boxes in the background and bring them into the
> cluster fold when fully operational and time that with job relaunch and param change

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Thanks, Andrew.

From speaking with customers, this is one of the most pressing issues for
them (burning hot, to be precise), especially in a SAAS type of environment
and especially with commodity hardware at play. Understandably, folks don't
want to pay for more hardware usage than necessary and they want to be able
to handle the peaks and valleys of usage (especially the peaks) optimally.

It looks like there needs to be a generic 'watchdog' type of service which
would get metrics/signals from things like Kafka, then call into a
(potentially custom) handler which will cause new hardware to be
provisioned or decommissioned.  Needless to say, Spark, the watchdog,
and the provisioner need to be completely in sync and mindful of currently
running Spark jobs, so that new hardware immediately picks up extra load and
hardware is only decommissioned once any running Spark jobs have been
quiesced...
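Purely as an illustration of that split (every name below is hypothetical, not an existing
API), the watchdog/provisioner contract might look roughly like:

    // Hypothetical interfaces for the watchdog + provisioner sketched above.
    trait Provisioner {
      def addNodes(count: Int): Unit      // e.g. call out to AWS / the cluster manager
      def removeNodes(count: Int): Unit   // only after jobs on those nodes have quiesced
    }

    case class LoadSignal(source: String, lagMessages: Long, schedulingDelayMs: Long)

    class Watchdog(provisioner: Provisioner, lagThreshold: Long, delayThresholdMs: Long) {
      def onSignal(sig: LoadSignal): Unit =
        if (sig.lagMessages > lagThreshold || sig.schedulingDelayMs > delayThresholdMs)
          provisioner.addNodes(1)
    }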

As I learn more about the configuration parameters and dynamic resource
allocation, I'm starting to feel that a dashboard with all these knobs
exposed would be so useful. Being able to test/simulate load volumes and
tweak the knobs as necessary, to arrive at the optimal patterns...

Regards,
- Dmitry

On Thu, May 28, 2015 at 3:21 PM, Andrew Or  wrote:

> Hi all,
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and restart a stream the same way we kill and
> relaunch executors.
>
> (2) Records come in every batch, and when there is data to process your
> executors are not idle. If your idle timeout is less than the batch
> duration, then you'll end up having to constantly kill and restart
> executors. If your idle timeout is greater than the batch duration, then
> you'll never kill executors.
>
> Long answer short, with Spark streaming there is currently no
> straightforward way to scale the size of your cluster. I had a long
> discussion with TD (Spark streaming lead) about what needs to be done to
> provide some semblance of dynamic scaling to streaming applications, e.g.
> take into account the batch queue instead. We came up with a few ideas that
> I will not detail here, but we are looking into this and do intend to
> support it in the near future.
>
> -Andrew
>
>
>
> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>
>> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
>> – it will be your insurance policy against sys crashes due to memory leaks.
>> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
>> and of course resorting to disk from time to time (ie when there is no free
>> RAM ) and taking a performance hit from that, BUT only until there is no
>> free RAM
>>
>>
>>
>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>> *Sent:* Thursday, May 28, 2015 2:34 PM
>> *To:* Evo Eftimov
>> *Cc:* Gerard Maas; spark users
>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>> sizes/rate of growth in Kafka or Spark's metrics?
>>
>>
>>
>> Evo, good points.
>>
>>
>>
>> On the dynamic resource allocation, I'm surmising this only works within
>> a particular cluster setup.  So it improves the usage of current cluster
>> resources but it doesn't make the cluster itself elastic. At least, that's
>> my understanding.
>>
>>
>>
>> Memory + disk would be good and hopefully it'd take *huge* load on the
>> system to start exhausting the disk space too.  I'd guess that falling onto
>> disk will make things significantly slower due to the extra I/O.
>>
>>
>>
>> Perhaps we'll really want all of these elements eventually.  I think we'd
>> want to start with memory only, keeping maxRate low enough not to overwhelm
>> the consumers; implement the cluster autoscaling.  We might experiment with
>> dynamic resource allocation before we get to implement the cluster
>> autoscale.
>>
>>
>>
>>
>>
>>
>>
>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>> wrote:
>>
>> You can also try Dynamic Resource Allocation
>>
>>
>>
>>
> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Andrew Or
Hi all,

As the author of the dynamic allocation feature I can offer a few insights
here.

Gerard's explanation was both correct and concise: dynamic allocation is
not intended to be used in Spark streaming at the moment (1.4 or before).
This is because of two things:

(1) Number of receivers is necessarily fixed, and these are started in
executors. Since we need a receiver for each InputDStream, if we kill these
receivers we essentially stop the stream, which is not what we want. It
makes little sense to close and restart a stream the same way we kill and
relaunch executors.

(2) Records come in every batch, and when there is data to process your
executors are not idle. If your idle timeout is less than the batch
duration, then you'll end up having to constantly kill and restart
executors. If your idle timeout is greater than the batch duration, then
you'll never kill executors.

Long answer short, with Spark streaming there is currently no
straightforward way to scale the size of your cluster. I had a long
discussion with TD (Spark streaming lead) about what needs to be done to
provide some semblance of dynamic scaling to streaming applications, e.g.
take into account the batch queue instead. We came up with a few ideas that
I will not detail here, but we are looking into this and do intend to
support it in the near future.

-Andrew



2015-05-28 8:02 GMT-07:00 Evo Eftimov :

> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
> – it will be your insurance policy against sys crashes due to memory leaks.
> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
> and of course resorting to disk from time to time (ie when there is no free
> RAM ) and taking a performance hit from that, BUT only until there is no
> free RAM
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Thursday, May 28, 2015 2:34 PM
> *To:* Evo Eftimov
> *Cc:* Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo, good points.
>
>
>
> On the dynamic resource allocation, I'm surmising this only works within a
> particular cluster setup.  So it improves the usage of current cluster
> resources but it doesn't make the cluster itself elastic. At least, that's
> my understanding.
>
>
>
> Memory + disk would be good and hopefully it'd take *huge* load on the
> system to start exhausting the disk space too.  I'd guess that falling onto
> disk will make things significantly slower due to the extra I/O.
>
>
>
> Perhaps we'll really want all of these elements eventually.  I think we'd
> want to start with memory only, keeping maxRate low enough not to overwhelm
> the consumers; implement the cluster autoscaling.  We might experiment with
> dynamic resource allocation before we get to implement the cluster
> autoscale.
>
>
>
>
>
>
>
> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
> wrote:
>
> You can also try Dynamic Resource Allocation
>
>
>
>
> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>
>
>
> Also re the Feedback Loop for automatic message consumption rate
> adjustment – there is a “dumb” solution option – simply set the storage
> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
> exhausted spark streaming will resort to keeping new RDDs on disk which
> will prevent it from crashing and hence losing them. Then some memory will
> get freed and it will resort back to RAM and so on and so forth
>
>
>
>
>
> Sent from Samsung Mobile
>
>  Original message 
>
> From: Evo Eftimov
>
> Date:2015/05/28 13:22 (GMT+00:00)
>
> To: Dmitry Goldenberg
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> You can always spin new boxes in the background and bring them into the
> cluster fold when fully operational and time that with job relaunch and
> param change
>
>
>
> Kafka offsets are managed automatically for you by the Kafka clients, which
> keep them in ZooKeeper; don't worry about that as long as you shut down your
> job gracefully. Besides, managing the offsets explicitly is not a big deal if
> necessary
>
>
>
>
>
> Sent from Samsung Mobile
>
>
>
>  Original message 
>
> From: Dmitry Goldenberg
>
> Date:2015/05/28 13:16 (GMT+00:00)
>
> To: Evo Eftimov
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?

RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
Probably you should ALWAYS keep the RDD storage policy at MEMORY AND DISK – it 
will be your insurance policy against sys crashes due to memory leaks. As long as 
there is free RAM, Spark Streaming (Spark) will NOT resort to disk; it resorts to 
disk only from time to time (i.e. when there is no free RAM), taking a performance 
hit from that, but only until RAM is freed again 

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Thursday, May 28, 2015 2:34 PM
To: Evo Eftimov
Cc: Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Evo, good points.

 

On the dynamic resource allocation, I'm surmising this only works within a 
particular cluster setup.  So it improves the usage of current cluster 
resources but it doesn't make the cluster itself elastic. At least, that's my 
understanding.

 

Memory + disk would be good and hopefully it'd take *huge* load on the system 
to start exhausting the disk space too.  I'd guess that falling onto disk will 
make things significantly slower due to the extra I/O.

 

Perhaps we'll really want all of these elements eventually.  I think we'd want 
to start with memory only, keeping maxRate low enough not to overwhelm the 
consumers; implement the cluster autoscaling.  We might experiment with dynamic 
resource allocation before we get to implement the cluster autoscale.

 

 

 

On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov  wrote:

You can also try Dynamic Resource Allocation

 

https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
 

 

Also re the Feedback Loop for automatic message consumption rate adjustment – 
there is a “dumb” solution option – simply set the storage policy for the 
DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark 
streaming will resort to keeping new RDDs on disk which will prevent it from 
crashing and hence losing them. Then some memory will get freed and it will 
resort back to RAM and so on and so forth  

 

 

Sent from Samsung Mobile

 Original message 

From: Evo Eftimov 

Date:2015/05/28 13:22 (GMT+00:00) 

To: Dmitry Goldenberg 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

You can always spin new boxes in the background and bring them into the cluster 
fold when fully operational and time that with job relaunch and param change

 

Kafka offsets are managed automatically for you by the Kafka clients, which keep 
them in ZooKeeper; don't worry about that as long as you shut down your job 
gracefully. Besides, managing the offsets explicitly is not a big deal if 
necessary

 

 

Sent from Samsung Mobile

 

 Original message 

From: Dmitry Goldenberg 

Date:2015/05/28 13:16 (GMT+00:00) 

To: Evo Eftimov 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

Thanks, Evo.  Per the last part of your comment, it sounds like we will need to 
implement a job manager which will be in control of starting the jobs, 
monitoring the status of the Kafka topic(s), shutting jobs down and marking 
them as ones to relaunch, scaling the cluster up/down by adding/removing 
machines, and relaunching the 'suspended' (shut down) jobs.

 

I suspect that relaunching the jobs may be tricky since that means keeping 
track of the starter offsets in Kafka topic(s) from which the jobs started 
working on.

 

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching of 
jobs, coupled with the wait for the new machines to come online may turn out 
quite time-consuming which will make for lengthy request times, and our 
requests are not asynchronous.  Ideally, the currently running jobs would 
continue to run on the machines currently available in the cluster.

 

In the scale-down case, the job manager would want to signal to Spark's job 
scheduler not to send work to the node being taken out, find out when the last 
job has finished running on the node, then take the node out.

 

This is somewhat like changing the number of cylinders in a car engine while 
the car is running...

 

Sounds like a great candidate for a set of enhancements in Spark...

 

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov  wrote:

@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window 

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described in previous postings submitted by me

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Evo, good points.

On the dynamic resource allocation, I'm surmising this only works within a
particular cluster setup.  So it improves the usage of current cluster
resources but it doesn't make the cluster itself elastic. At least, that's
my understanding.

Memory + disk would be good and hopefully it'd take *huge* load on the
system to start exhausting the disk space too.  I'd guess that falling onto
disk will make things significantly slower due to the extra I/O.

Perhaps we'll really want all of these elements eventually.  I think we'd
want to start with memory only, keeping maxRate low enough not to overwhelm
the consumers; implement the cluster autoscaling.  We might experiment with
dynamic resource allocation before we get to implement the cluster
autoscale.
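For reference, a sketch of that 'memory only, maxRate kept low' starting point; the property
names are standard Spark settings, but the numbers are purely illustrative:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-streaming-consumer")
      // Cap ingest so consumers are not overwhelmed while new machines are being provisioned.
      .set("spark.streaming.receiver.maxRate", "10000")          // records/sec per receiver
      .set("spark.streaming.kafka.maxRatePerPartition", "2000")  // records/sec per partition (direct stream)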



On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov  wrote:

> You can also try Dynamic Resource Allocation
>
>
>
>
> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>
>
>
> Also re the Feedback Loop for automatic message consumption rate
> adjustment – there is a “dumb” solution option – simply set the storage
> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
> exhausted spark streaming will resort to keeping new RDDs on disk which
> will prevent it from crashing and hence losing them. Then some memory will
> get freed and it will resort back to RAM and so on and so forth
>
>
>
>
>
> Sent from Samsung Mobile
>
>  Original message 
>
> From: Evo Eftimov
>
> Date:2015/05/28 13:22 (GMT+00:00)
>
> To: Dmitry Goldenberg
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> You can always spin new boxes in the background and bring them into the
> cluster fold when fully operational and time that with job relaunch and
> param change
>
>
>
> Kafka offsets are managed automatically for you by the Kafka clients, which
> keep them in ZooKeeper; don't worry about that as long as you shut down your
> job gracefully. Besides, managing the offsets explicitly is not a big deal if
> necessary
>
>
>
>
>
> Sent from Samsung Mobile
>
>
>
> ---- Original message ----
>
> From: Dmitry Goldenberg
>
> Date:2015/05/28 13:16 (GMT+00:00)
>
> To: Evo Eftimov
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> Thanks, Evo.  Per the last part of your comment, it sounds like we will
> need to implement a job manager which will be in control of starting the
> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
> marking them as ones to relaunch, scaling the cluster up/down by
> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>
>
>
> I suspect that relaunching the jobs may be tricky since that means keeping
> track of the starter offsets in Kafka topic(s) from which the jobs started
> working on.
>
>
>
> Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
> of jobs, coupled with the wait for the new machines to come online may turn
> out quite time-consuming which will make for lengthy request times, and our
> requests are not asynchronous.  Ideally, the currently running jobs would
> continue to run on the machines currently available in the cluster.
>
>
>
> In the scale-down case, the job manager would want to signal to Spark's
> job scheduler not to send work to the node being taken out, find out when
> the last job has finished running on the node, then take the node out.
>
>
>
> This is somewhat like changing the number of cylinders in a car engine
> while the car is running...
>
>
>
> Sounds like a great candidate for a set of enhancements in Spark...
>
>
>
> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov 
> wrote:
>
> @DG; The key metrics should be
>
>
>
> -  Scheduling delay – its ideal state is to remain constant over
> time and ideally be less than the time of the microbatch window
>
> -  The average job processing time should remain less than the
> micro-batch window
>
> -  Number of Lost Jobs – even if there is a single Job lost that
> means that you have lost all messages for the DStream RDD processed by that
> job due to the previously described spark streaming memory leak condition
> and subsequent crash – described in previous postings submitted by me
>
>
>
> You can even go one step further and periodically issue “get/check free
> memory” to see whether it is decreasing relentlessly at a constant rate – if
> it touches a predetermined RAM threshold that should be your third metric

FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
You can also try Dynamic Resource Allocation

 

https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
 
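A hedged sketch of what enabling it looks like per the linked docs (YARN, Spark 1.2+); the
executor counts and timeout are illustrative only, and note that this resizes executors within
the existing cluster rather than adding machines to it:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")              // external shuffle service is required
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "20")
      .set("spark.dynamicAllocation.executorIdleTimeout", "60")  // seconds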

 

Also re the Feedback Loop for automatic message consumption rate adjustment – 
there is a “dumb” solution option – simply set the storage policy for the 
DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark 
streaming will resort to keeping new RDDs on disk which will prevent it from 
crashing and hence losing them. Then some memory will get freed and it will 
resort back to RAM and so on and so forth  
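A minimal sketch of that storage-policy 'insurance', assuming the receiver-based Kafka API
(ssc, zkQuorum, groupId and topicMap are placeholders); MEMORY_AND_DISK_SER is one reasonable
spill-to-disk level:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Received blocks spill to disk instead of being dropped when memory fills up.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap,
      StorageLevel.MEMORY_AND_DISK_SER)

    // The same level can be applied to any DStream's RDDs downstream.
    lines.persist(StorageLevel.MEMORY_AND_DISK_SER)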

 

 

Sent from Samsung Mobile

 Original message 

From: Evo Eftimov 

Date:2015/05/28 13:22 (GMT+00:00) 

To: Dmitry Goldenberg 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

You can always spin new boxes in the background and bring them into the cluster 
fold when fully operational and time that with job relaunch and param change

 

Kafka offsets are managed automatically for you by the Kafka clients, which keep 
them in ZooKeeper; don't worry about that as long as you shut down your job 
gracefully. Besides, managing the offsets explicitly is not a big deal if 
necessary

 

 

Sent from Samsung Mobile

 

 Original message 

From: Dmitry Goldenberg 

Date:2015/05/28 13:16 (GMT+00:00) 

To: Evo Eftimov 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

Thanks, Evo.  Per the last part of your comment, it sounds like we will need to 
implement a job manager which will be in control of starting the jobs, 
monitoring the status of the Kafka topic(s), shutting jobs down and marking 
them as ones to relaunch, scaling the cluster up/down by adding/removing 
machines, and relaunching the 'suspended' (shut down) jobs.

 

I suspect that relaunching the jobs may be tricky since that means keeping 
track of the starter offsets in Kafka topic(s) from which the jobs started 
working on.

 

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching of 
jobs, coupled with the wait for the new machines to come online may turn out 
quite time-consuming which will make for lengthy request times, and our 
requests are not asynchronous.  Ideally, the currently running jobs would 
continue to run on the machines currently available in the cluster.

 

In the scale-down case, the job manager would want to signal to Spark's job 
scheduler not to send work to the node being taken out, find out when the last 
job has finished running on the node, then take the node out.

 

This is somewhat like changing the number of cylinders in a car engine while 
the car is running...

 

Sounds like a great candidate for a set of enhancements in Spark...

 

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov  wrote:

@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window 

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described in previous postings submitted by me

 

You can even go one step further and periodically issue “get/check free memory” 
to see whether it is decreasing relentlessly at a constant rate – if it touches 
a predetermined RAM threshold that should be your third metric 

 

Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you 
can implement one on your own without waiting for Jiras and new features 
whenever they might be implemented by the Spark dev team – moreover you can 
avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine 
Learning in your Feedback Loop to make it handle the message consumption rate 
more intelligently and benefit from ongoing online learning – BUT this is STILL 
about voluntarily sacrificing your performance in the name of keeping your 
system stable – it is not about scaling your system/solution 

 

In terms of how to scale the Spark Framework Dynamically – even though this is 
not supported at the moment out of the box I guess you can have a sys 
management framework spin dynamically a few more boxes (spark worker nodes), 
stop dynamically your currently running Spark Streaming Job, relaunch it with 
new params e.g. more Receivers, larger number of Partitions (hence tasks), more 
RAM per executor etc. Obviously this will cause some temporary delay in fact 
interruption in your processing but if the business use case can tolerate that 
then go for it 

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, May 28, 2015 12:36 PM

Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
You can always spin new boxes in the background and bring them into the cluster 
fold when fully operational and time that with job relaunch and param change

Kafka offsets are managed automatically for you by the Kafka clients, which keep 
them in ZooKeeper; don't worry about that as long as you shut down your job 
gracefully. Besides, managing the offsets explicitly is not a big deal if 
necessary


Sent from Samsung Mobile

 Original message 
From: Dmitry Goldenberg
Date: 2015/05/28 13:16 (GMT+00:00)
To: Evo Eftimov
Cc: Gerard Maas, spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or 
Spark's metrics? 
Thanks, Evo.  Per the last part of your comment, it sounds like we will 
need to implement a job manager which will be in control of starting the jobs, 
monitoring the status of the Kafka topic(s), shutting jobs down and marking 
them as ones to relaunch, scaling the cluster up/down by adding/removing 
machines, and relaunching the 'suspended' (shut down) jobs.

I suspect that relaunching the jobs may be tricky since that means keeping 
track of the starter offsets in Kafka topic(s) from which the jobs started 
working on.

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching of 
jobs, coupled with the wait for the new machines to come online may turn out 
quite time-consuming which will make for lengthy request times, and our 
requests are not asynchronous.  Ideally, the currently running jobs would 
continue to run on the machines currently available in the cluster.

In the scale-down case, the job manager would want to signal to Spark's job 
scheduler not to send work to the node being taken out, find out when the last 
job has finished running on the node, then take the node out.

This is somewhat like changing the number of cylinders in a car engine while 
the car is running...

Sounds like a great candidate for a set of enhancements in Spark...

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov  wrote:
@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described in previous postings submitted by me

 

You can even go one step further and periodically issue “get/check free memory” 
to see whether it is decreasing relentlessly at a constant rate – if it touches 
a predetermined RAM threshold that should be your third metric

 

Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you 
can implement one on your own without waiting for Jiras and new features 
whenever they might be implemented by the Spark dev team – moreover you can 
avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine 
Learning in your Feedback Loop to make it handle the message consumption rate 
more intelligently and benefit from ongoing online learning – BUT this is STILL 
about voluntarily sacrificing your performance in the name of keeping your 
system stable – it is not about scaling your system/solution

 

In terms of how to scale the Spark Framework Dynamically – even though this is 
not supported at the moment out of the box I guess you can have a sys 
management framework spin dynamically a few more boxes (spark worker nodes), 
stop dynamically your currently running Spark Streaming Job, relaunch it with 
new params e.g. more Receivers, larger number of Partitions (hence tasks), more 
RAM per executor etc. Obviously this will cause some temporary delay in fact 
interruption in your processing but if the business use case can tolerate that 
then go for it

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, May 28, 2015 12:36 PM
To: dgoldenberg
Cc: spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics?

 

Hi,

 

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark 
streaming processes is not supported. 

 

 

Longer version.

 

I assume that you are talking about Spark Streaming as the discussion is about 
handling Kafka streaming data.

 

Then you have two things to consider: the Streaming receivers and the Spark 
processing cluster.

 

Currently, the receiving topology is static. One receiver is allocated with 
each DStream instantiated and it will use 1 core in the cluster. Once the 
StreamingContext is started, this topology cannot be changed, therefore the 
number of Kafka receivers is fixed for the lifetime of your DStream. 

What we do is to calculate the cluster capacity and use that as a fixed upper 
bound (with a margin) for the receiver throughput.

Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Thanks, Evo.  Per the last part of your comment, it sounds like we will
need to implement a job manager which will be in control of starting the
jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
marking them as ones to relaunch, scaling the cluster up/down by
adding/removing machines, and relaunching the 'suspended' (shut down) jobs.

I suspect that relaunching the jobs may be tricky since that means keeping
track of the starter offsets in Kafka topic(s) from which the jobs started
working on.

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
of jobs, coupled with the wait for the new machines to come online may turn
out quite time-consuming which will make for lengthy request times, and our
requests are not asynchronous.  Ideally, the currently running jobs would
continue to run on the machines currently available in the cluster.

In the scale-down case, the job manager would want to signal to Spark's job
scheduler not to send work to the node being taken out, find out when the
last job has finished running on the node, then take the node out.

This is somewhat like changing the number of cylinders in a car engine
while the car is running...

Sounds like a great candidate for a set of enhancements in Spark...

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov  wrote:

> @DG; The key metrics should be
>
>
>
> -  Scheduling delay – its ideal state is to remain constant over
> time and ideally be less than the time of the microbatch window
>
> -  The average job processing time should remain less than the
> micro-batch window
>
> -  Number of Lost Jobs – even if there is a single Job lost that
> means that you have lost all messages for the DStream RDD processed by that
> job due to the previously described spark streaming memory leak condition
> and subsequent crash – described in previous postings submitted by me
>
>
>
> You can even go one step further and periodically issue “get/check free
> memory” to see whether it is decreasing relentlessly at a constant rate –
> if it touches a predetermined RAM threshold that should be your third
> metric
>
>
>
> Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
> you can implement one on your own without waiting for Jiras and new
> features whenever they might be implemented by the Spark dev team –
> moreover you can avoid using slow mechanisms such as ZooKeeper and even
> incorporate some Machine Learning in your Feedback Loop to make it handle
> the message consumption rate more intelligently and benefit from ongoing
> online learning – BUT this is STILL about voluntarily sacrificing your
> performance in the name of keeping your system stable – it is not about
> scaling your system/solution
>
>
>
> In terms of how to scale the Spark Framework Dynamically – even though
> this is not supported at the moment out of the box I guess you can have a
> sys management framework spin dynamically a few more boxes (spark worker
> nodes), stop dynamically your currently running Spark Streaming Job,
> relaunch it with new params e.g. more Receivers, larger number of
> Partitions (hence tasks), more RAM per executor etc. Obviously this will
> cause some temporary delay in fact interruption in your processing but if
> the business use case can tolerate that then go for it
>
>
>
> *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
> *Sent:* Thursday, May 28, 2015 12:36 PM
> *To:* dgoldenberg
> *Cc:* spark users
> *Subject:* Re: Autoscaling Spark cluster based on topic sizes/rate of
> growth in Kafka or Spark's metrics?
>
>
>
> Hi,
>
>
>
> tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
> streaming processes is not supported.
>
>
>
>
>
> *Longer version.*
>
>
>
> I assume that you are talking about Spark Streaming as the discussion is
> about handling Kafka streaming data.
>
>
>
> Then you have two things to consider: the Streaming receivers and the
> Spark processing cluster.
>
>
>
> Currently, the receiving topology is static. One receiver is allocated
> with each DStream instantiated and it will use 1 core in the cluster. Once
> the StreamingContext is started, this topology cannot be changed, therefore
> the number of Kafka receivers is fixed for the lifetime of your DStream.
>
> What we do is to calculate the cluster capacity and use that as a fixed
> upper bound (with a margin) for the receiver throughput.
>
>
>
> There's work in progress to add a reactive model to the receiver, where
> backpressure can be applied to handle overload conditions. See
> https://issues.apache.org/jira/browse/SPARK-7398
>
>
>
> Once the data is received, it will be processed in a 'classical' Spark
> pipeline, so previous posts on spark resource scheduling might apply.

RE: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window 

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described in previous postings submitted by me

 

You can even go one step further and periodically issue “get/check free memory” 
to see whether it is decreasing relentlessly at a constant rate – if it touches 
a predetermined RAM threshold that should be your third metric 
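A minimal sketch of watching the first two metrics from inside the application, using Spark's
StreamingListener API; the alert callback and thresholds are placeholders for whatever event
mechanism you wire in:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    class ClogWatcher(batchWindowMs: Long, alert: String => Unit) extends StreamingListener {
      override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
        val info = batch.batchInfo
        val schedulingDelay = info.schedulingDelay.getOrElse(0L)   // time spent queued
        val processingTime  = info.processingDelay.getOrElse(0L)   // time spent processing
        if (schedulingDelay > batchWindowMs || processingTime > batchWindowMs)
          alert(s"Batch ${info.batchTime}: scheduling=$schedulingDelay ms, processing=$processingTime ms")
      }
    }

    // ssc.addStreamingListener(new ClogWatcher(batchWindowMs = 10000L, alert = msg => println(msg)))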

 

Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you 
can implement one on your own without waiting for Jiras and new features 
whenever they might be implemented by the Spark dev team – moreover you can 
avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine 
Learning in your Feedback Loop to make it handle the message consumption rate 
more intelligently and benefit from ongoing online learning – BUT this is STILL 
about voluntarily sacrificing your performance in the name of keeping your 
system stable – it is not about scaling your system/solution 

 

In terms of how to scale the Spark Framework Dynamically – even though this is 
not supported at the moment out of the box I guess you can have a sys 
management framework spin dynamically a few more boxes (spark worker nodes), 
stop dynamically your currently running Spark Streaming Job, relaunch it with 
new params e.g. more Receivers, larger number of Partitions (hence tasks), more 
RAM per executor etc. Obviously this will cause some temporary delay in fact 
interruption in your processing but if the business use case can tolerate that 
then go for it 
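That 'stop dynamically, then relaunch with new params' step maps onto the graceful-stop API,
sketched here on the assumption that the pause is tolerable and that ssc is the running
StreamingContext:

    // Let in-flight batches finish, then shut down, so the app can be resubmitted
    // (e.g. via spark-submit) with more receivers / partitions / executor memory.
    ssc.stop(stopSparkContext = true, stopGracefully = true)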

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, May 28, 2015 12:36 PM
To: dgoldenberg
Cc: spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics?

 

Hi,

 

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark 
streaming processes is not supported. 

 

 

Longer version.

 

I assume that you are talking about Spark Streaming as the discussion is about 
handling Kafka streaming data.

 

Then you have two things to consider: the Streaming receivers and the Spark 
processing cluster.

 

Currently, the receiving topology is static. One receiver is allocated with 
each DStream instantiated and it will use 1 core in the cluster. Once the 
StreamingContext is started, this topology cannot be changed, therefore the 
number of Kafka receivers is fixed for the lifetime of your DStream. 

What we do is to calculate the cluster capacity and use that as a fixed upper 
bound (with a margin) for the receiver throughput.
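Because the receiver count is fixed once the StreamingContext starts, the usual workaround is
to size it up front, e.g. several receiver-based streams unioned together (a sketch assuming
the receiver-based Kafka API; ssc, zkQuorum, group and topicMap are placeholders):

    import org.apache.spark.streaming.kafka.KafkaUtils

    // One receiver (and one core) per createStream call, fixed for the context's lifetime.
    val numReceivers = 4
    val kafkaStreams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    }
    val unified = ssc.union(kafkaStreams)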

 

There's work in progress to add a reactive model to the receiver, where 
backpressure can be applied to handle overload conditions. See 
https://issues.apache.org/jira/browse/SPARK-7398
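Worth noting that SPARK-7398 eventually shipped as a single switch in later releases (Spark
1.5+), not in the 1.3/1.4 versions discussed here; shown only as a forward pointer, assuming
an existing SparkConf named conf:

    // Available from Spark 1.5 onwards; the ingest rate then adapts automatically
    // to the observed scheduling delay and processing time.
    conf.set("spark.streaming.backpressure.enabled", "true")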

 

Once the data is received, it will be processed in a 'classical' Spark 
pipeline, so previous posts on spark resource scheduling might apply.

 

Regarding metrics, the standard metrics subsystem of spark will report 
streaming job performance. Check the driver's metrics endpoint to peruse the 
available metrics:

 

<driver>:<port>/metrics/json
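A small sketch of polling that endpoint from a monitoring component, assuming the default
driver UI port 4040 (adjust host/port for your deployment):

    import scala.io.Source

    // The metrics servlet is exposed on the driver's web UI and includes the
    // streaming receiver/batch statistics.
    val metricsJson = Source.fromURL("http://driver-host:4040/metrics/json").mkString
    // Parse with your JSON library of choice and extract the streaming metrics of interest.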

 

-kr, Gerard.

 

 

(*) Spark is a project that moves so fast that statements might be invalidated 
by new work every minute.

 

On Thu, May 28, 2015 at 1:21 AM, dgoldenberg  wrote:

Hi,

I'm trying to understand if there are design patterns for autoscaling Spark
(add/remove slave machines to the cluster) based on the throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we
stream data from would start growing.  What are some of the ways to generate
the metrics on the number of new messages and the rate they are piling up?
This perhaps is more of a Kafka question; I see a pretty sparse javadoc with
the Metric interface and not much else...

What are some of the ways to expand/contract the Spark cluster? Someone has
mentioned Mesos...

I see some info on Spark metrics in the Spark monitoring guide
(https://spark.apache.org/docs/latest/monitoring.html).  Do we want to
perhaps implement a custom sink that would help us autoscale up or down
based on the throughput?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Thank you, Gerard.

We're looking at the receiver-less (direct) setup for Kafka with Spark Streaming,
so I'm not sure how to apply your comments to that case (not that we have to use
the receiver-less approach, but it seems to offer some advantages over the
receiver-based one).
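
For reference, a minimal sketch of the receiver-less (direct) Kafka stream as 
introduced in the Spark 1.3 API; the broker list and topic name are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-less ("direct") Kafka stream: no long-running receiver tasks, one RDD
// partition per Kafka partition, offsets tracked by Spark rather than ZooKeeper.
val conf = new SparkConf().setAppName("direct-kafka-example")
val ssc  = new StreamingContext(conf, Seconds(2))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics      = Set("my-topic")   // placeholder topic name

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.map(_._2).count().print()   // e.g. count records per batch

ssc.start()
ssc.awaitTermination()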

As far as "the number of Kafka receivers is fixed for the lifetime of your
DStream" -- this may be OK to start with. What I'm researching is the
ability to add worker nodes to the Spark cluster when needed and remove
them when no longer needed.  Do I understand correctly that a single
receiver may cause work to be farmed out to multiple 'slave'
machines/worker nodes?  If that's the case, we're less concerned with
multiple receivers; we're concerned with the worker node cluster itself.

If we use the ConsumerOffsetChecker class in Kafka that Rajesh mentioned
and instrument dynamic adding/removal of machines, my subsequent questions
then are, a) will Spark sense the addition of a new node / is it sufficient
that the cluster manager is aware, then work just starts flowing there?
 and  b) what would be a way to gracefully remove a worker node when the
load subsides, so that no currently running Spark job is killed?

- Dmitry

On Thu, May 28, 2015 at 7:36 AM, Gerard Maas  wrote:

> Hi,
>
> tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
> streaming processes is not supported.
>
>
> *Longer version.*
>
> I assume that you are talking about Spark Streaming as the discussion is
> about handling Kafka streaming data.
>
> Then you have two things to consider: the Streaming receivers and the
> Spark processing cluster.
>
> Currently, the receiving topology is static. One receiver is allocated
> with each DStream instantiated and it will use 1 core in the cluster. Once
> the StreamingContext is started, this topology cannot be changed, therefore
> the number of Kafka receivers is fixed for the lifetime of your DStream.
> What we do is to calculate the cluster capacity and use that as a fixed
> upper bound (with a margin) for the receiver throughput.
>
> There's work in progress to add a reactive model to the receiver, where
> backpressure can be applied to handle overload conditions. See
> https://issues.apache.org/jira/browse/SPARK-7398
>
> Once the data is received, it will be processed in a 'classical' Spark
> pipeline, so previous posts on spark resource scheduling might apply.
>
> Regarding metrics, the standard metrics subsystem of spark will report
> streaming job performance. Check the driver's metrics endpoint to peruse
> the available metrics:
>
> :/metrics/json
>
> -kr, Gerard.
>
>
> (*) Spark is a project that moves so fast that statements might be
> invalidated by new work every minute.
>
> On Thu, May 28, 2015 at 1:21 AM, dgoldenberg 
> wrote:
>
>> Hi,
>>
>> I'm trying to understand if there are design patterns for autoscaling
>> Spark
>> (add/remove slave machines to the cluster) based on the throughput.
>>
>> Assuming we can throttle Spark consumers, the respective Kafka topics we
>> stream data from would start growing.  What are some of the ways to
>> generate
>> the metrics on the number of new messages and the rate they are piling up?
>> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
>> with
>> the Metric interface and not much else...
>>
>> What are some of the ways to expand/contract the Spark cluster? Someone
>> has
>> mentioned Mesos...
>>
>> I see some info on Spark metrics in  the Spark monitoring guide
>>   .  Do we want to
>> perhaps implement a custom sink that would help us autoscale up or down
>> based on the throughput?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Gerard Maas
Hi,

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
streaming processes is not supported.


*Longer version.*

I assume that you are talking about Spark Streaming as the discussion is
about handling Kafka streaming data.

Then you have two things to consider: the Streaming receivers and the Spark
processing cluster.

Currently, the receiving topology is static. One receiver is allocated with
each DStream instantiated and it will use 1 core in the cluster. Once the
StreamingContext is started, this topology cannot be changed, therefore the
number of Kafka receivers is fixed for the lifetime of your DStream.
What we do is to calculate the cluster capacity and use that as a fixed
upper bound (with a margin) for the receiver throughput.

There's work in progress to add a reactive model to the receiver, where
backpressure can be applied to handle overload conditions. See
https://issues.apache.org/jira/browse/SPARK-7398

Once the data is received, it will be processed in a 'classical' Spark
pipeline, so previous posts on spark resource scheduling might apply.

Regarding metrics, the standard metrics subsystem of spark will report
streaming job performance. Check the driver's metrics endpoint to peruse
the available metrics:

:/metrics/json

-kr, Gerard.


(*) Spark is a project that moves so fast that statements might be
invalidated by new work every minute.

On Thu, May 28, 2015 at 1:21 AM, dgoldenberg 
wrote:

> Hi,
>
> I'm trying to understand if there are design patterns for autoscaling Spark
> (add/remove slave machines to the cluster) based on the throughput.
>
> Assuming we can throttle Spark consumers, the respective Kafka topics we
> stream data from would start growing.  What are some of the ways to
> generate
> the metrics on the number of new messages and the rate they are piling up?
> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
> with
> the Metric interface and not much else...
>
> What are some of the ways to expand/contract the Spark cluster? Someone has
> mentioned Mesos...
>
> I see some info on Spark metrics in  the Spark monitoring guide
>   .  Do we want to
> perhaps implement a custom sink that would help us autoscale up or down
> based on the throughput?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-27 Thread Ted Yu
bq. detect the presence of a new node and start utilizing it

My understanding is that Spark is concerned with managing executors.
Whether a request for an executor is fulfilled on an existing node or a new
node is up to the underlying cluster manager (e.g. YARN).
Assuming the cluster is single-tenant, executor requests are more likely to
be fulfilled on the newly added nodes.

Please correct me if I am wrong.
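
To make that concrete, these are the standard dynamic-allocation settings that 
let Spark ask the cluster manager for executors as the task backlog grows (and 
so pick up newly added nodes); the executor bounds are placeholders, and on YARN 
the external shuffle service must also be running on the NodeManagers:

import org.apache.spark.SparkConf

// Dynamic allocation: Spark requests executors when tasks back up and releases
// idle ones, leaving node placement to the cluster manager (YARN, Mesos, ...).
// Bounds below are placeholders.
val conf = new SparkConf()
  .setAppName("autoscaling-example")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")   // required for dynamic allocation on YARN
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")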

On Wed, May 27, 2015 at 8:26 PM, Dmitry Goldenberg  wrote:

> Thanks, Rajesh.  I think that acquiring/relinquishing executors is
> important but I feel like there are at least two layers for resource
> allocation and autoscaling.  It seems that acquiring and relinquishing
> executors is a way to optimize resource utilization within a pre-set Spark
> cluster of machines.
>
> However, to accommodate big spikes in input data, we also need the
> actual cluster scaling, i.e. adding (or removing, when no longer needed)
> worker node machines automatically.  On that front, I wonder how Spark
> reacts to a machine being added or removed and what the actual procedure
> would be.  If we're running on a Hadoop cluster, there's a description of
> adding a node
> <http://wiki.apache.org/hadoop/FAQ#I_have_a_new_node_I_want_to_add_to_a_running_Hadoop_cluster.3B_how_do_I_start_services_on_just_one_node.3F>
> there.  There's also discussions of Hadoop node adding/removal such as this
> one
> <http://stackoverflow.com/questions/16774439/how-do-i-correctly-remove-nodes-in-hadoop>
> .
>
> My worry is, will Spark "gracefully" and "quickly" detect the presence of
> a new node and start utilizing it (i.e. how much does it communicate with
> the Hadoop cluster manager?)...  By the same token, if a node is removed,
> how can it be removed gracefully so as not to affect/kill any running Spark
> jobs?
>
> On Wed, May 27, 2015 at 10:57 PM,  wrote:
>
>>
>> Did you check
>> https://drive.google.com/file/d/0B7tmGAdbfMI2OXl6azYySk5iTGM/edit and
>> http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>>
>>
>>
>> Not sure if the spark kafka receiver emits metrics on the lag, check this
>>  link out
>> http://community.spiceworks.com/how_to/77610-how-far-behind-is-your-kafka-consumer
>>
>>
>>
>> You should be able to whip up a script that runs the Kafka
>> ConsumerOffsetChecker periodically and pipe it to a metrics backend of your
>> choice. Based on this you can work the dynamic resource allocation magic.
>>
>> -----Original Message-----
>> From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
>> Sent: Wednesday, May 27, 2015 6:21 PM
>> To: user@spark.apache.org
>> Subject: Autoscaling Spark cluster based on topic sizes/rate of growth in
>> Kafka or Spark's metrics?
>>
>> Hi,
>>
>> I'm trying to understand if there are design patterns for autoscaling
>> Spark (add/remove slave machines to the cluster) based on the throughput.
>>
>> Assuming we can throttle Spark consumers, the respective Kafka topics we
>> stream data from would start growing. What are some of the ways to generate
>> the metrics on the number of new messages and the rate they are piling up?
>> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
>> with the Metric interface and not much else...
>>
>> What are some of the ways to expand/contract the Spark cluster? Someone
>> has mentioned Mesos...
>>
>> I see some info on Spark metrics in the Spark monitoring guide . Do we
>> want to perhaps implement a custom sink that would help us autoscale up or
>> down based on the throughput?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
>> commands, e-mail: user-h...@spark.apache.org
>>
>
>


Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-27 Thread Dmitry Goldenberg
Thanks, Rajesh.  I think that acquiring/relinquishing executors is important
but I feel like there are at least two layers for resource allocation and
autoscaling.  It seems that acquiring and relinquishing executors is a way
to optimize resource utilization within a pre-set Spark cluster of machines.

However, to accommodate big spikes in input data, we also need the
actual cluster scaling, i.e. adding (or removing, when no longer needed)
worker node machines automatically.  On that front, I wonder how Spark
reacts to a machine being added or removed and what the actual procedure
would be.  If we're running on a Hadoop cluster, there's a description of
adding a node
<http://wiki.apache.org/hadoop/FAQ#I_have_a_new_node_I_want_to_add_to_a_running_Hadoop_cluster.3B_how_do_I_start_services_on_just_one_node.3F>
there.  There's also discussions of Hadoop node adding/removal such as this
one
<http://stackoverflow.com/questions/16774439/how-do-i-correctly-remove-nodes-in-hadoop>
.

My worry is, will Spark "gracefully" and "quickly" detect the presence of a
new node and start utilizing it (i.e. how much does it communicate with the
Hadoop cluster manager?)...  By the same token, if a node is removed, how
can it be removed gracefully so as not to affect/kill any running Spark
jobs?

On Wed, May 27, 2015 at 10:57 PM,  wrote:

>
> Did you check
> https://drive.google.com/file/d/0B7tmGAdbfMI2OXl6azYySk5iTGM/edit and
> http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
>
>
> Not sure if the spark kafka receiver emits metrics on the lag, check this
>  link out
> http://community.spiceworks.com/how_to/77610-how-far-behind-is-your-kafka-consumer
>
>
>
> You should be able to whip up a script that runs the Kafka
> ConsumerOffsetChecker periodically and pipe it to a metrics backend of your
> choice. Based on this you can work the dynamic resource allocation magic.
>
> -----Original Message-----
> From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
> Sent: Wednesday, May 27, 2015 6:21 PM
> To: user@spark.apache.org
> Subject: Autoscaling Spark cluster based on topic sizes/rate of growth in
> Kafka or Spark's metrics?
>
> Hi,
>
> I'm trying to understand if there are design patterns for autoscaling
> Spark (add/remove slave machines to the cluster) based on the throughput.
>
> Assuming we can throttle Spark consumers, the respective Kafka topics we
> stream data from would start growing. What are some of the ways to generate
> the metrics on the number of new messages and the rate they are piling up?
> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
> with the Metric interface and not much else...
>
> What are some of the ways to expand/contract the Spark cluster? Someone
> has mentioned Mesos...
>
> I see some info on Spark metrics in the Spark monitoring guide . Do we
> want to perhaps implement a custom sink that would help us autoscale up or
> down based on the throughput?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>


RE: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-27 Thread Rajesh_Kalluri
Did you check https://drive.google.com/file/d/0B7tmGAdbfMI2OXl6azYySk5iTGM/edit 
and 
http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

Not sure if the spark kafka receiver emits metrics on the lag, check this  link 
out 
http://community.spiceworks.com/how_to/77610-how-far-behind-is-your-kafka-consumer

You should be able to whip up a script that runs the Kafka 
ConsumerOffsetChecker periodically and pipe it to a metrics backend of your 
choice. Based on this you can work the dynamic resource allocation magic.
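
A rough sketch of that kind of periodic lag check, assuming Kafka 0.8.x where 
kafka.tools.ConsumerOffsetChecker still ships; the group, topic, ZooKeeper 
address and the kafka-run-class.sh location are placeholders, and where the 
parsed lag is shipped (Graphite, a database, your autoscaler) is up to you:

import scala.sys.process._

// Run Kafka's ConsumerOffsetChecker on a schedule and extract the per-partition
// "Lag" column (index 5 in the 0.8.x output: Group, Topic, Pid, Offset, logSize,
// Lag, Owner). Values below are placeholders.
val cmd = Seq(
  "kafka-run-class.sh", "kafka.tools.ConsumerOffsetChecker",
  "--group", "my-consumer-group",
  "--topic", "my-topic",
  "--zookeeper", "zk1:2181")

while (true) {
  val output = cmd.!!                    // capture the checker's stdout
  val lags = output.split("\n")
    .map(_.trim.split("\\s+"))
    .collect { case cols if cols.length >= 6 && cols(5).forall(_.isDigit) => cols(5).toLong }
  if (lags.nonEmpty) println(s"total lag = ${lags.sum}")   // ship to your metrics backend instead
  Thread.sleep(60000)                    // check once a minute
}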

-----Original Message-----
From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Wednesday, May 27, 2015 6:21 PM
To: user@spark.apache.org
Subject: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka 
or Spark's metrics?

Hi,

I'm trying to understand if there are design patterns for autoscaling Spark 
(add/remove slave machines to the cluster) based on the throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we stream 
data from would start growing. What are some of the ways to generate the 
metrics on the number of new messages and the rate they are piling up?
This perhaps is more of a Kafka question; I see a pretty sparse javadoc with 
the Metric interface and not much else...

What are some of the ways to expand/contract the Spark cluster? Someone has 
mentioned Mesos...

I see some info on Spark metrics in the Spark monitoring guide . Do we want to 
perhaps implement a custom sink that would help us autoscale up or down based 
on the throughput?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-27 Thread dgoldenberg
Hi,

I'm trying to understand if there are design patterns for autoscaling Spark
(add/remove slave machines to the cluster) based on the throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we
stream data from would start growing.  What are some of the ways to generate
the metrics on the number of new messages and the rate they are piling up? 
This perhaps is more of a Kafka question; I see a pretty sparse javadoc with
the Metric interface and not much else...

What are some of the ways to expand/contract the Spark cluster? Someone has
mentioned Mesos...

I see some info on Spark metrics in  the Spark monitoring guide
  .  Do we want to
perhaps implement a custom sink that would help us autoscale up or down
based on the throughput?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org