Fwd: Problem in Understanding concept of Physical Cores

2015-07-19 Thread Aniruddh Sharma
Hi

Apologies for posting these queries again, but they are still unanswered and I
am not able to work out the differences in parallelism between Standalone and
YARN modes and how each depends on physical cores. I need to understand this
so that I can decide in which mode we should deploy Spark.

If these queries are too naive, please point me to documentation that covers
them and I will read it in detail.

Following is the context of the queries:

a) I need to decide whether to deploy Spark in Standalone mode or on YARN. It
seems to me that Spark on YARN is more parallel than Standalone mode (given
the same number of physical cores), as it is possible to increase the number
of execution threads on YARN via the --executor-cores option.
b) I also need to understand the following, which is not clear to me. Other
people on the mailing list are raising the same query in other words for
different cases while tuning jobs. Theoretically a JVM can support thousands
of threads, but in the context of Spark, what is the advisable ratio of
physical cores to threads to partitions? If there is a link that explains this
for both YARN and Standalone mode, that would help.

And following are the queries:

Case 1: Standalone Spark --
In standalone mode, as you explained, the master in spark-submit defaults to
local[*] implicitly, so it creates as many threads as the number of cores the
VM has, but the user can control the number of partitions to be created, and
tasks are created in accordance with the number of partitions.

Query 1: If I have 4 cores, then 4 threads will be created, but if I give my
data 40 partitions, then 40 tasks will be created which need to be executed on
those 4 threads. Does it work this way: the 4 threads execute 4 tasks in
parallel (out of the 40), and when the first set of tasks completes they pick
up the next 4, and so on? That is, 4 tasks run concurrently and the rest of
the tasks run in sequence as each concurrent set completes.
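
For concreteness, a minimal sketch of the scenario in this query, run in a
spark-shell started with --master local[4] (the data itself is illustrative):

// 4 task slots (local[4]) but 40 partitions -> 40 tasks per stage,
// scheduled at most 4 at a time
val data = sc.parallelize(1 to 1000, 40)   // 40 partitions
println(data.partitions.size)              // 40
println(data.map(_ * 2).count())           // runs the 40 tasks in waves of up to 4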

Query 2: When we pass total-num-cores to Spark in Standalone mode, the number
of threads does not seem to increase. When I check sc.defaultParallelism, the
total-num-cores parameter I passed does not seem to have any effect. So what
does this parameter exactly mean? Does it control the number of threads, or
does it tell the Spark Master to give this job that many physical cores? In
other words, is this parameter relevant not for a single job but, when
multiple jobs are running in the cluster, for telling the Spark scheduler not
to over-allocate resources to a single job? Also, does setting this parameter
guarantee any behavior, or is it only a hint to the Spark scheduler?
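
For reference, a minimal sketch of the setting this query is about:
--total-executor-cores on the command line corresponds to the spark.cores.max
property; whether sc.defaultParallelism reflects it is exactly the observation
described above (the application name and value are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Cap the total cores this application may take from the standalone cluster
// (equivalent to passing --total-executor-cores 8 to spark-submit)
val conf = new SparkConf()
  .setAppName("CoreCapSketch")       // illustrative name
  .set("spark.cores.max", "8")
val sc = new SparkContext(conf)
println(sc.defaultParallelism)       // compare against the cap above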


Case 2: Spark on YARN
In Spark on YARN, it seems that the number of threads created is not based on
the number of underlying physical cores.

Query 3: It seems to be (defaultMinPartitions * executor-cores). Is this
understanding correct? If yes, does it mean the developer can control the
number of threads requested from Spark by passing the --executor-cores option
(which was not possible in Standalone mode, where the number of threads was
based on the number of physical cores)? Is there a special reason for this
kind of difference?

Query 4: Also, it seems there is a restriction on the value I can pass in the
--executor-cores option, which seems to depend on the underlying physical
cores. For example, if I have 4 cores and pass 20 it works, but if I pass 100
it does not. So it seems the actual number of threads that can be created
inside the JVM is still limited by the number of physical cores, although it
can be controlled by the --executor-cores option. Kindly elaborate on the best
practice for how many threads to request based on physical cores and on how
physical cores limit this behavior.
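
A minimal sketch of the per-executor setting these queries are about:
--executor-cores corresponds to the spark.executor.cores property, and whether
oversubscribing it (e.g. 20 slots on 4 physical cores) is advisable is the
open question here (all values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Request 20 task slots per executor on YARN
// (equivalent to passing --executor-cores 20 to spark-submit)
val conf = new SparkConf()
  .setAppName("ExecutorCoresSketch")      // illustrative name
  .set("spark.executor.cores", "20")
  .set("spark.executor.instances", "2")   // illustrative executor count
val sc = new SparkContext(conf)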

Query 5: Is there a reason for the difference in behavior between
total-num-cores in Standalone mode (which does not create threads) and
executor-cores in YARN mode (which does create threads) with respect to how
threads are created? It seems that in YARN mode we can create more threads in
the same executor JVM than in Standalone mode for the same number of physical
cores.

Thanks and Regards
Aniruddh
-- Forwarded message --
From: Aniruddh Sharma 
Date: Fri, Jul 17, 2015 at 6:05 PM
Subject: Re: Problem in Understanding concept of Physical Cores
To: user 


Dear Community

Please help with the queries below; they are still unanswered.

Thanks and Regards
Aniruddh

Re: Problem in Understanding concept of Physical Cores

2015-07-17 Thread Aniruddh Sharma
Dear Community

Please help with the queries below; they are still unanswered.

Thanks and Regards
Aniruddh

Re: Problem in Understanding concept of Physical Cores

2015-07-15 Thread Aniruddh Sharma
Hi TD,

Request your guidance on the 5 queries below. Following is the context in
which I would evaluate your response.

a) I need to decide whether to deploy Spark in Standalone mode or on YARN. It
seems to me that Spark on YARN is more parallel than Standalone mode (given
the same number of physical cores), as it is possible to increase the number
of execution threads on YARN via the --executor-cores option.
b) I also need to understand the following, which is not clear to me. Other
people on the mailing list are raising the same query in other words for
different cases while tuning jobs. Theoretically a JVM can support thousands
of threads, but in the context of Spark, what is the advisable ratio of
physical cores to threads to partitions? If there is a link that explains this
for both YARN and Standalone mode, that would help.

Thanks and Regards
Aniruddh


Re: How to speed up Spark process

2015-07-13 Thread Aniruddh Sharma
Hi Deepak

Not 100% sure, but please try increasing --executor-cores to twice the number
of physical cores on your machine.
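
As a rough illustration of what that changes, using the numbers from the
spark-submit command quoted below (the doubled value of 8 assumes 4 physical
cores per node, which is an assumption, not something stated in this thread):

// Illustrative arithmetic only: the quoted command used --executor-cores 1
val executors = 330          // from --num-executors in the command below
val executorCores = 8        // suggested value: twice the assumed 4 physical cores
println(s"concurrent tasks = ${executors * executorCores}")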

Thanks and Regards
Aniruddh

On Tue, Jul 14, 2015 at 9:49 AM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> It's been 30 minutes and the repartition has still not completed; it seems
> to run forever.
>
> Without repartition, I see this error:
> https://issues.apache.org/jira/browse/SPARK-5928
>
>
>  FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, 
> mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 
> 2147483647: 3021252889 - discarded
>   at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>   at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>   at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>
>
>
>
> On Mon, Jul 13, 2015 at 8:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
> wrote:
>
>> I have 100 MB of Avro data, and repartition(307) is taking forever.
>>
>> 2. val x = input.repartition(7907).map( {k1,k2,k3,k4}, {inputRecord} )
>> 3. val quantiles = x.map( {k1,k2,k3,k4},  TDigest(inputRecord).asBytes
>> ).reduceByKey() [ This was groupBy earlier ]
>> 4. x.join(quantiles).coalesce(100).writeInAvro
>>
>>
>> Attached is full Scala code.
>>
>> I have a 340-node YARN cluster with 14 GB RAM on each node and input data
>> of just 100 MB. (Hadoop takes 2.5 hours on the 1 TB dataset.)
>>
>>
>> ./bin/spark-submit -v --master yarn-cluster  --jars
>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.4/lib/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>>  --num-executors 330 --driver-memory 14g --driver-java-options
>> "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps" --executor-memory 14g --executor-cores 1 --queue
>> hdmi-others --class com.ebay.ep.poc.spark.reporting.SparkApp
>> /home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar
>> startDate=2015-06-20 endDate=2015-06-21
>> input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=ppwmasterprime
>> output=/user/dvasthimal/epdatasets/ppwmasterprime buffersize=128
>> maxbuffersize=1068 maxResultSize=200G
>>
>>
>> I see this in stdout of the task on that executor
>>
>> 15/07/13 19:58:48 WARN hdfs.BlockReaderLocal: The short-circuit local reads 
>> feature cannot be used because libhadoop cannot be loaded.
>> 15/07/13 20:00:08 INFO collection.ExternalSorter: Thread 47 spilling 
>> in-memory map of 2.2 GB to disk (1 time so far)
>> 15/07/13 20:01:31 INFO collection.ExternalSorter: Thread 47 spilling 
>> in-memory map of 2.2 GB to disk (2 times so far)
>> 15/07/13 20:03:07 INFO collection.ExternalSorter: Thread 47 spilling 
>> in-memory map of 2.2 GB to disk (3 times so far)
>> 15/07/13 20:04:32 INFO collection.ExternalSorter: Thread 47 spilling 
>> in-memory map of 2.2 GB to disk (4 times so far)
>> 15/07/13 20:06:21 INFO collection.ExternalSorter: Thread 47 spilling 
>> in-memory map of 2.2 GB to disk (5 times so far)
>> 15/07/13 20:08:09 INFO collection.ExternalSorter: Thread 47 spilling 
>> in-memory map of 2.2 GB to disk (6 times so far)
>> 15/07/13 20:09:51 INFO collection.ExternalSorter: Thread 47 spilling 
>> in-memory map of 2.2 GB to disk (7 times so far)
>>
>>
>>
>> Also attached is the thread dump
>>
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>


Re: spark streaming doubt

2015-07-13 Thread Aniruddh Sharma
Hi Shushant/Cody,

For question 1, following is my understanding (I am not 100% sure; this is
only my understanding, and I have asked TD this question in other words for
confirmation, which has not been confirmed as of now).

Tasks are created in proportion to the partitions of the data, and the main
goal is to parallelize their execution, which depends on the number of
simultaneous threads created in the executor JVM. The number of threads to be
created is controlled by the user of the program, and the user has to set this
number in accordance with the number of physical cores. On YARN the number of
threads should be numDefaultPartitions * executor-cores (which is user
supplied). For example, if you have 3 physical cores in your machine, then you
might be able to create at least 6 threads by passing executor-cores = 6,
assuming numDefaultPartitions is set to 1. Then each executor should execute 6
concurrent tasks (rather than 3, the number of physical cores), and when these
6 tasks finish it should execute another 6, and so on. (Please note: again,
this is my understanding of how Spark works and I may be wrong.)
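
As a rough sketch of the arithmetic being discussed, using the numbers from
the question quoted below (this only illustrates the slot counting, not
whether the thread count can really be raised above the physical cores):

// 10 executors * 3 cores = 30 task slots; 300 Kafka partitions -> 300 tasks
val executors = 10
val coresPerExecutor = 3
val kafkaPartitions = 300
val slots = executors * coresPerExecutor
val waves = math.ceil(kafkaPartitions.toDouble / slots).toInt
println(s"$kafkaPartitions tasks run in roughly $waves waves of $slots")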

Thanks and Regards
Aniruddh

On Mon, Jul 13, 2015 at 7:22 PM, Cody Koeninger  wrote:

> Regarding your first question, having more partitions than you do
> executors usually means you'll have better utilization, because the
> workload will be distributed more evenly.  There's some degree of per-task
> overhead, but as long as you don't have a huge imbalance between number of
> tasks and number of executors that shouldn't be a large problem.
>
> I don't really understand your second question.
>
> On Sat, Jul 11, 2015 at 5:00 AM, Shushant Arora  > wrote:
>
>> 1. Spark Streaming 1.3 creates as many RDD partitions as there are Kafka
>> partitions in the topic. Say I have 300 partitions in the topic and 10
>> executors, each with 3 cores. Does that mean only 10*3 = 30 partitions are
>> processed at a time, then the next 30, and so on? Since executors launch
>> tasks per RDD partition, I need 300 tasks in total, but since I have 30
>> cores (10 executors, each with 3 cores), these tasks will execute 30 at a
>> time until all 300 are done.
>>
>> So will reducing the number of Kafka partitions to, say, 100 speed up the
>> processing?
>>
>> 2. In a Spark Streaming job, I process the Kafka stream using foreachRDD:
>>
>> directKafkaStream.foreachRDD(new function( public void call(  vi)){
>> v1.foreachPartition(new function(){public void call(){
>> //..process partition
>> }})
>>
>> });
>>
>> Since foreachRDD is an action it spawns Spark jobs, but the timings of
>> these jobs do not appear on the driver console the way they do for map and
>> print, e.g.:
>>
>>
>> ---
>> Time: 142905487 ms
>> ---
>> --
>> Time: 1429054871000 ms
>> ---
>>
>> ..
>>
>> Why is it so?
>>
>>
>> Thanks
>> Shushant
>>
>>
>>
>>
>>
>>
>


Re: Problem in Understanding concept of Physical Cores

2015-07-09 Thread Aniruddh Sharma
Hi TD,

Thanks for the elaboration. I have further doubts based on additional tests I
did after your guidance.

Case 1: Standalone Spark --
In standalone mode, as you explained, the master in spark-submit defaults to
local[*] implicitly, so it creates as many threads as the number of cores the
VM has, but the user can control the number of partitions to be created, and
tasks are created in accordance with the number of partitions.

Query 1: If I have 4 cores, then 4 threads will be created, but if I give my
data 40 partitions, then 40 tasks will be created which need to be executed on
those 4 threads. Does it work this way: the 4 threads execute 4 tasks in
parallel (out of the 40), and when the first set of tasks completes they pick
up the next 4, and so on? That is, 4 tasks run concurrently and the rest of
the tasks run in sequence as each concurrent set completes.

Query 2: When we pass total-num-cores to Spark in Standalone mode, the number
of threads does not seem to increase. When I check sc.defaultParallelism, the
total-num-cores parameter I passed does not seem to have any effect. So what
does this parameter exactly mean? Does it control the number of threads, or
does it tell the Spark Master to give this job that many physical cores? In
other words, is this parameter relevant not for a single job but, when
multiple jobs are running in the cluster, for telling the Spark scheduler not
to over-allocate resources to a single job? Also, does setting this parameter
guarantee any behavior, or is it only a hint to the Spark scheduler?


Case 2: Spark on YARN
In Spark on YARN, it seems that the number of threads created is not based on
the number of underlying physical cores.

Query 3: It seems to be (defaultMinPartitions * executor-cores). Is this
understanding correct? If yes, does it mean the developer can control the
number of threads requested from Spark by passing the --executor-cores option
(which was not possible in Standalone mode, where the number of threads was
based on the number of physical cores)? Is there a special reason for this
kind of difference?

Query 4: Also, it seems there is a restriction on the value I can pass in the
--executor-cores option, which seems to depend on the underlying physical
cores. For example, if I have 4 cores and pass 20 it works, but if I pass 100
it does not. So it seems the actual number of threads that can be created
inside the JVM is still limited by the number of physical cores, although it
can be controlled by the --executor-cores option. Kindly elaborate on the best
practice for how many threads to request based on physical cores and on how
physical cores limit this behavior.

Query 5: Is there a reason for the difference in behavior between
total-num-cores in Standalone mode (which does not create threads) and
executor-cores in YARN mode (which does create threads) with respect to how
threads are created? It seems that in YARN mode we can create more threads in
the same executor JVM than in Standalone mode for the same number of physical
cores.

Thanks and Regards
Aniruddh




On Thu, Jul 9, 2015 at 4:30 PM, Tathagata Das  wrote:

> Query 1) What Spark runs is tasks in task slots; the mapping of tasks to
> physical cores does not matter. If there are two task slots
> (2 threads in local mode, or an executor with 2 task slots in distributed
> mode), it can only run two tasks concurrently. That is true even if the
> task is really not doing much. There is no multiplexing going on between
> tasks and task slots. So to answer your query 1, there is 1 thread that is
> permanently allocated to the receiver task (a long running task) even if it
> does not do much. There is no thread left to process the data that is being
> received.
>
> Query 2) I think this is already explained above. The receiver task is
> taking the only available slot, leaving nothing for the actual tasks to
> execute. This will work fine as long as there are n+1 threads, where n =
> number of receivers.
>
> Query 3) The 2nd thread will be running tasks that process the in-memory
> blocks of data generated by the receiver running on the first thread. Now
> if the operating system underneath has only one core (physical or virtual),
> then those two threads will be multiplexing the resources of that core.
>
>
>

Number of Threads in Executor to process Tasks

2015-07-09 Thread Aniruddh Sharma
Hi

I am new to Spark. I am confused about the relationship between threads and
physical cores.

As per my understanding, the number of tasks created follows the number of
partitions in the data set. For example, if I have a machine with 10 physical
cores and a data set with 100 partitions, then 100 tasks (one per partition)
will be created in the executor JVM.
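
For example, a minimal sketch of that partition-to-task relationship (the path
and partition count are illustrative):

// The number of tasks per stage follows the RDD's partition count,
// not the machine's core count
val rdd = sc.textFile("hdfs:///path/to/data", 100)  // ask for at least 100 partitions
println(rdd.partitions.size)                        // roughly 100 tasks per stage here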

Query 1) But how is it decided how many threads are created in the executor to
execute these 100 tasks, and who creates these threads?

Query 2) Does the parameter "total-executor-cores" define how many threads
will be launched in the executor JVM to process tasks? If not, then what does
"total-executor-cores" mean in the context of both the threads inside the
executor JVM and the physical cores?

Thanks and Regards
Aniruddh


Re: Problem in Understanding concept of Physical Cores

2015-07-09 Thread Aniruddh Sharma
Thanks for the reply. I still have some confusion. Kindly find my understanding below.

Following is the code

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", )
lines.print()
ssc.start()


Case 1: When I launch the VM with only 1 core and start spark-shell without
any parameters, then as per the above explanation it uses local[*] implicitly
and creates 1 thread, since the VM has 1 core.

Query 1) But what does it try to execute in that 1 thread? Does the receiver
not get executed, or does the task not get executed? The receiver is not heavy
(I am entering only 1 line), so shouldn't the same physical core be shared
between the receiver (an internal thread) and the thread running the task?
For example, my VM has 1 physical core and multiple daemons like the master
and worker are also working fine while sharing that 1 physical core. Also, my
understanding is that the executor is a JVM in which the receiver executes as
an internal thread and 1 thread (for executing the task) is created in the
same JVM, but for some reason it does not get CPU.

Query 2) Extending the above analogy to another case, not Spark Streaming but
normal Spark core: if I read input data with 3 partitions on 1 physical core
and run some action on it, then 3 tasks should be created and each task should
be handled in a separate thread inside the executor JVM. This also works,
which means a single physical core executes 3 different threads running 3
tasks for 3 partitions. So why does the streaming case not get executed?

Case 2: When I launch the VM with only 1 core and start spark-shell with
--master local[2], then as per the above explanation it uses local[2] and
creates 2 threads, but my VM still has only 1 physical core.

Query 3) Now 2 threads are created, but my input data has 1 partition, so it
still requires only 1 task, and the receiver is an internal thread in the
executor JVM. What extra work runs on the second thread in this case that was
not getting executed in the earlier case with only 1 thread? And even if 2
threads are created, they are still executed by the same physical core, so
kindly elaborate on what the extra thread processes in this case.

Thanks and Regards
Aniruddh

On Thu, Jul 9, 2015 at 4:43 AM, Tathagata Das  wrote:

> There are several levels of indirection going on here, let me clarify.
>
> In the local mode, Spark runs tasks (which includes receivers) using the
> number of threads defined in the master (either local, or local[2], or
> local[*]).
> local or local[1] = single thread, so only one task at a time
> local[2] = 2 threads, so two tasks
> local[*] = as many threads as the number of cores it can detect through the
> operating system.
>
>
> Test 1: When you don't specify master in spark-submit, it uses local[*]
> implicitly, so it uses as many threads as the number of cores that VM has.
> Between 1 and 2 VM cores, the behavior was as expected.
> Test 2: When you specified master as local[2], it used two threads.
>
> HTH
>
> TD
>

Out of Memory Errors on less number of cores in proportion to Partitions in Data

2015-07-08 Thread Aniruddh Sharma
Hi,

I am new to Spark. I have done the following tests and I am confused by the
conclusions. I have 2 queries.

Following is the detail of test

Test 1) Used an 11-node cluster where each machine has 64 GB RAM and 4
physical cores. I ran the ALS algorithm from MLlib on a 1.6 GB data set with
10 executors, and my ratings data set had 20 partitions. It works. In order to
increase parallelism, I used 100 partitions instead of 20, and now the program
does not work; it throws an out-of-memory error.
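
A minimal sketch of the experiment described above; the input path, parsing,
rank, and iteration count are assumptions, and only the partition counts (20
working vs. 100 failing) come from the test:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Repartition the ratings before training; 20 partitions worked, 100 threw OOM
val ratings = sc.textFile("hdfs:///path/to/ratings")    // illustrative path
  .map { line =>
    val Array(user, product, rating) = line.split(",")  // assumed 3-field format
    Rating(user.toInt, product.toInt, rating.toDouble)
  }
  .repartition(20)
val model = ALS.train(ratings, 10, 10)                  // rank and iterations assumed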

Query a): I have 4 cores on each machine, but with 100 partitions there are
about 10 partitions per executor, so my cores are not sufficient for the
partitions. Is it supposed to give memory errors for this kind of
misconfiguration? If there are not enough cores and processing cannot be done
in parallel, can the different partitions not be processed sequentially, so
that the operation becomes slow rather than throwing a memory error?

Query b) If it gives an error, the error message is not meaningful. Here my
DAG was very simple and I could trace that lowering the number of partitions
works, but if it throws errors on a misconfiguration of cores, how do I debug
this in complex DAGs, since the error does not say explicitly that the problem
could be due to too few cores? If my understanding is incorrect, kindly
explain the reasons for the error in this case.

Thanks and Regards
Aniruddh


Problem in Understanding concept of Physical Cores

2015-07-08 Thread Aniruddh Sharma
Hi

I am new to Spark. Following is the problem that I am facing

Test 1) I ran a VM on the CDH distribution with only 1 core allocated to it,
and I ran the simple streaming example in spark-shell, sending data on a port
and trying to read it. With 1 core allocated, nothing happens in my streaming
program and it does not receive data. When I restart the VM with 2 cores
allocated, start spark-shell again, and run the streaming example again, this
time it works.

Query a): From this test I concluded that the receiver in Streaming occupies
its core completely, even though I am sending very little data and it does not
need the whole core, and it does not give this core back to the executor for
computing transformations. Comparing partition processing and receiver
processing: the same physical core can process multiple partitions in
parallel, but the receiver will not allow its core to process anything else.
Is this understanding correct?

Test 2) Now I restarted the VM with 1 core again and started spark-shell
--master local[2]. I have allocated only 1 core to the VM but I tell
spark-shell to use 2 cores, and I test the streaming program again and it
somehow works.
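
For reference, a minimal sketch of that second test as run from a spark-shell
started with --master local[2]; the port number is illustrative, since the
original value was not preserved here:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[2] gives 2 task slots: one ends up occupied by the socket receiver,
// leaving one to process the received batches
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)   // port is illustrative
lines.print()
ssc.start()
ssc.awaitTermination()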

Query b) Now I am more confused, and I don't understand, since I have only 1
core in the VM. I previously thought it did not work because there was only 1
core and the receiver was completely blocking it, not sharing it with the
executor. But when I start with local[2], still having only 1 core in the VM,
it works. So it means the receiver and the executor both get the same physical
CPU. Please explain how this case is different and what conclusions I should
draw about physical CPU usage.

Thanks and Regards
Aniruddh