Could you change numPartitions to {16, 32, 64} and run your program for each to see how many partitions are allocated to each worker? Let's see whether you experience an all-or-nothing imbalance that way. If so, my guess is that something else is odd in your program logic or Spark runtime environment; if not, and your executors all receive at least *some* partitions, then I still wouldn't rule out effects of scheduling delay. It's a simple test, but it could give some insight.
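To count partitions per worker, something along these lines should work (a rough, untested sketch; currDeltaURule1 is just your Case-1 RDD used as an example, and since you run one executor per worker node, counting per host is the same as counting per executor here):

    import java.net.InetAddress

    // Emit one (hostname, 1) record per partition, then sum per host.
    // collect() triggers a job, so run this after the RDD is persisted.
    val partitionsPerHost = currDeltaURule1
      .mapPartitions(_ => Iterator((InetAddress.getLocalHost.getHostName, 1)))
      .reduceByKey(_ + _)
      .collect()

    partitionsPerHost.foreach { case (host, n) =>
      println(s"$host -> $n partitions")
    }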
This could still be a scheduling issue. If only one executor has *all* the partitions, could you email me the log file? (If it's 10+ MB, just the first few thousand lines are fine.)

Mike

On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com> wrote:

> Mike, All,
>
> It turns out that the second time we encountered the uneven-partition
> issue, it was not due to spark-submit. It was resolved with the change
> in placement of count().
>
> Case-1:
>
> val numPartitions = 8
> // read uAxioms from HDFS, use hash partitioner on it and persist it
> // read type1Axioms from HDFS, use hash partitioner on it and persist it
> currDeltaURule1 = type1Axioms.join(uAxioms)
>                              .values
>                              .distinct(numPartitions)
>                              .partitionBy(hashPartitioner)
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>                                  .count()
>
> <more transformations and actions>
>
> The currDeltaURule1 RDD ends up with all the data on one node (there
> are 2 worker nodes). If we move count(), the uneven-partition issue is
> resolved.
>
> Case-2:
>
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>                                  .persist(StorageLevel.MEMORY_AND_DISK)
> <count not run here>
>
> <some more transformations that don't affect the currDeltaURule1 RDD>
>
> <rdd.count()> -- this RDD depends on currDeltaURule1 and it gets
> executed. This resolved the uneven-partitioning issue.
>
> I don't see why moving an action to a later part of the code would
> affect the partitioning. Are there other factors at play here that
> affect the partitioning?
>
> (Inconsistent) uneven partitioning leads to one machine getting
> overburdened (memory and number of tasks). We see a clear improvement
> in runtime when the partitioning is even (which happens when count is
> moved).
>
> Any pointers in figuring out this issue are much appreciated.
>
> Regards,
> Raghava.
>
> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Glad to hear that the problem was solvable! I have not seen delays of
>> this type for later stages in jobs run by spark-submit, but I do not
>> think it impossible if your stage has no lineage dependence on other
>> RDDs.
>>
>> I'm CC'ing the dev list to report that other users are observing load
>> imbalance caused by unusual initial task scheduling. I don't know of
>> ways to avoid this other than creating a dummy task to synchronize the
>> executors, but hopefully someone there can suggest other possibilities.
>>
>> Mike
>>
>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
>> wrote:
>>
>>> Mike,
>>>
>>> It turns out the executor delay, as you mentioned, is the cause. After
>>> we introduced a dummy stage, partitioning was working fine. Does this
>>> delay happen during later stages as well? We noticed the same behavior
>>> (partitioning happens on spark-shell but not through spark-submit) at
>>> a later stage also.
>>>
>>> Apart from introducing a dummy stage or running it from spark-shell,
>>> is there any other option to fix this?
>>>
>>> Regards,
>>> Raghava.
>>>
>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>>
>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>> receive tasks in the first stage. The delay does not persist once the
>>>> executors have been synchronized.
>>>>
>>>> When the tasks are very short, as may be your case (relatively small
>>>> data and a simple map task like you have described), the 8 tasks in
>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>> the second executor won't have responded to the master before the
>>>> first 4 tasks on the first executor have completed.
>>>>
>>>> To see if this is the cause in your particular case, you could try
>>>> the following to confirm:
>>>> 1. Examine the starting times of the tasks alongside their executor.
>>>> 2. Make a "dummy" stage execute before your real stages to
>>>>    synchronize the executors by creating and materializing any
>>>>    random RDD.
>>>> 3. Make the tasks longer, i.e. with some silly computational work.
>>>>
>>>> Mike
>>>>
>>>> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>>> > Yes, it's the same data.
>>>> >
>>>> > 1) The number of partitions is the same (8, which is an argument to
>>>> > the HashPartitioner). In the first case, these partitions are spread
>>>> > across both the worker nodes. In the second case, all the partitions
>>>> > are on the same node.
>>>> > 2) What resources would be of interest here? The Scala shell takes
>>>> > the default parameters since we use "bin/spark-shell --master
>>>> > <master-URL>" to run it. For the Scala program, we do set some
>>>> > configuration options such as driver memory (12GB); parallelism is
>>>> > set to 8 and we use the Kryo serializer.
>>>> >
>>>> > We are running this on Azure D3-v2 machines which have 4 cores and
>>>> > 14GB RAM. 1 executor runs on each worker node. The following
>>>> > configuration options are set for the Scala program -- perhaps we
>>>> > should move them to the spark config file.
>>>> >
>>>> > Driver memory and executor memory are set to 12GB
>>>> > Parallelism is set to 8
>>>> > The Kryo serializer is used
>>>> > The number of retainedJobs and retainedStages has been increased to
>>>> > check them in the UI.
>>>> >
>>>> > What information regarding the Spark Context would be of interest
>>>> > here?
>>>> >
>>>> > Regards,
>>>> > Raghava.
>>>> >
>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>>>> > wrote:
>>>> >
>>>> >> If the data file is the same then it should have a similar
>>>> >> distribution of keys. A few queries:
>>>> >>
>>>> >> 1. Did you compare the number of partitions in both cases?
>>>> >> 2. Did you compare the resource allocation for the Spark shell vs.
>>>> >>    the Scala program being submitted?
>>>> >>
>>>> >> Also, can you please share the details of the Spark Context,
>>>> >> Environment and Executors when you run via the Scala program?
>>>> >>
>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>> >> m.vijayaragh...@gmail.com> wrote:
>>>> >>
>>>> >>> Hello All,
>>>> >>>
>>>> >>> We are using HashPartitioner in the following way on a 3 node
>>>> >>> cluster (1 master and 2 worker nodes).
>>>> >>>
>>>> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>>>> >>>   .map[(Int, Int)](line => { line.split("\\|") match {
>>>> >>>     case Array(x, y) => (y.toInt, x.toInt) } })
>>>> >>>   .partitionBy(new HashPartitioner(8))
>>>> >>>   .setName("u")
>>>> >>>   .persist()
>>>> >>>
>>>> >>> u.count()
>>>> >>>
>>>> >>> If we run this from the spark shell, the data (52 MB) is split
>>>> >>> across the two worker nodes. But if we put this in a scala program
>>>> >>> and run it, then all the data goes to only one node.
>>>> >>> We have run it multiple times, but this behavior does not change.
>>>> >>> This seems strange.
>>>> >>>
>>>> >>> Is there some problem with the way we use HashPartitioner?
>>>> >>>
>>>> >>> Thanks in advance.
>>>> >>>
>>>> >>> Regards,
>>>> >>> Raghava.
>>>>
>>>> --
>>>> Thanks,
>>>> Mike
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
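For reference, a minimal sketch of the "dummy stage" workaround discussed above: materialize a cheap RDD before the real stages so that every executor has registered with the driver before the first real tasks are scheduled. The task count of 100 and the 10 ms sleep below are arbitrary choices, not values from the thread:

    // Dummy stage: run before the real work so that all executors have
    // registered with the driver and can receive tasks in later stages.
    val numDummyTasks = 100  // arbitrary; anything well above the total core count
    sc.parallelize(1 to numDummyTasks, numDummyTasks)
      .map { i => Thread.sleep(10); i }  // a little work per task so it isn't instant
      .count()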