Thank you. For now we plan to use spark-shell to submit jobs.

Regards,
Raghava.
On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:

> Glad to hear that the problem was solvable! I have not seen delays of this
> type for later stages in jobs run by spark-submit, but I do not think it
> impossible if your stage has no lineage dependence on other RDDs.
>
> I'm CC'ing the dev list to report that other users are observing load
> imbalance caused by unusual initial task scheduling. I don't know of any
> way to avoid this other than creating a dummy task to synchronize the
> executors, but hopefully someone there can suggest other possibilities.
>
> Mike
>
> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
> wrote:
>
>> Mike,
>>
>> It turns out the executor delay, as you mentioned, is the cause. After we
>> introduced a dummy stage, partitioning was working fine. Does this delay
>> happen during later stages as well? We noticed the same behavior
>> (partitioning happens in spark-shell but not through spark-submit) at a
>> later stage also.
>>
>> Apart from introducing a dummy stage or running it from spark-shell, is
>> there any other option to fix this?
>>
>> Regards,
>> Raghava.
>>
>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> When submitting a job with spark-submit, I've observed delays (up to
>>> 1-2 seconds) for the executors to respond to the driver in order to
>>> receive tasks in the first stage. The delay does not persist once the
>>> executors have been synchronized.
>>>
>>> When the tasks are very short, as may be the case for you (relatively
>>> small data and a simple map task like you have described), the 8 tasks
>>> in your stage may be allocated to only 1 executor in 2 waves of 4,
>>> since the second executor won't have responded to the master before
>>> the first 4 tasks on the first executor have completed.
>>>
>>> To confirm whether this is the cause in your particular case, you could
>>> try the following:
>>>   1. Examine the starting times of the tasks alongside their executors.
>>>   2. Make a "dummy" stage execute before your real stages to
>>>      synchronize the executors, by creating and materializing any
>>>      random RDD.
>>>   3. Make the tasks longer, e.g. with some silly computational work.
>>>
>>> Mike
>>>
>>> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>> > Yes, it's the same data.
>>> >
>>> > 1) The number of partitions is the same (8, which is an argument to
>>> > the HashPartitioner). In the first case, these partitions are spread
>>> > across both the worker nodes. In the second case, all the partitions
>>> > are on the same node.
>>> > 2) What resources would be of interest here? The Scala shell takes
>>> > the default parameters, since we use "bin/spark-shell --master
>>> > <master-URL>" to run it. For the Scala program, we do set some
>>> > configuration options, such as driver memory (12GB); parallelism is
>>> > set to 8 and we use the Kryo serializer.
>>> >
>>> > We are running this on Azure D3-v2 machines, which have 4 cores and
>>> > 14GB RAM. One executor runs on each worker node. The following
>>> > configuration options are set for the Scala program -- perhaps we
>>> > should move them to the Spark config file:
>>> >
>>> > Driver memory and executor memory are set to 12GB.
>>> > Parallelism is set to 8.
>>> > The Kryo serializer is used.
>>> > The number of retainedJobs and retainedStages has been increased so
>>> > that we can check them in the UI.
>>> >
>>> > What information regarding the Spark Context would be of interest
>>> > here?
>>> >
>>> > Regards,
>>> > Raghava.
>>> >
>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>>> > wrote:
>>> >
>>> >> If the data file is the same, then it should have a similar
>>> >> distribution of keys. A few queries:
>>> >>
>>> >> 1. Did you compare the number of partitions in both cases?
>>> >> 2. Did you compare the resource allocation for the Spark shell vs.
>>> >> the Scala program being submitted?
>>> >>
>>> >> Also, can you please share the details of the Spark Context,
>>> >> Environment, and Executors when you run via the Scala program?
>>> >>
>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>> >> m.vijayaragh...@gmail.com> wrote:
>>> >>
>>> >>> Hello All,
>>> >>>
>>> >>> We are using HashPartitioner in the following way on a 3-node
>>> >>> cluster (1 master and 2 worker nodes):
>>> >>>
>>> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>>> >>>   .map[(Int, Int)](line => line.split("\\|") match {
>>> >>>     case Array(x, y) => (y.toInt, x.toInt)
>>> >>>   })
>>> >>>   .partitionBy(new HashPartitioner(8))
>>> >>>   .setName("u")
>>> >>>   .persist()
>>> >>>
>>> >>> u.count()
>>> >>>
>>> >>> If we run this from the spark shell, the data (52 MB) is split
>>> >>> across the two worker nodes. But if we put this in a Scala program
>>> >>> and run it, then all the data goes to only one node. We have run it
>>> >>> multiple times, but this behavior does not change. This seems
>>> >>> strange.
>>> >>>
>>> >>> Is there some problem with the way we use HashPartitioner?
>>> >>>
>>> >>> Thanks in advance.
>>> >>>
>>> >>> Regards,
>>> >>> Raghava.
>>> >>>
>>> >>
>>> >
>>> > --
>>> > Regards,
>>> > Raghava
>>> > http://raghavam.github.io
>>>
>>> --
>>> Thanks,
>>> Mike
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io

--
Regards,
Raghava
http://raghavam.github.io
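For reference, a minimal sketch of the "dummy stage" workaround that fixed the
imbalance above, assuming an existing SparkContext named sc (as in spark-shell);
the element count and partition count below are arbitrary choices, not values
from the thread:

    // Sketch of the "dummy" synchronization stage: create and materialize
    // any random RDD before the real job, so that all executors have
    // registered with the driver by the time the first real stage is
    // scheduled. The values (1000 elements, 8 partitions) are arbitrary.
    val dummy = sc.parallelize(1 to 1000, 8)
    dummy.count() // forces a full stage; gives slow executors time to respond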
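And a self-contained sketch of how the job from the thread might look as a
spark-submit application with the workaround applied. The configuration values
mirror those mentioned in the thread; the object name, jar name, and HDFS path
are illustrative assumptions, not the original code:

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    // Hypothetical application skeleton; names and paths are placeholders.
    object PartitionTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("PartitionTest")
          // Settings described in the thread (Kryo, parallelism 8); memory
          // is usually passed to spark-submit via --driver-memory and
          // --executor-memory rather than set here.
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.default.parallelism", "8")
        val sc = new SparkContext(conf)

        // Dummy stage first, so both executors have registered before the
        // real stage is scheduled (the fix that restored even partitioning).
        sc.parallelize(1 to 1000, 8).count()

        val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
          .map[(Int, Int)](line => line.split("\\|") match {
            case Array(x, y) => (y.toInt, x.toInt)
          })
          .partitionBy(new HashPartitioner(8))
          .setName("u")
          .persist()

        u.count()
        sc.stop()
      }
    }

Submitted, for example, with:

    spark-submit --class PartitionTest --master <master-URL> \
      --driver-memory 12G --executor-memory 12G partition-test.jar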