This may be due to the config spark.scheduler.minRegisteredResourcesRatio; you can try setting it to 1 to see whether the behavior changes.
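For example, you could set it in the program's SparkConf (a sketch; the app
name is a placeholder, and you could equally pass the setting with --conf on
the spark-submit command line):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("partition-test")
    // wait for all expected executors before scheduling the first tasks
    .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")
  val sc = new SparkContext(conf)

With a ratio of 1.0, task submission waits until every expected executor has
registered (bounded by spark.scheduler.maxRegisteredResourcesWaitingTime,
30s by default). For reference, the scheduler code that reads this value: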
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, which is a double between 0 and 1.
var minRegisteredRatio =
  math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))

On Mon, Apr 25, 2016 at 7:17 AM, Mike Hynes <91m...@gmail.com> wrote:

> Could you change numPartitions to {16, 32, 64} and run your program for
> each to see how many partitions are allocated to each worker? Let's see if
> you experience an all-or-nothing imbalance that way; if so, my guess is
> that something else is odd in your program logic or Spark runtime
> environment, but if not, and your executors all receive at least *some*
> partitions, then I still wouldn't rule out effects of scheduling delay.
> It's a simple test, but it could give some insight.
>
> This could still be a scheduling delay. If only one worker has *all* the
> partitions, could you email me the log file? (If it's 10+ MB, just the
> first few thousand lines are fine.)
>
> Mike
>
> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
> wrote:
>
>> Mike, All,
>>
>> It turns out that the second time we encountered the uneven-partition
>> issue, it was not due to spark-submit. It was resolved by changing the
>> placement of count().
>>
>> Case-1:
>>
>> val numPartitions = 8
>> // read uAxioms from HDFS, use hash partitioner on it and persist it
>> // read type1Axioms from HDFS, use hash partitioner on it and persist it
>> currDeltaURule1 = type1Axioms.join(uAxioms)
>>                              .values
>>                              .distinct(numPartitions)
>>                              .partitionBy(hashPartitioner)
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>>                                  .count()
>>
>> <more transformations and actions>
>>
>> The currDeltaURule1 RDD ends up with all of its data on one node (there
>> are 2 worker nodes). If we move count(), the uneven-partition issue is
>> resolved.
>>
>> Case-2:
>>
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>> <count not run here>
>>
>> <some more transformations that don't affect the currDeltaURule1 rdd>
>>
>> <rdd.count()> -- this rdd depends on currDeltaURule1, and when it gets
>> executed, the uneven partitioning is resolved.
>>
>> I don't see why moving an action to a later part of the code would
>> affect the partitioning. Are there other factors at play here that
>> affect the partitioning?
>>
>> (Inconsistent) uneven partitioning leads to one machine getting
>> overburdened (in memory and number of tasks). We see a clear improvement
>> in runtime when the partitioning is even (which happens when count is
>> moved).
>>
>> Any pointers toward figuring out this issue are much appreciated.
>>
>> Regards,
>> Raghava.
>>
>> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> Glad to hear that the problem was solvable! I have not seen delays of
>>> this type for later stages in jobs run by spark-submit, but I do not
>>> think it impossible if your stage has no lineage dependence on other
>>> RDDs.
>>>
>>> I'm CC'ing the dev list to report other users observing load imbalance
>>> caused by unusual initial task scheduling. I don't know of any way to
>>> avoid this other than creating a dummy task to synchronize the
>>> executors, but hopefully someone there can suggest other possibilities.
>>>
>>> Mike
>>>
>>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
>>> wrote:
>>>
>>>> Mike,
>>>>
>>>> It turns out the executor delay, as you mentioned, is the cause. After
>>>> we introduced a dummy stage, partitioning was working fine.
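>>>>
>>>> A dummy stage here can be as simple as creating and materializing any
>>>> random RDD before the real job begins (a sketch; the element count is
>>>> arbitrary, and numPartitions is the value from our earlier snippet):
>>>>
>>>> // no useful work; just forces a first stage so that all executors
>>>> // have registered before the real stages are scheduled
>>>> sc.parallelize(1 to 10000, numPartitions).count()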
>>>> Does this delay happen during later stages as well? We noticed the
>>>> same behavior (partitioning is even when run from spark-shell but not
>>>> through spark-submit) at a later stage also.
>>>>
>>>> Apart from introducing a dummy stage or running it from spark-shell,
>>>> is there any other option to fix this?
>>>>
>>>> Regards,
>>>> Raghava.
>>>>
>>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>>>
>>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>>> 1-2 seconds) for the executors to respond to the driver in order to
>>>>> receive tasks in the first stage. The delay does not persist once the
>>>>> executors have been synchronized.
>>>>>
>>>>> When the tasks are very short, as may be the case for you (relatively
>>>>> small data and a simple map task like you have described), the 8 tasks
>>>>> in your stage may be allocated to only 1 executor in 2 waves of 4,
>>>>> since the second executor won't have responded to the master before
>>>>> the first 4 tasks on the first executor have completed.
>>>>>
>>>>> To see if this is the cause in your particular case, you could try the
>>>>> following to confirm:
>>>>> 1. Examine the starting times of the tasks alongside their executor.
>>>>> 2. Make a "dummy" stage execute before your real stages to synchronize
>>>>>    the executors by creating and materializing any random RDD.
>>>>> 3. Make the tasks longer, i.e. give them some silly computational
>>>>>    work.
>>>>>
>>>>> Mike
>>>>>
>>>>> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>>>> > Yes, it's the same data.
>>>>> >
>>>>> > 1) The number of partitions is the same (8, which is an argument to
>>>>> > the HashPartitioner). In the first case, these partitions are spread
>>>>> > across both the worker nodes. In the second case, all the partitions
>>>>> > are on the same node.
>>>>> > 2) What resources would be of interest here? The Scala shell takes
>>>>> > the default parameters, since we use "bin/spark-shell --master
>>>>> > <master-URL>" to run it. For the Scala program, we do set some
>>>>> > configuration options, such as driver memory (12GB); parallelism is
>>>>> > set to 8 and we use the Kryo serializer.
>>>>> >
>>>>> > We are running this on Azure D3-v2 machines, which have 4 cores and
>>>>> > 14GB RAM. 1 executor runs on each worker node. The following
>>>>> > configuration options are set for the Scala program -- perhaps we
>>>>> > should move them to the Spark config file:
>>>>> >
>>>>> > Driver memory and executor memory are set to 12GB.
>>>>> > Parallelism is set to 8.
>>>>> > The Kryo serializer is used.
>>>>> > The number of retainedJobs and retainedStages has been increased to
>>>>> > check them in the UI.
>>>>> >
>>>>> > What information regarding the Spark Context would be of interest
>>>>> > here?
>>>>> >
>>>>> > Regards,
>>>>> > Raghava.
>>>>> >
>>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>>>>> > wrote:
>>>>> >
>>>>> >> If the data file is the same, then it should have a similar
>>>>> >> distribution of keys. A few queries:
>>>>> >>
>>>>> >> 1. Did you compare the number of partitions in both cases?
>>>>> >> 2. Did you compare the resource allocation for the Spark shell vs.
>>>>> >>    the Scala program being submitted?
>>>>> >>
>>>>> >> Also, can you please share the details of the Spark Context,
>>>>> >> Environment, and Executors when you run via the Scala program?
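>>>>> >>
>>>>> >> One way to collect the partition placement (a sketch; u is the
>>>>> >> persisted RDD from the snippet quoted below, and each worker runs a
>>>>> >> single executor in your setup, so hostnames identify executors):
>>>>> >>
>>>>> >> // for each partition: its index, the host that computed it, and
>>>>> >> // the number of records it holds
>>>>> >> val placement = u.mapPartitionsWithIndex { (idx, iter) =>
>>>>> >>   Iterator((idx, java.net.InetAddress.getLocalHost.getHostName,
>>>>> >>     iter.size))
>>>>> >> }.collect()
>>>>> >> placement.foreach { case (idx, host, n) =>
>>>>> >>   println(s"partition $idx -> $host ($n records)")
>>>>> >> }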
>>>>> >>
>>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>>> >> m.vijayaragh...@gmail.com> wrote:
>>>>> >>
>>>>> >>> Hello All,
>>>>> >>>
>>>>> >>> We are using a HashPartitioner in the following way on a 3-node
>>>>> >>> cluster (1 master and 2 worker nodes):
>>>>> >>>
>>>>> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>>>>> >>>   .map[(Int, Int)](line => line.split("\\|") match {
>>>>> >>>     case Array(x, y) => (y.toInt, x.toInt)
>>>>> >>>   })
>>>>> >>>   .partitionBy(new HashPartitioner(8))
>>>>> >>>   .setName("u")
>>>>> >>>   .persist()
>>>>> >>>
>>>>> >>> u.count()
>>>>> >>>
>>>>> >>> If we run this from the Spark shell, the data (52 MB) is split
>>>>> >>> across the two worker nodes. But if we put this in a Scala program
>>>>> >>> and run it, then all the data goes to only one node. We have run it
>>>>> >>> multiple times, but this behavior does not change. This seems
>>>>> >>> strange.
>>>>> >>>
>>>>> >>> Is there some problem with the way we use the HashPartitioner?
>>>>> >>>
>>>>> >>> Thanks in advance.
>>>>> >>>
>>>>> >>> Regards,
>>>>> >>> Raghava.
>>>>> >>>
>>>>> >>
>>>>> >
>>>>> > --
>>>>> > Regards,
>>>>> > Raghava
>>>>> > http://raghavam.github.io
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Mike
>>>>
>>>> --
>>>> Regards,
>>>> Raghava
>>>> http://raghavam.github.io
>>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>
--
Best Regards

Jeff Zhang