Re: Dynamic Allocation & Spark Streaming

2016-08-30 Thread Liren Ding
It has been a while since the last update on this thread. Now that Spark 2.0 is
available, do you guys know if there has been any progress on Dynamic Allocation &
Spark Streaming?

On Mon, Oct 19, 2015 at 1:13 PM, robert towne 
wrote:

> I have watched a few videos from Databricks/Andrew Or around the Spark 1.2
> release and it seemed that dynamic allocation was not yet available for
> Spark Streaming.
>
> I now see SPARK-10955 <https://issues.apache.org/jira/browse/SPARK-10955>,
> which is tied to 1.5.2 and is about disabling dynamic allocation with Spark
> Streaming.
>
> I use Spark Streaming with a receiverless/direct Kafka connection.  When I
> start up an app reading from the beginning of the topic, I would like to
> have more resources than I will need once I have caught up.  Is it possible
> to use dynamic allocation for this use case?
>
> thanks,
> Robert
>


Re: Dynamic Allocation & Spark Streaming

2015-11-06 Thread Adrian Tanase
You can register a streaming listener – in the BatchInfo you’ll find a lot of 
stats (including count of received records) that you can base your logic on:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.scheduler.BatchInfo
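
A minimal sketch of such a listener (illustrative only – the class name
EmptyBatchListener and the idea of counting consecutive empty batches via
BatchInfo.numRecords are just one way you could base your logic on it):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Tracks how many batches in a row processed zero records.
class EmptyBatchListener extends StreamingListener {
  @volatile var consecutiveEmptyBatches = 0

  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    if (batch.batchInfo.numRecords == 0L) consecutiveEmptyBatches += 1
    else consecutiveEmptyBatches = 0
  }
}

// Registration (ssc is your StreamingContext):
//   val listener = new EmptyBatchListener
//   ssc.addStreamingListener(listener)
//   // elsewhere: if (listener.consecutiveEmptyBatches > N) scale down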

From: Kyle Lin
Date: Friday, November 6, 2015 at 11:48 AM
To: Tathagata Das
Cc: robert towne, user
Subject: Re: Dynamic Allocation & Spark Streaming

Hey there

I run Spark Streaming 1.5.1 on YARN with dynamic allocation, and use the direct 
stream API to read data from Kafka.

The Spark job can dynamically request an executor when it reaches 
spark.dynamicAllocation.schedulerBacklogTimeout.

However, it won't dynamically remove executors when there is no more data from 
Kafka, because the executors never become idle but continually get empty RDDs.

Is it possible to detect that there have been more than N consecutive empty RDDs 
and remove executors manually? How could I keep track of how many empty RDDs I 
get and remove executors?

Kyle


2015-10-20 4:48 GMT+08:00 Tathagata Das <t...@databricks.com>:
Unfortunately the title on the JIRA is extremely confusing. I have fixed it.

The reason why dynamic allocation does not work well with streaming is that the 
heuristic used to automatically scale the number of executors up or down is built 
for the task-scheduling pattern of batch jobs, not for streaming jobs. We will 
definitely solve this in the future, maybe in 1.7.0 or later.
In the meantime, there are developer API functions that allow you to add and 
remove executors explicitly. See sparkContext.requestExecutors() and 
sparkContext.killExecutors(). With these you can write your own scaling logic. 
In your case I would do the following.
1. Ask for a large number of executors / cores through spark-submit.
2. Use a StreamingListener to monitor whether the job has caught up.
3. Then call killExecutors to slowly kill a few of them, but use the listener to 
make sure that the scheduling delay does not go up.

Hope this helps. Let me know if this works for you.

On Mon, Oct 19, 2015 at 1:13 PM, robert towne <binarymecha...@gmail.com> wrote:
I have watched a few videos from Databricks/Andrew Or around the Spark 1.2 
release and it seemed that dynamic allocation was not yet available for Spark 
Streaming.

I now see SPARK-10955 <https://issues.apache.org/jira/browse/SPARK-10955>, which 
is tied to 1.5.2 and is about disabling dynamic allocation with Spark Streaming.

I use Spark Streaming with a receiverless/direct Kafka connection.  When I 
start up an app reading from the beginning of the topic, I would like to have 
more resources than I will need once I have caught up.  Is it possible to use 
dynamic allocation for this use case?

thanks,
Robert




Re: Dynamic Allocation & Spark Streaming

2015-11-06 Thread Kyle Lin
Hey there

I run Spark Streaming 1.5.1 on YARN with dynamic allocation, and use the direct
stream API to read data from Kafka.

The Spark job can dynamically request an executor when it
reaches spark.dynamicAllocation.schedulerBacklogTimeout.

However, it won't dynamically remove executors when there is no more data
from Kafka, because the executors never become idle but continually get empty
RDDs.

Is it possible to detect that there have been more than N consecutive empty RDDs
and remove executors manually? How could I keep track of how many empty RDDs I
get and remove executors?
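
Something like the following is what I have in mind – just a sketch using
foreachRDD and rdd.isEmpty() as a driver-side counter (IdleScaleDown and
maxEmptyBatches are made-up names, and actually removing executors would still
need an API such as sc.killExecutors()):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.streaming.dstream.DStream

object IdleScaleDown {
  // stream: the DStream built from the direct Kafka API
  // maxEmptyBatches: how many consecutive empty RDDs to tolerate before scaling down
  def watch[T](stream: DStream[T], maxEmptyBatches: Int)(scaleDown: () => Unit): Unit = {
    val emptyCount = new AtomicInteger(0)
    stream.foreachRDD { rdd =>
      if (rdd.isEmpty()) {
        // isEmpty() runs a cheap job from the driver; count the empty batch
        if (emptyCount.incrementAndGet() >= maxEmptyBatches) {
          scaleDown()          // e.g. call sc.killExecutors(...) here
          emptyCount.set(0)
        }
      } else {
        emptyCount.set(0)      // data is flowing again, reset the counter
      }
    }
  }
}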

Kyle


2015-10-20 4:48 GMT+08:00 Tathagata Das :

> Unfortunately the title on the JIRA is extremely confusing. I have fixed
> it.
>
> The reason why dynamic allocation does not work well with streaming is
> that the heuristic used to automatically scale the number of executors up or
> down is built for the task-scheduling pattern of batch jobs, not for
> streaming jobs. We will definitely solve this in the future, maybe in 1.7.0
> or later.
> In the meantime, there are developer API functions that allow you to add and
> remove executors explicitly. See sparkContext.requestExecutors() and
> sparkContext.killExecutors(). With these you can write your own scaling
> logic. In your case I would do the following.
> 1. Ask for a large number of executors / cores through spark-submit.
> 2. Use a StreamingListener to monitor whether the job has caught up.
> 3. Then call killExecutors to slowly kill a few of them, but use the listener
> to make sure that the scheduling delay does not go up.
>
> Hope this helps. Let me know if this works for you.
>
> On Mon, Oct 19, 2015 at 1:13 PM, robert towne 
> wrote:
>
>> I have watched a few videos from Databricks/Andrew Or around the Spark
>> 1.2 release and it seemed that dynamic allocation was not yet available for
>> Spark Streaming.
>>
>> I now see SPARK-10955 <https://issues.apache.org/jira/browse/SPARK-10955>,
>> which is tied to 1.5.2 and is about disabling dynamic allocation with Spark
>> Streaming.
>>
>> I use Spark Streaming with a receiverless/direct Kafka connection.  When
>> I start up an app reading from the beginning of the topic, I would like to
>> have more resources than I will need once I have caught up.  Is it possible
>> to use dynamic allocation for this use case?
>>
>> thanks,
>> Robert
>>
>
>


Re: Dynamic Allocation & Spark Streaming

2015-10-19 Thread Tathagata Das
Unfortunately the title on the JIRA is extremely confusing. I have fixed it.

The reason why dynamic allocation does not work well with streaming is that
the heuristic used to automatically scale the number of executors up or down
is built for the task-scheduling pattern of batch jobs, not for streaming
jobs. We will definitely solve this in the future, maybe in 1.7.0 or later.
In the meantime, there are developer API functions that allow you to add and
remove executors explicitly. See sparkContext.requestExecutors() and
sparkContext.killExecutors(). With these you can write your own scaling
logic. In your case I would do the following.
1. Ask for a large number of executors / cores through spark-submit.
2. Use a StreamingListener to monitor whether the job has caught up.
3. Then call killExecutors to slowly kill a few of them, but use the listener
to make sure that the scheduling delay does not go up.
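
A rough sketch of steps 2 and 3, assuming YARN and the developer APIs above
(the class names, minExecutors and maxDelayMs are just illustrative, not an
official recipe):

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import scala.collection.mutable

// Keeps track of the executor ids that are currently alive,
// so we know what we can pass to sc.killExecutors().
class ExecutorTracker extends SparkListener {
  val executors = mutable.Set.empty[String]
  override def onExecutorAdded(e: SparkListenerExecutorAdded): Unit =
    executors.synchronized { executors += e.executorId }
  override def onExecutorRemoved(e: SparkListenerExecutorRemoved): Unit =
    executors.synchronized { executors -= e.executorId }
}

// Once the batches have caught up (low scheduling delay), kill one
// executor at a time and let the following batches show whether the
// scheduling delay creeps back up.
class ScaleDownListener(sc: SparkContext, tracker: ExecutorTracker,
    minExecutors: Int, maxDelayMs: Long) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val delay = batch.batchInfo.schedulingDelay.getOrElse(Long.MaxValue)
    val current = tracker.executors.synchronized { tracker.executors.toSeq }
    if (delay < maxDelayMs && current.size > minExecutors) {
      sc.killExecutors(current.take(1))
    }
  }
}

// Wiring (sc is the SparkContext, ssc the StreamingContext):
//   val tracker = new ExecutorTracker
//   sc.addSparkListener(tracker)
//   ssc.addStreamingListener(new ScaleDownListener(sc, tracker, minExecutors = 4, maxDelayMs = 500))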

Hope this helps. Let me know if this works for you.

On Mon, Oct 19, 2015 at 1:13 PM, robert towne 
wrote:

> I have watched a few videos from Databricks/Andrew Or around the Spark 1.2
> release and it seemed that dynamic allocation was not yet available for
> Spark Streaming.
>
> I now see SPARK-10955 <https://issues.apache.org/jira/browse/SPARK-10955>,
> which is tied to 1.5.2 and is about disabling dynamic allocation with Spark
> Streaming.
>
> I use Spark Streaming with a receiverless/direct Kafka connection.  When I
> start up an app reading from the beginning of the topic, I would like to
> have more resources than I will need once I have caught up.  Is it possible
> to use dynamic allocation for this use case?
>
> thanks,
> Robert
>