Dear Community,

Request help on the queries below; they are still unanswered.
Thanks and Regards
Aniruddh

On Wed, Jul 15, 2015 at 12:37 PM, Aniruddh Sharma <asharma...@gmail.com> wrote:

Hi TD,

Request your guidance on the 5 queries below. Following is the context in which I would evaluate them based on your response:

a) I need to decide whether to deploy Spark in standalone mode or on YARN. It seems to me that Spark on YARN is more parallel than standalone mode (given the same number of physical cores), since on YARN the number of execution threads can be increased with --executor-cores.

b) I also need to understand the following, which is not clear to me; other people on the mailing list are raising the same query in different words for different cases while tuning jobs. Theoretically a JVM can support thousands of threads, but in the context of Spark, what is the advisable ratio of physical cores to threads created to partitions created? If you find this relevant and important, perhaps there is a better link that explains this for both YARN and standalone mode.

Thanks and Regards
Aniruddh

On Fri, Jul 10, 2015 at 11:45 AM, Aniruddh Sharma <asharma...@gmail.com> wrote:

Hi TD,

Thanks for the elaboration. I have further doubts based on a further test I did after your guidance.

Case 1: Standalone Spark
In standalone mode, as you explained, when no master is given spark-submit uses local[*] implicitly, so it creates as many threads as the number of cores the VM has; but the user can control the number of partitions to be created, and tasks are created in accordance with the number of partitions.

Query 1: If I have 4 cores, then 4 threads will be created, but if I give my data 40 partitions, then 40 tasks will be created which need to be executed on 4 threads. Does it work this way: 4 threads execute 4 tasks (out of the 40) in parallel, and when the first set of tasks completes they pick up the next 4 tasks, and so on? That is, 4 tasks run concurrently and the rest of the tasks run sequentially as each concurrent set completes.

Query 2: When we pass total-num-cores to Spark in standalone mode, the number of threads does not seem to increase. When I execute sc.defaultParallelism, the total-num-cores parameter I passed does not seem to have any effect. So what exactly does this parameter mean? Does it control the number of threads, or does it tell the Spark master to provide that many physical cores to this job? That is, is this parameter relevant not for a single job, but rather, when multiple jobs are running in the cluster, to tell the Spark scheduler not to over-allocate resources to a single job? Also, does setting this parameter guarantee any behavior, or is it only a hint to the Spark scheduler?

Case 2: Spark on YARN
In Spark on YARN, it seems that the number of threads created is not based on the number of underlying physical cores.

Query 3: It seems to be (defaultMinPartitions * executor-cores). Is this understanding correct? If yes, does it mean the developer can control the number of threads requested from Spark by passing the executor-cores option (which was not there in standalone mode, where the number of threads was based on the number of physical cores)? Is there a special reason for this kind of difference?
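As a rough sketch of where the task-slot count comes from in each mode (the executor and core counts below are hypothetical; the property keys are the configuration equivalents of the executor-cores and total-cores settings discussed above):

********************************************************************************
import org.apache.spark.SparkConf

// YARN: task slots per executor are set explicitly, so total slots =
// spark.executor.instances * spark.executor.cores (12 in this sketch).
val yarnConf = new SparkConf()
  .setAppName("slots-on-yarn")
  .set("spark.executor.instances", "3")   // number of executors (YARN only)
  .set("spark.executor.cores", "4")       // task slots per executor

// Standalone: spark.cores.max caps how many cores the application may claim
// across the cluster; it does not spawn extra threads beyond the workers'
// physical cores.
val standaloneConf = new SparkConf()
  .setAppName("slots-standalone")
  .set("spark.cores.max", "8")
********************************************************************************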
Query 4: Also, there seems to be a restriction on the value I can pass in the executor-cores option, which seems to depend on the underlying physical cores. For example, if I have 4 cores and pass 20 it works, but if I pass 100 it does not. So it seems the actual number of threads that can be created inside the JVM is still limited by the number of physical cores, but it can be controlled by the executor-cores option. Kindly elaborate on the best practice for how many threads to request based on the physical cores, and on how the physical cores limit this behavior.

Query 5: Is there a reason for the difference in behavior between total-num-cores in standalone mode (does not create threads) and executor-cores in YARN mode (creates threads) with respect to how threads are created? It seems that in YARN mode we can create more threads in the same executor JVM than in standalone mode for the same number of physical cores.

Thanks and Regards
Aniruddh

On Thu, Jul 9, 2015 at 4:30 PM, Tathagata Das <t...@databricks.com> wrote:

Query 1) What Spark runs is tasks in task slots; whatever the mapping of tasks to physical cores is, it does not matter. If there are two task slots (2 threads in local mode, or an executor with 2 task slots in distributed mode), it can only run two tasks concurrently. That is true even if the tasks are really not doing much. There is no multiplexing going on between tasks and task slots. So to answer your query 1, there is 1 thread permanently allocated to the receiver task (a long-running task) even if it does not do much. There is no thread left to process the data that is being received.

Query 2) I think this is already explained above. The receiver task is taking the only available slot, leaving nothing for the actual tasks to execute. This will work fine as long as there are n + 1 threads, where n = number of receivers.

Query 3) The 2nd thread will be running tasks that process the in-memory blocks of data generated by the receiver running on the first thread. Now if the operating system underneath has only one core (physical or virtual), then those two threads will be multiplexing the resources of that core.

On Thu, Jul 9, 2015 at 1:41 AM, Aniruddh Sharma <asharma...@gmail.com> wrote:

Thanks for the reply. I still have a confusion; kindly find my understanding below.

Following is the code:

********************************************************************************
// Run in spark-shell, where sc already exists; imports added for completeness.
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 7777)
lines.print()
ssc.start()
********************************************************************************

Case 1: When I launch the VM with only 1 core and start spark-shell without any parameter, then as per the above explanation it uses local[*] implicitly and creates 1 thread, as the VM has 1 core.

Query 1) But what does it try to execute in that 1 thread? Does the Receiver not get executed, or does the task not get executed? The Receiver is not heavy (I am entering only 1 line), so shouldn't the same physical core be shared between the Receiver (internal thread) and the thread running the task? For example, my VM has 1 physical core, and multiple daemons like the master and worker are also working successfully while sharing that 1 physical core. Also, what I understand is that the Executor is a JVM in which the Receiver executes as an internal thread, and 1 thread (for executing the task) is created in the same JVM, but for some reason it does not get CPU.
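A minimal spark-shell-style sketch of the "n + 1 threads" rule described above, assuming a single socket receiver (host, port, and app name are placeholders):

********************************************************************************
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// One receiver permanently occupies one task slot, so ask for at least
// n + 1 = 2 threads, even on a single-core VM.
val conf = new SparkConf().setAppName("receiver-plus-one").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.socketTextStream("localhost", 7777).print()  // slot 1: receiver, slot 2: batch tasks
ssc.start()
ssc.awaitTermination()
********************************************************************************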
Query 2) Extending the above analogy to another case, not Spark Streaming but normal Spark core: if I read input data with 3 partitions using 1 physical core and perform an action on it, then 3 tasks should also be created, and each task should be handled in a separate thread inside the executor JVM. That also works, which means a single physical core executes 3 different threads running 3 tasks for 3 partitions. So why does the Streaming case not get executed?

Case 2: When I launch the VM with only 1 core and start spark-shell with --master local[2], then as per the above explanation it uses local[2] and creates 2 threads, but my VM still has only 1 physical core.

Query 3) Now 2 threads are created, but my input data has 1 partition, so it still requires only 1 task, and the Receiver is an internal thread in the Executor JVM. What extra work runs in thread 2 in this case that was not getting executed in the case above with only 1 thread? And even if 2 threads are created, they still have to be executed by the same physical core, so kindly elaborate what the extra processing in the extra thread is in this case.

Thanks and Regards
Aniruddh

On Thu, Jul 9, 2015 at 4:43 AM, Tathagata Das <t...@databricks.com> wrote:

There are several levels of indirection going on here; let me clarify.

In local mode, Spark runs tasks (which includes receivers) using the number of threads defined in the master (either local, or local[2], or local[*]).
local or local[1] = single thread, so only one task at a time
local[2] = 2 threads, so two tasks
local[*] = as many threads as the number of cores it can detect through the operating system.

Test 1: When you don't specify a master in spark-submit, it uses local[*] implicitly, so it uses as many threads as the number of cores the VM has. Between 1 and 2 VM cores, the behavior was as expected.
Test 2: When you specified the master as local[2], it used two threads.

HTH

TD

On Wed, Jul 8, 2015 at 4:21 AM, Aniruddh Sharma <asharma...@gmail.com> wrote:

Hi,

I am new to Spark. Following is the problem that I am facing.

Test 1) I ran a VM on the CDH distribution with only 1 core allocated to it, and I ran the simple Streaming example in spark-shell, sending data on port 7777 and trying to read it. With 1 core allocated, nothing happens in my streaming program and it does not receive data. Then I restarted the VM with 2 cores allocated, started spark-shell again, ran the Streaming example again, and this time it worked.

Query a): From this test I concluded that the Receiver in Streaming occupies its core completely, even though I am sending very little data and it does not need the whole core, and it does not give this core to the Executor for computing transformations. The comparison between partition processing and Receiver processing is that the same physical cores can process multiple partitions in parallel, but the Receiver will not allow its core to process anything else. Is this understanding correct?
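A minimal sketch of the contrast in Query 2 / Query a above, assuming plain Spark core with a single task slot (local[1]): the three partition tasks simply run one after another and the job finishes, whereas a receiver is a task that never finishes and so never frees its slot.

********************************************************************************
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("sequential-tasks").setMaster("local[1]")
val sc = new SparkContext(conf)

// 3 partitions => 3 tasks, but only 1 task slot: the scheduler runs the
// tasks one at a time, and the job still completes because each task ends.
val sums = sc.parallelize(1 to 9, 3).mapPartitions(it => Iterator(it.sum)).collect()
println(sums.mkString(","))   // expected something like: 6,15,24
sc.stop()
********************************************************************************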
Test 2) Now I restarted the VM with 1 core again and started spark-shell --master local[2]. I have allocated only 1 core to the VM, but I tell spark-shell to use 2 cores, and when I test the streaming program again it somehow works.

Query b) Now I am more confused, and I don't understand, since I have only 1 core in the VM. I thought it previously did not work because there was only 1 core and the Receiver was completely blocking it and not sharing it with the Executor. But when I start with local[2], still having only 1 core in the VM, it works. So it means the Receiver and the Executor are both getting the same physical CPU. Request you to explain how this case is different and what conclusions I should draw in the context of physical CPU usage.

Thanks and Regards
Aniruddh
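A minimal sketch of why local[2] works on a 1-core VM: the "2" is a thread (task-slot) count, not a physical-core count, so the operating system simply time-slices the receiver thread and the batch-task thread on the single core (app name is a placeholder):

********************************************************************************
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("slots-vs-cores").setMaster("local[2]")
val sc = new SparkContext(conf)

println(Runtime.getRuntime.availableProcessors())  // cores the JVM sees (1 on this VM)
println(sc.defaultParallelism)                     // 2: task slots requested via local[2],
                                                   // unless spark.default.parallelism overrides it
sc.stop()
********************************************************************************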