Re: Number of executors change during job running

2016-05-02 Thread Vikash Pareek
Hi Bill,

You can try the DirectStream approach and increase the number of partitions on the
Kafka topic. The input DStream will then have as many partitions as the Kafka
topic, without any repartitioning.
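
For reference, here is a minimal sketch of the direct approach in Spark 1.x (the
broker list, topic name, and batch interval below are made-up placeholders):

  import kafka.serializer.StringDecoder
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val conf = new SparkConf().setAppName("DirectStreamExample")
  val ssc = new StreamingContext(conf, Seconds(60))

  // No receivers: each Kafka partition maps 1:1 to a Spark partition, so adding
  // partitions to the topic adds parallelism without calling repartition().
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
  val topics = Set("my-topic")
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)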

Can you please share the event timeline chart from the Spark UI? You need to
tune your configuration according to the computation, and the Spark UI will
give a deeper understanding of the problem.

Thanks!






Re: Number of executors in spark-1.6 and spark-1.5

2016-04-10 Thread Vikash Pareek
Hi Talebzadeh,

Thanks for your quick response.

>>in 1.6, how many executors do you see for each node?
I have 1 executor per node with SPARK_WORKER_INSTANCES=1.

>>in standalone mode how are you increasing the number of worker instances.
Are you starting another slave on each node?
No, I am not starting another slave; I just changed *spark-env.sh* on each
slave node, i.e. set SPARK_WORKER_INSTANCES=2.
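
As a quick check from the application side (a rough sketch; sc here is the
application's SparkContext), you can list which executors actually registered
with the driver:

  // One "host:port" entry per registered executor (plus the driver); with
  // SPARK_WORKER_INSTANCES=2 you would expect two entries per slave node.
  sc.getExecutorMemoryStatus.keys.foreach(println)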





Best Regards,


Vikash Pareek
Software Developer, *InfoObjects Inc.*
m: +918800206898 a: E5, Jhalana Institutional Area, Jaipur
s: vikaspareek1991 e: vikash.par...@infoobjects.com



On Sun, Apr 10, 2016 at 3:00 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> in 1.6, how many executors do you see for each node?
> in standalone mode how are you increasing the number of worker instances.
> Are you starting another slave on each node?
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 10 April 2016 at 08:26, Vikash Pareek 
> wrote:
>
>> Hi,
>>
>> I have upgraded a 5-node Spark cluster from spark-1.5 to spark-1.6 (to use
>> the mapWithState function).
>> After moving to spark-1.6, I am seeing strange behaviour: jobs are not using
>> multiple executors on different nodes at the same time, which means there is
>> no parallel processing when each node has a single worker and executor.
>> I am running jobs in spark standalone mode.
>>
>> I observed the following points related to this issue.
>> 1. If I run the same job with spark-1.5, it uses multiple executors
>> across different nodes at the same time.
>> 2. In spark-1.6, if I increase the number of cores (spark.cores.max), the
>> jobs run in parallel threads but within the same executor.
>> 3. In spark-1.6, if I increase the number of worker instances on each node,
>> the jobs run in as many parallel threads as there are workers, but still
>> within the same executor.
>>
>> Can anyone suggest why spark-1.6 cannot use multiple executors across
>> different nodes at the same time for parallel processing?
>> Your suggestion will be highly appreciated.
>>
>>
>>
>>
>>
>>
>


Re: Number of executors in spark-1.6 and spark-1.5

2016-04-10 Thread Mich Talebzadeh
Hi,

in 1.6, how many executors do you see for each node?
in standalone mode how are you increasing the number of worker instances.
Are you starting another slave on each node?

HTH




Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 10 April 2016 at 08:26, Vikash Pareek 
wrote:

> Hi,
>
> I have upgraded a 5-node Spark cluster from spark-1.5 to spark-1.6 (to use
> the mapWithState function).
> After moving to spark-1.6, I am seeing strange behaviour: jobs are not using
> multiple executors on different nodes at the same time, which means there is
> no parallel processing when each node has a single worker and executor.
> I am running jobs in spark standalone mode.
>
> I observed the following points related to this issue.
> 1. If I run the same job with spark-1.5, it uses multiple executors
> across different nodes at the same time.
> 2. In spark-1.6, if I increase the number of cores (spark.cores.max), the
> jobs run in parallel threads but within the same executor.
> 3. In spark-1.6, if I increase the number of worker instances on each node,
> the jobs run in as many parallel threads as there are workers, but still
> within the same executor.
>
> Can anyone suggest why spark-1.6 cannot use multiple executors across
> different nodes at the same time for parallel processing?
> Your suggestion will be highly appreciated.
>
>
>
>
>
>


Re: Number of executors in Spark - Kafka

2016-01-21 Thread Cody Koeninger
6 kafka partitions will result in 6 spark partitions, not 6 spark rdds.

The question of whether you will have a backlog isn't just a matter of
having 1 executor per partition.  If a single executor can process all of
the partitions fast enough to complete a batch in under the required time,
you won't have a backlog.
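
As an aside, if you lean on dynamic allocation instead of --num-executors, a
minimal sketch of the relevant settings looks like this (the min/max values are
placeholders, and on YARN the external shuffle service must also be running on
the NodeManagers):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("DynamicAllocationExample")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")     // required for dynamic allocation on YARN
    .set("spark.dynamicAllocation.minExecutors", "1")
    .set("spark.dynamicAllocation.maxExecutors", "6")
  // Leave --num-executors / spark.executor.instances unset; executors are then
  // requested when tasks back up and released when idle.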

On Thu, Jan 21, 2016 at 5:35 AM, Guillermo Ortiz 
wrote:

>
> I'm using Spark Streaming and Kafka with the direct approach. I have created a
> topic with 6 partitions, so when I execute Spark there are six RDDs. I
> understand that ideally it should have six executors, one to process each
> RDD. To do this, when I execute spark-submit (I use YARN) I specify the
> number of executors as six.
> If I don't specify anything, it just creates one executor. Looking for
> information, I have read:
>
> "The --num-executors command-line flag or spark.executor.instances 
> configuration
> property control the number of executors requested. Starting in CDH
> 5.4/Spark 1.3, you will be able to avoid setting this property by turning
> on dynamic allocation
> 
>  with
> thespark.dynamicAllocation.enabled property. Dynamic allocation enables a
> Spark application to request executors when there is a backlog of pending
> tasks and free up executors when idle."
>
> I have this parameter enabled; I understand that if I don't set the
> --num-executors parameter it should create six executors, or am I wrong?
>


Re: number of executors in sparkR.init()

2015-12-25 Thread Felix Cheung
The equivalent of spark-submit --num-executors should be
spark.executor.instances when used in SparkConf:
http://spark.apache.org/docs/latest/running-on-yarn.html
Could you try setting that with sparkR.init()?


_
From: Franc Carter 
Sent: Friday, December 25, 2015 9:23 PM
Subject: number of executors in sparkR.init()
To:  


Hi,

I'm having trouble working out how to get the number of executors set
when using sparkR.init().

If I start sparkR with

  sparkR --master yarn --num-executors 6

then I get 6 executors.

However, if I start sparkR with

  sparkR

followed by

  sc <- sparkR.init(master="yarn-client", sparkEnvir=list(spark.num.executors='6'))

then I only get 2 executors.

Can anyone point me in the direction of what I might be doing wrong? I need
to initialise it this way so that RStudio can hook in to SparkR.

thanks

--
Franc


  

Re: number of executors in sparkR.init()

2015-12-25 Thread Franc Carter
Thanks, that works

cheers

On 26 December 2015 at 16:53, Felix Cheung 
wrote:

> The equivalent for spark-submit --num-executors should be
> spark.executor.instances
> When use in SparkConf?
> http://spark.apache.org/docs/latest/running-on-yarn.html
>
> Could you try setting that with sparkR.init()?
>
>
> _
> From: Franc Carter 
> Sent: Friday, December 25, 2015 9:23 PM
> Subject: number of executors in sparkR.init()
> To: 
>
>
>
> Hi,
>
> I'm having trouble working out how to get the number of executors set when
> using sparkR.init().
>
> If I start sparkR with
>
>   sparkR  --master yarn --num-executors 6
>
> then I get 6 executors
>
> However, if I start sparkR with
>
>   sparkR
>
> followed by
>
>   sc <- sparkR.init(master="yarn-client",
> sparkEnvir=list(spark.num.executors='6'))
>
> then I only get 2 executors.
>
> Can anyone point me in the direction of what I might be doing wrong? I need
> to initialise it this way so that RStudio can hook in to SparkR
>
> thanks
>
> --
> Franc
>
>
>


-- 
Franc


Re: number of executors

2015-05-18 Thread edward cui
Oh BTW, it's Spark 1.3.1 on Hadoop 2.4, AMI 3.6.

Sorry for leaving out this information.

Appreciate any help!

Ed

2015-05-18 12:53 GMT-04:00 edward cui edwardcu...@gmail.com:

 I actually have the same problem, but I am not sure whether it is a spark
 problem or a Yarn problem.

 I set up a five nodes cluster on aws emr, start yarn daemon on the master
 (The node manager will not be started on default on the master, I don't
 want to waste any resource since I have to pay). And submit the spark task
 through yarn-cluster mode. The command is:
 ./spark/bin/spark-submit --master yarn-cluster --num-executors 5
 --executor-cores 4 --properties-file spark-application.conf myapp.py

 But the yarn resource manager only created 4 containers on 4 nodes, and
 one node was completely on idle.

 More details about the setup:
 EMR node:
 m3.xlarge: 16g ram 4 cores 40g ssd (HDFS on EBS?)

 Yarn-site.xml:
 yarn.scheduler.maximum-allocation-mb=11520
 yarn.nodemanager.resource.memory-mb=11520

 Spark-conf:

 spark.executor.memory           10g
 spark.storage.memoryFraction    0.2
 spark.python.worker.memory      1500m
 spark.akka.frameSize            200
 spark.shuffle.memoryFraction    0.1

 spark.driver.memory 10g


 Hadoop behavior observed:
 Create 4 containers on four nodes including emr master but one emr slave
 on idle (memory consumption around 2g and 0% cpu occupation)
 Spark use one container for driver on emr slave node (make sense since I
 required that much of memory)
 Use the other three node for computing the tasks.


 If yarn can't use all the nodes and I have to pay for the node, it's just a 
 big waste : p


 Any thoughts on this?


 Great thanks,

 Ed



 2015-05-18 12:07 GMT-04:00 Sandy Ryza sandy.r...@cloudera.com:

 *All

 On Mon, May 18, 2015 at 9:07 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 
 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 
 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a
 simple app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe










Re: number of executors

2015-05-18 Thread Sandy Ryza
*All

On Mon, May 18, 2015 at 9:07 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe








Re: number of executors

2015-05-18 Thread Sandy Ryza
Hi Xiaohe,

All Spark options must go before the jar or they won't take effect.

-Sandy

On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems in
 my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe







Re: number of executors

2015-05-18 Thread edward cui
I actually have the same problem, but I am not sure whether it is a spark
problem or a Yarn problem.

I set up a five-node cluster on AWS EMR and started the YARN daemon on the
master (the NodeManager is not started on the master by default, and I don't
want to waste any resources since I have to pay). I submit the Spark task in
yarn-cluster mode. The command is:
./spark/bin/spark-submit --master yarn-cluster --num-executors 5
--executor-cores 4 --properties-file spark-application.conf myapp.py

But the YARN resource manager only created 4 containers on 4 nodes, and one
node was completely idle.

More details about the setup:
EMR node:
m3.xlarge: 16g ram 4 cores 40g ssd (HDFS on EBS?)

Yarn-site.xml:
yarn.scheduler.maximum-allocation-mb=11520
yarn.nodemanager.resource.memory-mb=11520

Spark-conf:

spark.executor.memory           10g
spark.storage.memoryFraction    0.2
spark.python.worker.memory      1500m
spark.akka.frameSize            200
spark.shuffle.memoryFraction    0.1
spark.driver.memory             10g


Hadoop behavior observed:
It created 4 containers on four nodes, including the EMR master, but left one
EMR slave idle (memory consumption around 2g and 0% CPU occupation).
Spark used one container for the driver on an EMR slave node (which makes
sense since I requested that much memory) and used the other three nodes for
computing the tasks.


If YARN can't use all the nodes and I still have to pay for them, it's
just a big waste :p


Any thoughts on this?


Great thanks,

Ed



2015-05-18 12:07 GMT-04:00 Sandy Ryza sandy.r...@cloudera.com:

 *All

 On Mon, May 18, 2015 at 9:07 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 
 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe









Re: number of executors

2015-05-18 Thread xiaohe lan
Yeah, I read that page before, but it does not mention that the options should
come before the application jar. Actually, if I put the --class option
before the application jar, I get a ClassNotFoundException.

Anyway, thanks again Sandy.
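
For reference, the same settings can also be passed as configuration properties
(via --conf, a properties file, or SparkConf), which sidesteps the
argument-ordering issue; a rough sketch, with values mirroring this thread:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("SimpleApp")
    .set("spark.executor.instances", "5")   // equivalent of --num-executors 5 on YARN
    .set("spark.executor.cores", "4")       // equivalent of --executor-cores 4
  val sc = new SparkContext(conf)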

On Tue, May 19, 2015 at 11:06 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Awesome!

 It's documented here:
 https://spark.apache.org/docs/latest/submitting-applications.html

 -Sandy

 On Mon, May 18, 2015 at 8:03 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi Sandy,

 Thanks for your information. Yes, spark-submit --master yarn
 --num-executors 5 --executor-cores 4
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp is
 working awesomely. Is there any documentations pointing to this ?

 Thanks,
 Xiaohe

 On Tue, May 19, 2015 at 12:07 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 
 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 
 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a
 simple app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe










Re: number of executors

2015-05-18 Thread xiaohe lan
Hi Sandy,

Thanks for your information. Yes, spark-submit --master yarn
--num-executors 5 --executor-cores 4
target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp is
working awesomely. Is there any documentation pointing to this?

Thanks,
Xiaohe

On Tue, May 19, 2015 at 12:07 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe








Re: number of executors

2015-05-18 Thread Sandy Ryza
Awesome!

It's documented here:
https://spark.apache.org/docs/latest/submitting-applications.html

-Sandy

On Mon, May 18, 2015 at 8:03 PM, xiaohe lan zombiexco...@gmail.com wrote:

 Hi Sandy,

 Thanks for your information. Yes, spark-submit --master yarn
 --num-executors 5 --executor-cores 4
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp is
 working awesomely. Is there any documentations pointing to this ?

 Thanks,
 Xiaohe

 On Tue, May 19, 2015 at 12:07 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Xiaohe,

 The all Spark options must go before the jar or they won't take effect.

 -Sandy

 On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Sorry, them both are assigned task actually.

 Aggregated Metrics by Executor
 Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput
 Size / RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle
 Spill (Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 
 121007701630.4
 MB295.4 MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6
 MB304.8 MB

 On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1
 MB / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems
 in my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Did you try --executor-cores param? While you submit the job, do a ps
 aux | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe









Re: number of executors

2015-05-17 Thread Akhil Das
Did you try --executor-cores param? While you submit the job, do a ps aux |
grep spark-submit and see the exact command parameters.

Thanks
Best Regards

On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple app.

  spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar
 --class scala.SimpleApp --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see only
 two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe



Re: number of executors

2015-05-17 Thread xiaohe lan
Sorry, both of them are actually assigned tasks.

Aggregated Metrics by Executor
Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Input Size / Records | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
1           | host1:6184  | 1.7 min   | 5           | 0            | 5               | 640.0 MB / 12318400  | 382.3 MB / 12100770          | 1630.4 MB              | 295.4 MB
2           | host2:62072 | 1.7 min   | 5           | 0            | 5               | 640.0 MB / 12014510  | 386.0 MB / 10926912          | 1646.6 MB              | 304.8 MB

On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com wrote:

 bash-4.1$ ps aux | grep SparkSubmit
 xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
 /scratch/xilan/jdk1.8.0_45/bin/java -cp
 /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
 -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5 --executor-cores 4
 xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
 --color SparkSubmit


 When look at the sparkui, I see the following:
 Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
 TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1 MB
 / 28089782host2:49970 ms00063.4 MB / 1810945

 So executor 2 is not even assigned a task ? Maybe I have some problems in
 my setting, but I don't know what could be the possible settings I set
 wrong or have not set.


 Thanks,
 Xiaohe

 On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try --executor-cores param? While you submit the job, do a ps aux
 | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple
 app.

  spark-submit --master yarn
 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
 --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see
 only two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe






Re: number of executors

2015-05-17 Thread xiaohe lan
bash-4.1$ ps aux | grep SparkSubmit
xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
/scratch/xilan/jdk1.8.0_45/bin/java -cp
/scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
-Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
--num-executors 5 --executor-cores 4
xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
--color SparkSubmit


When I look at the Spark UI, I see the following:
Aggregated Metrics by Executor
Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read Size / Records
1           | host1:30483 | 6 s       | 1           | 0            | 1               | 127.1 MB / 2808978
2           | host2:4997  | 0 ms      | 0           | 0            | 0               | 63.4 MB / 1810945

So executor 2 is not even assigned a task? Maybe I have some problems in my
settings, but I don't know which settings I may have set wrong or not set.


Thanks,
Xiaohe

On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Did you try --executor-cores param? While you submit the job, do a ps aux
 | grep spark-submit and see the exact command parameters.

 Thanks
 Best Regards

 On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple app.

  spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar
 --class scala.SimpleApp --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see only
 two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe





Re: number of executors

2015-05-16 Thread Ted Yu
What Spark release are you using ?

Can you check driver log to see if there is some clue there ?

Thanks

On Sat, May 16, 2015 at 12:01 AM, xiaohe lan zombiexco...@gmail.com wrote:

 Hi,

 I have a 5 nodes yarn cluster, I used spark-submit to submit a simple app.

  spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar
 --class scala.SimpleApp --num-executors 5

 I have set the number of executor to 5, but from sparkui I could see only
 two executors and it ran very slow. What did I miss ?

 Thanks,
 Xiaohe



Re: Number of Executors per worker process

2015-03-02 Thread Spico Florin
Hello!
  Thank you very much for your response. In the book Learning Spark I
found the following sentence:

"Each application will have at most one executor on each worker"

So a worker can have one executor process spawned for an application, or none
(perhaps depending on the workload distribution).


Best regards,

 Florin

On Thu, Feb 26, 2015 at 1:04 PM, Jeffrey Jedele jeffrey.jed...@gmail.com
wrote:

 Hi Spico,

 Yes, I think an executor core in Spark is basically a thread in a worker
 pool. It's recommended to have one executor core per physical core on your
 machine for best performance, but I think in theory you can create as many
 threads as your OS allows.

 For deployment:
 There seems to be the actual worker JVM which coordinates the work on a
 worker node. I don't think the actual thread pool lives in there, but a
 separate JVM is created for each application that has cores allocated on
 the node. Otherwise it would be rather hard to impose memory limits on
 application level and it would have serious disadvantages regarding
 stability.

 You can check this behavior by looking at the processes on your machine:
 ps aux | grep spark.deploy = will show  master, worker (coordinator) and
 driver JVMs
 ps aux | grep spark.executor = will show the actual worker JVMs

 2015-02-25 14:23 GMT+01:00 Spico Florin spicoflo...@gmail.com:

 Hello!
  I've read the documentation about the spark architecture, I have the
 following questions:
 1: how many executors can be on a single worker process (JVM)?
 2:Should I think executor like a Java Thread Executor where the pool size
 is equal with the number of the given cores (set up by the
 SPARK_WORKER_CORES)?
 3. If the worker can have many executors, how this is handled by the
 Spark? Or can I handle by myself to set up the number of executors per JVM?

 I look forward for your answers.
   Regards,
   Florin





Re: Number of Executors per worker process

2015-02-26 Thread Jeffrey Jedele
Hi Spico,

Yes, I think an executor core in Spark is basically a thread in a worker
pool. It's recommended to have one executor core per physical core on your
machine for best performance, but I think in theory you can create as many
threads as your OS allows.

For deployment:
There seems to be the actual worker JVM which coordinates the work on a
worker node. I don't think the actual thread pool lives in there, but a
separate JVM is created for each application that has cores allocated on
the node. Otherwise it would be rather hard to impose memory limits on
application level and it would have serious disadvantages regarding
stability.

You can check this behavior by looking at the processes on your machine:
ps aux | grep spark.deploy = will show  master, worker (coordinator) and
driver JVMs
ps aux | grep spark.executor = will show the actual worker JVMs

2015-02-25 14:23 GMT+01:00 Spico Florin spicoflo...@gmail.com:

 Hello!
  I've read the documentation about the spark architecture, I have the
 following questions:
 1: how many executors can be on a single worker process (JVM)?
 2:Should I think executor like a Java Thread Executor where the pool size
 is equal with the number of the given cores (set up by the
 SPARK_WORKER_CORES)?
 3. If the worker can have many executors, how this is handled by the
 Spark? Or can I handle by myself to set up the number of executors per JVM?

 I look forward for your answers.
   Regards,
   Florin



Re: Number of executors and tasks

2014-11-26 Thread Akhil Das
1. On HDFS, files are split at the HDFS block size (~64 MB). When you put the
same file in a local file system (ext3/ext4) it is split differently (in your
case it looks like ~32 MB splits), and that's why you are seeing 9 output files.

2. You could set *--num-executors* to increase the number of executor
processes.
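
As a rough illustration (the paths and the partition count below are made up),
the number of tasks follows the number of input partitions, and textFile also
accepts a minimum partition count:

  // ~286 MB file with 64 MB HDFS blocks => about 5 partitions, hence 5 tasks
  // and 5 output files.
  val hdfsRdd = sc.textFile("hdfs:///data/input.txt")
  println(hdfsRdd.partitions.length)

  // A local file is split by a different scheme; you can also ask for a
  // minimum number of partitions explicitly.
  val localRdd = sc.textFile("file:///data/input.txt", 16)
  println(localRdd.partitions.length)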

Thanks
Best Regards

On Wed, Nov 26, 2014 at 5:54 PM, Praveen Sripati praveensrip...@gmail.com
wrote:

 Hi,

 I am running Spark in the stand alone mode.

 1) I have a file of 286MB in HDFS (block size is 64MB) and so is split
 into 5 blocks. When I have the file in HDFS, 5 tasks are generated and so 5
 files in the output. My understanding is that there will be a separate
 partition for each block and there will be a separate task for each
 partition. This makes sense why I see 5 files in the output.

 When I put the same file in local file system (not HDFS), I see 9 files in
 the output. I am curious why it is 9?

 2) With the file in HDFS and local file system, I see a single
 CoarseGrainedExecutorBackend when I run the jps command. Why is it one
 executor process and how do we configure the number of executor process?

 Thanks,
 Praveen



Re: Number of executors and tasks

2014-11-26 Thread Akhil Das
This one would give you a better understanding
http://stackoverflow.com/questions/24622108/apache-spark-the-number-of-cores-vs-the-number-of-executors

Thanks
Best Regards

On Wed, Nov 26, 2014 at 10:32 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 1. On HDFS files are treated as ~64mb in block size. When you put the same
 file in local file system (ext3/ext4) it will be treated as different (in
 your case it looks like ~32mb) and that's why you are seeing 9 output files.

 2. You could set *num-executors *to increase the number of executor
 processes.

 Thanks
 Best Regards

 On Wed, Nov 26, 2014 at 5:54 PM, Praveen Sripati praveensrip...@gmail.com
  wrote:

 Hi,

 I am running Spark in the stand alone mode.

 1) I have a file of 286MB in HDFS (block size is 64MB) and so is split
 into 5 blocks. When I have the file in HDFS, 5 tasks are generated and so 5
 files in the output. My understanding is that there will be a separate
 partition for each block and there will be a separate task for each
 partition. This makes sense why I see 5 files in the output.

 When I put the same file in local file system (not HDFS), I see 9 files
 in the output. I am curious why it is 9?

 2) With the file in HDFS and local file system, I see a single
 CoarseGrainedExecutorBackend when I run the jps command. Why is it one
 executor process and how do we configure the number of executor process?

 Thanks,
 Praveen





Re: Number of executors change during job running

2014-07-16 Thread Bill Jay
Hi Tathagata,

I have tried the repartition method. The reduce stage first had 2 executors
and then it had around 85 executors. I specified repartition(300), and each
executor was given 2 cores when I submitted the job. This shows that
repartition does bring in more executors. However, the running time was still
around 50 seconds although I only did a simple groupBy operation. I think the
repartition may consume part of the running time. Considering that the input
source is Kafka, is there a way to make the program even faster? Thanks!
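
For completeness, a small sketch of keeping the shuffle parallelism explicit on
the DStream side (inputDStream, extractKey and outputPrefix are placeholders;
the 300 mirrors the numbers in this thread):

  // Spread the received Kafka blocks across the cluster, then pass an explicit
  // partition count to the shuffle operation so the reduce stage keeps that
  // parallelism in every batch.
  val repartitioned = inputDStream.repartition(300)
  val grouped = repartitioned
    .map(record => (extractKey(record), record))
    .groupByKey(300)
  grouped.saveAsTextFiles(outputPrefix)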


On Mon, Jul 14, 2014 at 3:22 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you give me a screen shot of the stages page in the web ui, the spark
 logs, and the code that is causing this behavior. This seems quite weird to
 me.

 TD


 On Mon, Jul 14, 2014 at 2:11 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 It seems repartition does not necessarily force Spark to distribute the
 data into different executors. I have launched a new job which uses
 repartition right after I received data from Kafka. For the first two
 batches, the reduce stage used more than 80 executors. Starting from the
 third batch, there were always only 2 executors in the reduce task
 (combineByKey). Even with the first batch which used more than 80
 executors, it took 2.4 mins to finish the reduce stage for a very small
 amount of data.

 Bill


 On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 After using repartition(300), how many executors did it run on? By the
 way, repartitions(300) means it will divide the shuffled data into 300
 partitions. Since there are many cores on each of the 300
 machines/executors, these partitions (each requiring a core) may not be
 spread all 300 executors. Hence, if you really want spread it all 300
 executors, you may have to bump up the partitions even more. However,
 increasing the partitions to too high may not be beneficial, and you will
 have play around with the number to figure out sweet spot that reduces the
 time to process the stage / time to process the whole batch.

 TD


 On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Do you mean that the data is not shuffled until the reduce stage? That
 means groupBy still only uses 2 machines?

 I think I used repartition(300) after I read the data from Kafka into
 DStream. It seems that it did not guarantee that the map or reduce stages
 will be run on 300 machines. I am currently trying to initiate 100 DStream
 from KafkaUtils.createDStream and union them. Now the reduce stages had
 around 80 machines for all the batches. However, this method will introduce
 many dstreams. It will be good if we can control the number of executors in
 the groupBy operation because the calculation needs to be finished within 1
 minute for different size of input data based on our production need.

 Thanks!


 Bill


 On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Aah, I get it now. That is because the input data streams is
 replicated on two machines, so by locality the data is processed on those
 two machines. So the map stage on the data uses 2 executors, but the
 reduce stage, (after groupByKey) the saveAsTextFiles would use 300 
 tasks.
 And the default parallelism takes into affect only when the data is
 explicitly shuffled around.

 You can fix this by explicitly repartitioning the data.

 inputDStream.repartition(partitions)

 This is covered in the streaming tuning guide
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
 .

 TD



 On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi folks,

 I just ran another job that only received data from Kafka, did some
 filtering, and then save as text files in HDFS. There was no reducing 
 work
 involved. Surprisingly, the number of executors for the saveAsTextFiles
 stage was also 2 although I specified 300 executors in the job 
 submission.
 As a result, the simple save file action took more than 2 minutes. Do you
 have any idea how Spark determined the number of executors
 for different stages?

 Thanks!

 Bill


 On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
  wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just a one-to-one mapping, which may not
 possible increase running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. And the other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val parition = 300
 val zkQuorum = zk1,zk2,zk3
 val group = my-group- + currentTime.toString
 val topics = 

Re: Number of executors change during job running

2014-07-14 Thread Bill Jay
Hi Tathagata,

It seems repartition does not necessarily force Spark to distribute the
data into different executors. I have launched a new job which uses
repartition right after I received data from Kafka. For the first two
batches, the reduce stage used more than 80 executors. Starting from the
third batch, there were always only 2 executors in the reduce task
(combineByKey). Even with the first batch which used more than 80
executors, it took 2.4 mins to finish the reduce stage for a very small
amount of data.

Bill


On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 After using repartition(300), how many executors did it run on? By the
 way, repartitions(300) means it will divide the shuffled data into 300
 partitions. Since there are many cores on each of the 300
 machines/executors, these partitions (each requiring a core) may not be
 spread all 300 executors. Hence, if you really want spread it all 300
 executors, you may have to bump up the partitions even more. However,
 increasing the partitions to too high may not be beneficial, and you will
 have play around with the number to figure out sweet spot that reduces the
 time to process the stage / time to process the whole batch.

 TD


 On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Do you mean that the data is not shuffled until the reduce stage? That
 means groupBy still only uses 2 machines?

 I think I used repartition(300) after I read the data from Kafka into
 DStream. It seems that it did not guarantee that the map or reduce stages
 will be run on 300 machines. I am currently trying to initiate 100 DStream
 from KafkaUtils.createDStream and union them. Now the reduce stages had
 around 80 machines for all the batches. However, this method will introduce
 many dstreams. It will be good if we can control the number of executors in
 the groupBy operation because the calculation needs to be finished within 1
 minute for different size of input data based on our production need.

 Thanks!


 Bill


 On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Aah, I get it now. That is because the input data streams is replicated
 on two machines, so by locality the data is processed on those two
 machines. So the map stage on the data uses 2 executors, but the reduce
 stage, (after groupByKey) the saveAsTextFiles would use 300 tasks. And the
 default parallelism takes into affect only when the data is explicitly
 shuffled around.

 You can fix this by explicitly repartitioning the data.

 inputDStream.repartition(partitions)

 This is covered in the streaming tuning guide
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
 .

 TD



 On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi folks,

 I just ran another job that only received data from Kafka, did some
 filtering, and then save as text files in HDFS. There was no reducing work
 involved. Surprisingly, the number of executors for the saveAsTextFiles
 stage was also 2 although I specified 300 executors in the job submission.
 As a result, the simple save file action took more than 2 minutes. Do you
 have any idea how Spark determined the number of executors
 for different stages?

 Thanks!

 Bill


 On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just a one-to-one mapping, which may not
 possible increase running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. And the other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val parition = 300
 val zkQuorum = "zk1,zk2,zk3"
 val group = "my-group-" + currentTime.toString
 val topics = "topic1,topic2,topic3,topic4"
 val numThreads = 4
 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + "checkpoint")
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)
   .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data => (data("id").toString,
   timestampToUTCUnix(data("time").toString),
   timestampToUTCUnix(data("local_time").toString), data("id2").toString,
   data("id3").toString,
   data("log_type").toString, data("sub_log_type").toString))
 val timeDiff = 3600L
 val filteredFields = fields.filter(field => abs(field._2 -
   field._3) <= timeDiff)

 val 

Re: Number of executors change during job running

2014-07-14 Thread Tathagata Das
Can you give me a screen shot of the stages page in the web ui, the spark
logs, and the code that is causing this behavior. This seems quite weird to
me.

TD


On Mon, Jul 14, 2014 at 2:11 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tathagata,

 It seems repartition does not necessarily force Spark to distribute the
 data into different executors. I have launched a new job which uses
 repartition right after I received data from Kafka. For the first two
 batches, the reduce stage used more than 80 executors. Starting from the
 third batch, there were always only 2 executors in the reduce task
 (combineByKey). Even with the first batch which used more than 80
 executors, it took 2.4 mins to finish the reduce stage for a very small
 amount of data.

 Bill


 On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 After using repartition(300), how many executors did it run on? By the
 way, repartitions(300) means it will divide the shuffled data into 300
 partitions. Since there are many cores on each of the 300
 machines/executors, these partitions (each requiring a core) may not be
 spread all 300 executors. Hence, if you really want spread it all 300
 executors, you may have to bump up the partitions even more. However,
 increasing the partitions to too high may not be beneficial, and you will
 have play around with the number to figure out sweet spot that reduces the
 time to process the stage / time to process the whole batch.

 TD


 On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Do you mean that the data is not shuffled until the reduce stage? That
 means groupBy still only uses 2 machines?

 I think I used repartition(300) after I read the data from Kafka into
 DStream. It seems that it did not guarantee that the map or reduce stages
 will be run on 300 machines. I am currently trying to initiate 100 DStream
 from KafkaUtils.createDStream and union them. Now the reduce stages had
 around 80 machines for all the batches. However, this method will introduce
 many dstreams. It will be good if we can control the number of executors in
 the groupBy operation because the calculation needs to be finished within 1
 minute for different size of input data based on our production need.

 Thanks!


 Bill


 On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Aah, I get it now. That is because the input data streams is replicated
 on two machines, so by locality the data is processed on those two
 machines. So the map stage on the data uses 2 executors, but the reduce
 stage, (after groupByKey) the saveAsTextFiles would use 300 tasks. And the
 default parallelism takes into affect only when the data is explicitly
 shuffled around.

 You can fix this by explicitly repartitioning the data.

 inputDStream.repartition(partitions)

 This is covered in the streaming tuning guide
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
 .

 TD



 On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi folks,

 I just ran another job that only received data from Kafka, did some
 filtering, and then save as text files in HDFS. There was no reducing work
 involved. Surprisingly, the number of executors for the saveAsTextFiles
 stage was also 2 although I specified 300 executors in the job submission.
 As a result, the simple save file action took more than 2 minutes. Do you
 have any idea how Spark determined the number of executors
 for different stages?

 Thanks!

 Bill


 On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just a one-to-one mapping, which may not
 possible increase running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. And the other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val parition = 300
 val zkQuorum = zk1,zk2,zk3
 val group = my-group- + currentTime.toString
 val topics = topic1,topic2,topic3,topic4
 val numThreads = 4
 val topicMap = topics.split(,).map((_,numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + checkpoint)
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)

 .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data = (data(id).toString,
 timestampToUTCUnix(data(time).toString),

  timestampToUTCUnix(data(local_time).toString), 

Re: Number of executors change during job running

2014-07-11 Thread Praveen Seluka
If I understand correctly, you cannot change the number of executors at
runtime, right (correct me if I am wrong)? It is defined when we start the
application and stays fixed. Do you mean the number of tasks?
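On YARN the executor count for the whole application is indeed fixed at
submission time; for reference, a typical submission looks roughly like the
following (the class and jar names are placeholders):

spark-submit \
  --master yarn-cluster \
  --num-executors 300 \
  --executor-cores 4 \
  --executor-memory 4g \
  --conf spark.default.parallelism=300 \
  --class com.example.StreamingJob \
  streaming-job.jar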


On Fri, Jul 11, 2014 at 6:29 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consume data from
 Kafka and group the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I checked
 the slow stage *combineByKey at ShuffledDStream.scala:42,* there are
 sometimes 2 executors allocated for this stage. However, during other
 batches, the executors can be several hundred for the same stage, which
 means the number of executors for the same operations change.

 Does anyone know how Spark allocate the number of executors for
 different stages and how to increase the efficiency for task? Thanks!

 Bill







Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Praveen,

I did not change the total number of executors. I specified 300 as the
number of executors when I submitted the jobs. However, for some stages,
the number of executors used is very small, leading to a long calculation
time even for a small data set. That means not all executors were used for
some stages.

If I look at the detail of the running time of different executors, I find
that some of them have very low running time while a few have very long
running time, leading to a long overall running time. Another point I
noticed is that the number of completed tasks is often larger than the
number of total tasks. That means the job is sometimes still running in some
stages although all the tasks have been finished. These are the two
behaviors I observed that may be related to the unexpected running times.

Bill


On Thu, Jul 10, 2014 at 11:26 PM, Praveen Seluka psel...@qubole.com wrote:

 If I understand correctly, you could not change the number of executors at
 runtime right(correct me if am wrong) - its defined when we start the
 application and fixed. Do you mean number of tasks?


 On Fri, Jul 11, 2014 at 6:29 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consume data from
 Kafka and group the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I 
 checked
 the slow stage *combineByKey at ShuffledDStream.scala:42,* there
 are sometimes 2 executors allocated for this stage. However, during other
 batches, the executors can be several hundred for the same stage, which
 means the number of executors for the same operations change.

 Does anyone know how Spark allocate the number of executors for
 different stages and how to increase the efficiency for task? Thanks!

 Bill








Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

I also tried passing the number of partitions as a parameter to functions
such as groupByKey. It seems the number of executors is around 50 instead
of 300, which is the number of executors I specified in the submission
script. Moreover, the running time of different executors is skewed. The
ideal case is that Spark distributes the data evenly across 300 executors
so that the computation can be finished efficiently. I am not sure how to
achieve this.

Thanks!

Bill


On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consume data from
 Kafka and group the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I checked
 the slow stage *combineByKey at ShuffledDStream.scala:42,* there are
 sometimes 2 executors allocated for this stage. However, during other
 batches, the executors can be several hundred for the same stage, which
 means the number of executors for the same operations change.

 Does anyone know how Spark allocate the number of executors for
 different stages and how to increase the efficiency for task? Thanks!

 Bill







Re: Number of executors change during job running

2014-07-11 Thread Tathagata Das
Can you show us the program that you are running? If you are setting the
number of partitions in the XYZ-ByKey operation to 300, then there should be
300 tasks for that stage, distributed across the 50 executors allocated to
your context. However, the data distribution may be skewed, in which case
you can use a repartition operation to redistribute the data more evenly
(both DStream and RDD have repartition).
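A minimal sketch of both suggestions, assuming a (String, String) pair
DStream standing in for the (program, duid) tuples in this job:

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

// Repartition evens out a skewed input, and the explicit partition count on
// groupByKey fixes the shuffle stage at 300 tasks.
def groupEvenly(pairs: DStream[(String, String)],
                partitions: Int = 300): DStream[(String, Iterable[String])] =
  pairs.repartition(partitions).groupByKey(partitions)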

TD


On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as parameters to the
 functions such as groupByKey. It seems the numbers of executors is around
 50 instead of 300, which is the number of the executors I specified in
 submission script. Moreover, the running time of different executors is
 skewed. The ideal case is that Spark can distribute the data into 300
 executors evenly so that the computation can be efficiently finished. I am
 not sure how to achieve this.

 Thanks!

 Bill


 On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consume data from
 Kafka and group the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I 
 checked
 the slow stage *combineByKey at ShuffledDStream.scala:42,* there
 are sometimes 2 executors allocated for this stage. However, during other
 batches, the executors can be several hundred for the same stage, which
 means the number of executors for the same operations change.

 Does anyone know how Spark allocate the number of executors for
 different stages and how to increase the efficiency for task? Thanks!

 Bill








Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

Below is my main function. I omit some filtering and data conversion
functions. These functions are just one-to-one mappings, which should not
meaningfully increase the running time. The only reduce function I have here
is groupByKey. There are 4 topics in my Kafka brokers; two of the topics
have 240k lines each minute, and the other two topics have less than 30k
lines per minute. The batch size is one minute and I specified 300
executors in my spark-submit script. The default parallelism is 300.


// (Kafka/StreamingContext setup and the helper functions referenced below are
// omitted, as noted above; lines1 is the Kafka input DStream from that setup.)
import scala.util.parsing.json.JSON
import scala.math.abs

val partition = 300
val zkQuorum = "zk1,zk2,zk3"
val group = "my-group-" + currentTime.toString
val topics = "topic1,topic2,topic3,topic4"
val numThreads = 4
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
ssc = new StreamingContext(conf, Seconds(batch))
ssc.checkpoint(hadoopOutput + "checkpoint")
val lines = lines1
lines.cache()
val jsonData = lines.map(JSON.parseFull(_))
val mapData = jsonData.filter(_.isDefined)
  .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val validMapData = mapData.filter(isValidData(_))
val fields = validMapData.map(data => (data("id").toString,
  timestampToUTCUnix(data("time").toString),
  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
  data("id3").toString,
  data("log_type").toString, data("sub_log_type").toString))
val timeDiff = 3600L
val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)

val watchTimeFields = filteredFields.map(fields => (fields._1,
  fields._2, fields._4, fields._5, fields._7))
val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
// The only shuffle: groupByKey with an explicit partition count of 300.
val programDuids = watchTimeTuples.map(fields => (fields._3,
  fields._1)).groupByKey(partition)
val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
programDuidNum.saveAsTextFiles(hadoopOutput + "result")

I have been working on this for several days and still have no idea why
there are always 2 executors for the groupBy stage. Thanks a lot!
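For reference, a sketch of the repartition variant discussed elsewhere in
this thread, assuming lines1 is the Kafka input DStream created in the
omitted setup:

// Replacing `val lines = lines1` above with an explicit repartition spreads
// the records across the cluster before the map/filter/groupByKey stages run.
val lines = lines1.repartition(partition)
lines.cache()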

Bill


On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you show us the program that you are running. If you are setting
 number of partitions in the XYZ-ByKey operation as 300, then there should
 be 300 tasks for that stage, distributed on the 50 executors are allocated
 to your context. However the data distribution may be skewed in which case,
 you can use a repartition operation to redistributed the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as parameters to the
 functions such as groupByKey. It seems the numbers of executors is around
 50 instead of 300, which is the number of the executors I specified in
 submission script. Moreover, the running time of different executors is
 skewed. The ideal case is that Spark can distribute the data into 300
 executors evenly so that the computation can be efficiently finished. I am
 not sure how to achieve this.

 Thanks!

 Bill


 On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consume data from
 Kafka and group the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I 
 checked
 the slow stage 

Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi folks,

I just ran another job that only received data from Kafka, did some
filtering, and then saved the results as text files in HDFS. There was no
reduce work involved. Surprisingly, the number of executors for the
saveAsTextFiles stage was also 2, although I specified 300 executors in the
job submission. As a result, the simple save-file action took more than 2
minutes. Do you have any idea how Spark determines the number of executors
for different stages?
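A minimal sketch of what would spread that save stage out, assuming
`filtered` stands for the filtered Kafka DStream in this job:

import org.apache.spark.streaming.dstream.DStream

// Even a map-only pipeline keeps the receiver's locality (2 executors), so
// repartition explicitly before writing out.
def saveSpread(filtered: DStream[String], prefix: String,
               partitions: Int = 300): Unit =
  filtered.repartition(partitions).saveAsTextFiles(prefix)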

Thanks!

Bill


On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just a one-to-one mapping, which may not
 possible increase running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. And the other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val parition = 300
 val zkQuorum = zk1,zk2,zk3
 val group = my-group- + currentTime.toString
 val topics = topic1,topic2,topic3,topic4
 val numThreads = 4
 val topicMap = topics.split(,).map((_,numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + checkpoint)
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)

 .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data = (data(id).toString,
 timestampToUTCUnix(data(time).toString),

  timestampToUTCUnix(data(local_time).toString), data(id2).toString,
data(id3).toString,
 data(log_type).toString, data(sub_log_type).toString))
 val timeDiff = 3600L
 val filteredFields = fields.filter(field = abs(field._2 - field._3)
 = timeDiff)

 val watchTimeFields = filteredFields.map(fields = (fields._1,
 fields._2, fields._4, fields._5, fields._7))
 val watchTimeTuples = watchTimeFields.map(fields =
 getWatchtimeTuple(fields))
 val programDuids = watchTimeTuples.map(fields = (fields._3,
 fields._1)).groupByKey(partition)
 val programDuidNum = programDuids.map{case(key, value) = (key,
 value.toSet.size)}
 programDuidNum.saveAsTextFiles(hadoopOutput+result)

 I have been working on this for several days. No findings why there are
 always 2 executors for the groupBy stage. Thanks a lot!

 Bill


 On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you show us the program that you are running. If you are setting
 number of partitions in the XYZ-ByKey operation as 300, then there should
 be 300 tasks for that stage, distributed on the 50 executors are allocated
 to your context. However the data distribution may be skewed in which case,
 you can use a repartition operation to redistributed the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as parameters to the
 functions such as groupByKey. It seems the numbers of executors is around
 50 instead of 300, which is the number of the executors I specified in
 submission script. Moreover, the running time of different executors is
 skewed. The ideal case is that Spark can distribute the data into 300
 executors evenly so that the computation can be efficiently finished. I am
 not sure how to achieve this.

 Thanks!

 Bill


 On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I 
 further
 observed that most executors take less than 20 seconds but two of them 
 take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the
 DStream.ByKey operations? If the reduce by key is not set, 

Re: Number of executors change during job running

2014-07-11 Thread Tathagata Das
Aah, I get it now. That is because the input data stream is replicated on
two machines, so by locality the data is processed on those two machines.
So the map stage on the data uses 2 executors, but in the reduce stage
(after groupByKey) the saveAsTextFiles would use 300 tasks. And the default
parallelism takes effect only when the data is explicitly shuffled around.

You can fix this by explicitly repartitioning the data.

inputDStream.repartition(partitions)

This is covered in the streaming tuning guide
http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
.
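A slightly fuller sketch of the same fix (the quorum, group, topic and batch
values are placeholders, and the spark-streaming-kafka artifact is assumed to
be on the classpath):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("repartition-example")
val ssc = new StreamingContext(conf, Seconds(60))
// The receiver lives on a couple of executors; repartition right away so the
// later map/filter/groupByKey/save stages use the whole cluster.
val raw = KafkaUtils.createStream(ssc, "zk1:2181,zk2:2181", "example-group",
  Map("topic1" -> 4))
val lines = raw.map(_._2).repartition(300)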

TD



On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi folks,

 I just ran another job that only received data from Kafka, did some
 filtering, and then save as text files in HDFS. There was no reducing work
 involved. Surprisingly, the number of executors for the saveAsTextFiles
 stage was also 2 although I specified 300 executors in the job submission.
 As a result, the simple save file action took more than 2 minutes. Do you
 have any idea how Spark determined the number of executors
 for different stages?

 Thanks!

 Bill


 On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just a one-to-one mapping, which may not
 possible increase running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. And the other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val parition = 300
 val zkQuorum = zk1,zk2,zk3
 val group = my-group- + currentTime.toString
 val topics = topic1,topic2,topic3,topic4
 val numThreads = 4
 val topicMap = topics.split(,).map((_,numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + checkpoint)
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)

 .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data = (data(id).toString,
 timestampToUTCUnix(data(time).toString),

  timestampToUTCUnix(data(local_time).toString), data(id2).toString,
data(id3).toString,
 data(log_type).toString, data(sub_log_type).toString))
 val timeDiff = 3600L
 val filteredFields = fields.filter(field = abs(field._2 - field._3)
 = timeDiff)

 val watchTimeFields = filteredFields.map(fields = (fields._1,
 fields._2, fields._4, fields._5, fields._7))
 val watchTimeTuples = watchTimeFields.map(fields =
 getWatchtimeTuple(fields))
 val programDuids = watchTimeTuples.map(fields = (fields._3,
 fields._1)).groupByKey(partition)
 val programDuidNum = programDuids.map{case(key, value) = (key,
 value.toSet.size)}
 programDuidNum.saveAsTextFiles(hadoopOutput+result)

 I have been working on this for several days. No findings why there are
 always 2 executors for the groupBy stage. Thanks a lot!

 Bill


 On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you show us the program that you are running. If you are setting
 number of partitions in the XYZ-ByKey operation as 300, then there should
 be 300 tasks for that stage, distributed on the 50 executors are allocated
 to your context. However the data distribution may be skewed in which case,
 you can use a repartition operation to redistributed the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as parameters to the
 functions such as groupByKey. It seems the numbers of executors is around
 50 instead of 300, which is the number of the executors I specified in
 submission script. Moreover, the running time of different executors is
 skewed. The ideal case is that Spark can distribute the data into 300
 executors evenly so that the computation can be efficiently finished. I am
 not sure how to achieve this.

 Thanks!

 Bill


 On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD



Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

Do you mean that the data is not shuffled until the reduce stage? Does that
mean groupBy still only uses 2 machines?

I think I used repartition(300) after I read the data from Kafka into a
DStream. It seems that this did not guarantee that the map or reduce stages
would be run on 300 machines. I am currently trying to create 100 DStreams
from KafkaUtils.createStream and union them. Now the reduce stages use
around 80 machines for all the batches. However, this method introduces many
DStreams. It would be good if we could control the number of executors in
the groupBy operation, because the calculation needs to finish within 1
minute for different sizes of input data, based on our production needs.
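A sketch of that multi-receiver approach; the helper name and the default
receiver/partition counts are illustrative rather than taken from the actual
job:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

// Several smaller Kafka receivers unioned into one DStream, then repartitioned
// so downstream stages are not tied to the receivers' locality.
def unionedKafkaLines(ssc: StreamingContext, zkQuorum: String, group: String,
                      topicMap: Map[String, Int],
                      receivers: Int = 100, partitions: Int = 300) = {
  val streams = (1 to receivers).map { _ =>
    KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
  }
  ssc.union(streams).repartition(partitions)
}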

Thanks!


Bill


On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Aah, I get it now. That is because the input data streams is replicated on
 two machines, so by locality the data is processed on those two machines.
 So the map stage on the data uses 2 executors, but the reduce stage,
 (after groupByKey) the saveAsTextFiles would use 300 tasks. And the default
 parallelism takes into affect only when the data is explicitly shuffled
 around.

 You can fix this by explicitly repartitioning the data.

 inputDStream.repartition(partitions)

 This is covered in the streaming tuning guide
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
 .

 TD



 On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi folks,

 I just ran another job that only received data from Kafka, did some
 filtering, and then save as text files in HDFS. There was no reducing work
 involved. Surprisingly, the number of executors for the saveAsTextFiles
 stage was also 2 although I specified 300 executors in the job submission.
 As a result, the simple save file action took more than 2 minutes. Do you
 have any idea how Spark determined the number of executors
 for different stages?

 Thanks!

 Bill


 On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just a one-to-one mapping, which may not
 possible increase running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. And the other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val parition = 300
 val zkQuorum = zk1,zk2,zk3
 val group = my-group- + currentTime.toString
 val topics = topic1,topic2,topic3,topic4
 val numThreads = 4
 val topicMap = topics.split(,).map((_,numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + checkpoint)
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)

 .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data = (data(id).toString,
 timestampToUTCUnix(data(time).toString),

  timestampToUTCUnix(data(local_time).toString), data(id2).toString,
data(id3).toString,
 data(log_type).toString, data(sub_log_type).toString))
 val timeDiff = 3600L
 val filteredFields = fields.filter(field = abs(field._2 - field._3)
 = timeDiff)

 val watchTimeFields = filteredFields.map(fields = (fields._1,
 fields._2, fields._4, fields._5, fields._7))
 val watchTimeTuples = watchTimeFields.map(fields =
 getWatchtimeTuple(fields))
 val programDuids = watchTimeTuples.map(fields = (fields._3,
 fields._1)).groupByKey(partition)
 val programDuidNum = programDuids.map{case(key, value) = (key,
 value.toSet.size)}
 programDuidNum.saveAsTextFiles(hadoopOutput+result)

 I have been working on this for several days. No findings why there are
 always 2 executors for the groupBy stage. Thanks a lot!

 Bill


 On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you show us the program that you are running. If you are setting
 number of partitions in the XYZ-ByKey operation as 300, then there should
 be 300 tasks for that stage, distributed on the 50 executors are allocated
 to your context. However the data distribution may be skewed in which case,
 you can use a repartition operation to redistributed the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as parameters to the
 functions such as groupByKey. It seems the numbers of executors is around
 50 instead of 300, which 

Re: Number of executors change during job running

2014-07-10 Thread Tathagata Das
Are you specifying the number of reducers in all the DStream ...ByKey
operations (groupByKey, reduceByKey, etc.)? If it is not set, the number of
reducers used in those stages can keep changing across batches.

TD


On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consumes data from Kafka
 and groups the data by a certain field. The data size is 480k lines per
 minute and the batch size is 1 minute.

 For some batches, the program sometimes takes more than 3 minutes to finish
 the groupBy operation, which seems slow to me. I allocated 300 workers and
 specified 300 as the partition number for groupBy. When I checked the slow
 stage *combineByKey at ShuffledDStream.scala:42,* there were sometimes 2
 executors allocated for this stage. However, during other batches, the
 executors can be several hundred for the same stage, which means the number
 of executors for the same operations changes.

 Does anyone know how Spark allocates the number of executors for different
 stages and how to increase the efficiency of the task? Thanks!
 Bill



Re: Number of executors change during job running

2014-07-10 Thread Bill Jay
Hi Tathagata,

I set default parallelism to 300 in my configuration file. Sometimes there
are more executors in a job. However, it is still slow. I further observed
that most executors take less than 20 seconds, but two of them take much
longer, such as 2 minutes. The data size is very small (less than 480k lines
with only 4 fields). I am not sure why the group-by operation takes more
than 3 minutes.  Thanks!

Bill


On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consume data from Kafka
 and group the data by a certain field. The data size is 480k lines per
 minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to finish
 the groupBy operation, which seems slow to me. I allocated 300 workers and
 specify 300 as the partition number for groupby. When I checked the slow
 stage *combineByKey at ShuffledDStream.scala:42,* there are sometimes
 2 executors allocated for this stage. However, during other batches, the
 executors can be several hundred for the same stage, which means the number
 of executors for the same operations change.

 Does anyone know how Spark allocate the number of executors for different
 stages and how to increase the efficiency for task? Thanks!

 Bill





Re: Number of executors change during job running

2014-07-10 Thread Tathagata Das
Can you try setting the number of partitions explicitly in all the
shuffle-based DStream operations? It may be the case that the default
parallelism (that is, spark.default.parallelism) is not being respected.
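A minimal sketch of what setting the partition count explicitly looks like
for the ByKey operations in this thread (the pair-stream element type is
assumed):

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

// Pass the partition count to every shuffle-based operation instead of
// relying on spark.default.parallelism being picked up.
def explicitShuffles(pairs: DStream[(String, Long)], partitions: Int = 300) = {
  val grouped = pairs.groupByKey(partitions)
  val summed  = pairs.reduceByKey(_ + _, partitions)
  (grouped, summed)
}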

Regarding the unusual delay, I would look at the task details of that stage
in the Spark web UI. It shows a breakdown of time for each task, including GC
times, etc. That might give some indication.

TD


On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes there
 are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consume data from Kafka
 and group the data by a certain field. The data size is 480k lines per
 minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I checked
 the slow stage *combineByKey at ShuffledDStream.scala:42,* there are
 sometimes 2 executors allocated for this stage. However, during other
 batches, the executors can be several hundred for the same stage, which
 means the number of executors for the same operations change.

 Does anyone know how Spark allocate the number of executors for
 different stages and how to increase the efficiency for task? Thanks!

 Bill