Hi TD,

Thanks for elaboration. I have  further doubts based on further test that I
did after your guidance

Case 1: Standalone Spark--
In standalone mode, as you explained,master in spark-submit local[*]
implicitly, so it uses as creates threads as the number of cores that VM
has, but User can control the number of partitions which needs to be
created and in accordance with number of partitions, tasks will be created.

Query 1: If I have 4 cores, then 4 threads will be created but if I give 40
partitions to my data, than 40 tasks will be created which needs to be
executed on 4 threads. Does it work this way, that 4 threads execute 4
tasks (out of 40 in parallel) and when first set of task gets complete then
they pick next 4 tasks and then they ask execute tasks in sequential
manner. That is 4 tasks concurrent but rest of tasks in sequence when first
concurrent set gets complete.

Query 2: When we pass total-num-cores to Spark in StandAlone mode, then it
seems number of threads do not increase. When I execute
sc.defaultParallelism then it does not seem to take any effect on passed
total-num-cores parameter. So when we use this parameter what does it
exactly mean. Does it control number of threads or does it say to Spark
Master to provide these many number of physical cores to this job. I mean
is this parameter relevant not for a single job but if multiple jobs are
running in cluster than to tell Spark Scheduler not to overallocate
resources to a single job. Also setting this parameter, does it guarantee
any behavior or is it only an indicator for Spark Scheduler.


Case 2: Spark on Yarn
In Spark on Yarn, it seems that threads which get created is not based on
number of physical cores underlying.

Query 3: But it seems to be (defaultMinPartition * executor-cores). Is this
understanding correct. If yes then does it mean Developer has a control on
number of threads to request to Spark by passing executor-core option
(which was not there in Standalone mode as number of threads was based on
number of physical cores). Is there a special reason for this kind of
difference

Query 4: Also it seems there is a restriction on value I can pass in
executor-cores option which seems to be dependent on underlying physical
cores. For example If I have 4 cores and I pass this value to be 20 then it
works, but if I pass this value to be 100 then it does not work. So it
seems actual number of threads which can be created inside JVM are still
limited by number of physical cores but it can be controlled by
executor-cores option. Kindly elaborate what is best practice to request
how many threads based on physical cores and how physical cores limit this
behavior.

Query 5: Is there a reason for difference in behavior of total-num-cores
(does not create a thread ) in Stand Alone mode and exectuor-cores( creates
thread) in Yarn mode in how threads to be created. It seems in Yarn mode we
can create more threads in same Executor JVM compated to Standalone mode
for same number of physical cores.

Thanks and Regards
Aniruddh




On Thu, Jul 9, 2015 at 4:30 PM, Tathagata Das <t...@databricks.com> wrote:

> Query 1) What spark runs is tasks in task slots, whatever is the mapping
> ot tasks to physical cores it does not matter. If there are two task slots
> (2 threads in local mode, or an executor with 2 task slots in distributed
> mode), it can only run two tasks concurrently. That is true even if the
> task is really not doing much. There is no multiplexing going on between
> tasks and task slots. So to answer your query 1, there is 1 thread that is
> permanently allocated to the receiver task (a long running task) even if it
> does not do much. There is no thread left to process the data that is being
> received.
>
> Query 2) I think this is already explained above. The receiver task is
> taking the only available slot, leaving nothing for the actual tasks to
> execute. This will work fine as long as there is n+1 threads, where n =
> number of receivers.
>
> Query 3) The 2nd thread will be running tasks that process the in-memory
> blocks of data generated by the receiver running on the first thread. Now
> if the operating system underneath has only one core (physical or virtual),
> then those two thread will be multiplexing the resources of that core.
>
>
>
> On Thu, Jul 9, 2015 at 1:41 AM, Aniruddh Sharma <asharma...@gmail.com>
> wrote:
>
>> Thanks for revert.....I still have a confusion. Kindly find my
>> understanding
>>
>> Following is the code
>>
>> ********************************************************************************
>> val ssc = new StreamingContext(sc, Seconds(1))
>> val lines = ssc.socketTextStream("localhost", 7777)
>> lines.print()
>> ssc.start()
>>
>> ********************************************************************************
>>
>> Case 1: When I launch VM with only 1 core and start spark-shell without
>> any parameter then as per above explanation it uses local[*] implicitly and
>> it creates 1 thread as VM has 1 core.
>>
>> Query 1) But what does it try to execute in that 1 explicit thread ? Does
>> Receiver does not get executed or does task does not get executed because
>> Receiver is not heavy , i am entering only 1 line so shouldn't same
>> physical core be shared with Receiver(internal thread) and thread running
>> task ?
>> For example-- My VM has 1 physical core and multiple daemons like
>> master/worker etc are also working successfully with sharing 1 physical
>> core only. Also what I understand is that Executor has a JVM in which
>> Receiver is executing as a internal thread and 1 thread (for executing
>> task) is created in same JVM but for some reason it does not get CPU.
>>
>> Query 2) Extending above mentioned analogy to another case, not in Spark
>> Streaming, but normal Spark core. If I read input data with 3 partitions
>> with 1 physical core and do some action on it then also 3 tasks should be
>> created and each task should be handled in a separate thread inside
>> executor JVM. It also works which means single physical core executes 3
>> different threads executing 3 tasks for 3 partitions. So why Streaming case
>> does not get execute.
>>
>> Case 2: When I launch VM with only 1 core and start spark-shell with
>> --master local[2] then as per above explanation it uses local[2] implicitly
>> and it creates 2 thread but my VM has still 1 physical core
>>
>> Query 3) Now when 2 threads are created, but my input data has 1
>> partition, so still it requires only 1 task and Receiver is an internal
>> thread in Executor JVM. What goes in extra in thread 2 in this case , which
>> was not getting executed in above case with 1 thread only. And even if 2
>> threads are created , they are still to be executed by same physical core
>> so kindly elaborate what is extra processing in extra thread in this case.
>>
>> Thanks and Regards
>> Aniruddh
>>
>> On Thu, Jul 9, 2015 at 4:43 AM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> There are several levels of indirection going on here, let me clarify.
>>>
>>> In the local mode, Spark runs tasks (which includes receivers) using the
>>> number of threads defined in the master (either local, or local[2], or
>>> local[*]).
>>> local or local[1] = single thread, so only one task at a time
>>> local[2] = 2 threads, so two tasks
>>> local[*] = as many threads as the number cores it can detect through the
>>> operating system.
>>>
>>>
>>> Test 1: When you dont specify master in spark-submit, it uses local[*]
>>> implicitly, so it uses as many threads as the number of cores that VM has.
>>> Between 1 and 2 VM cores, the behavior was as expected.
>>> Test 2: When you specified master as local[2], it used two threads.
>>>
>>> HTH
>>>
>>> TD
>>>
>>> On Wed, Jul 8, 2015 at 4:21 AM, Aniruddh Sharma <asharma...@gmail.com>
>>> wrote:
>>>
>>>> Hi
>>>>
>>>> I am new to Spark. Following is the problem that I am facing
>>>>
>>>> Test 1) I ran a VM on CDH distribution with only 1 core allocated to it
>>>> and I ran simple Streaming example in spark-shell with sending data on 7777
>>>> port and trying to read it. With 1 core allocated to this nothing happens
>>>> in my streaming program and it does not receive data. Now I restart VM with
>>>> 2 cores allocated to it and start spark-shell again and ran Streaming
>>>> example again and this time it works
>>>>
>>>> Query a): From this test I concluded that Receiver in Streaming will
>>>> occupy the core completely even though I am using very less data and it
>>>> does not need complete core for same
>>>> but it does not assign this core to Executor for calculating
>>>> transformation.  And doing comparison of Partition processing and Receiver
>>>> processing is that in case of Partitions same
>>>> physical cores can parallelly process multiple partitions but Receiver
>>>> will not allow its core to process anything else. Is this understanding
>>>> correct
>>>>
>>>> Test2) Now I restarted VM with 1 core again and start spark-shell
>>>> --master local[2]. I have allocated only 1 core to VM but i say to
>>>> spark-shell to use 2 cores. and I test streaming program again and it
>>>> somehow works.
>>>>
>>>> Query b) Now I am more confused and I dont understand when I have only
>>>> 1 core for VM. I thought previously it did not work because it had only 1
>>>> core and Receiver is completely blocking it and not sharing it with
>>>> Executor. But when I do start with local[2] and still having only 1 core to
>>>> VM it works. So it means that Receiver and Executor are both getting same
>>>> physical CPU. Request you to explain how is it different in this case
>>>> and what conclusions shall I draw in context of physical CPU usage.
>>>>
>>>> Thanks and Regards
>>>> Aniruddh
>>>>
>>>>
>>>
>>
>

Reply via email to