Hi Raghava,

I'm terribly sorry about the end of my last email; that garbled sentence wasn't meant to exist at all. I wrote it on my phone, realized I wouldn't realistically have time to look into another set of logs deeply enough, and then mistakenly thought I had deleted it. Again, I'm very sorry for my error here.

I did peek at your code, though, and think you could try the following:

0. Your main method performs many actions, which will make it hard to isolate a problem; I would recommend examining only *one* RDD at first, rather than six.

1. There is a lot of repetition in how the RDDs are read sequentially from text files; if you factor those lines into two methods (one per RDD type), you will at least have a single entry point to work with once you write a simplified test program. A rough sketch of what I mean follows this list.

2. In one part you persist an RDD, count it, immediately unpersist it, and then count it again. I'm not acquainted with this idiom, and I don't understand what it is meant to achieve. It strikes me as suspect for triggering unusual garbage collection, which would, I think, only complicate your trace debugging.
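To illustrate point 1, here is a rough sketch of the kind of helper I mean, assuming your axiom files are pipe-delimited integer pairs as in your first post (the method name, signature, and storage level are placeholders, not prescriptions):

    import org.apache.spark.{HashPartitioner, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // Single entry point for loading a pair RDD: parse, partition, name,
    // and persist in one place. The sequential reads then differ only in
    // their arguments, which makes them easy to instrument or stub out
    // in a simplified test program.
    def readPairRDD(sc: SparkContext, path: String, name: String,
                    numPartitions: Int): RDD[(Int, Int)] = {
      sc.textFile(path)
        .map { line =>
          val Array(x, y) = line.split("\\|")
          (y.toInt, x.toInt)
        }
        .partitionBy(new HashPartitioner(numPartitions))
        .setName(name)
        .persist(StorageLevel.MEMORY_AND_DISK)
    }

A second method with a different parse function would cover your other RDD type.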
I've attached a python script that dumps the relevant info from the Spark JSON logs into a CSV for easier analysis in your language of choice; hopefully it can aid in finer-grained debugging. (The headers of the fields it prints are listed in one of its functions.)
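Also, since the dummy synchronization stage fixed your earlier imbalance, here is a minimal sketch of what I had in mind by that suggestion, for the record. The RDD contents are irrelevant and the partition count here is arbitrary; the only point is to force every executor to register and complete at least one task before the first real stage is scheduled:

    import org.apache.spark.{SparkConf, SparkContext}

    object DummySyncExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("dummy-sync"))

        // "Dummy" stage: materialize a throwaway RDD with more partitions
        // than the cluster has cores, so that both executors must have
        // registered with the driver and run tasks before any real work.
        sc.parallelize(1 to 100000, 64).map(_ * 2).count()

        // ... the real job goes here, with the executors now synchronized ...

        sc.stop()
      }
    }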
Mike

On 4/25/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
> Mike,
>
> We ran our program with 16, 32 and 64 partitions. The behavior was the same
> as before with 8 partitions. It was mixed -- for some RDDs we see an
> all-or-nothing skew, but for others we see them getting split across the 2
> worker nodes. In some cases, they start with an even split and in later
> iterations it goes to an all-or-nothing split. Please find the logs attached.
>
> our program source code:
> https://github.com/raghavam/sparkel/blob/275ecbb901a58592d8a70a8568dd95c839d46ecc/src/main/scala/org/daselab/sparkel/SparkELDAGAnalysis.scala
>
> We put in persist() statements for different RDDs just to check their skew.
>
> @Jeff, setting minRegisteredResourcesRatio did not help. Behavior was the
> same as before.
>
> Thank you for your time.
>
> Regards,
> Raghava.
>
> On Sun, Apr 24, 2016 at 7:17 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Could you change numPartitions to {16, 32, 64} and run your program for
>> each to see how many partitions are allocated to each worker? Let's see if
>> you experience an all-or-nothing imbalance that way; if so, my guess is
>> that something else is odd in your program logic or spark runtime
>> environment, but if not and your executors all receive at least *some*
>> partitions, then I still wouldn't rule out effects of scheduling delay.
>> It's a simple test, but it could give some insight.
>>
>> Mike
>>
>> his could still be a scheduling If only one has *all* partitions, and
>> email me the log file? (If it's 10+ MB, just the first few thousand lines
>> are fine).
>>
>> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
>> wrote:
>>
>>> Mike, All,
>>>
>>> It turns out that the second time we encountered the uneven-partition
>>> issue, it was not due to spark-submit. It was resolved with the change in
>>> placement of count().
>>>
>>> Case-1:
>>>
>>> val numPartitions = 8
>>> // read uAxioms from HDFS, use hash partitioner on it and persist it
>>> // read type1Axioms from HDFS, use hash partitioner on it and persist it
>>> currDeltaURule1 = type1Axioms.join(uAxioms)
>>>                              .values
>>>                              .distinct(numPartitions)
>>>                              .partitionBy(hashPartitioner)
>>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>>>                                  .count()
>>>
>>> <more transformations and actions>
>>>
>>> The currDeltaURule1 RDD results in all the data on one node (there are 2
>>> worker nodes). If we move count(), the uneven partition issue is
>>> resolved.
>>>
>>> Case-2:
>>>
>>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>>>
>>> <count not run here>
>>>
>>> <some more transformations that don't affect currDeltaURule1 rdd>
>>>
>>> <rdd.count()> -- this rdd depends on currDeltaURule1 and it gets
>>> executed. This resolved the uneven partitioning issue.
>>>
>>> I don't see why moving an action to a later part of the code would
>>> affect the partitioning. Are there other factors at play here that
>>> affect the partitioning?
>>>
>>> (Inconsistent) uneven partitioning leads to one machine getting
>>> overburdened (memory and number of tasks). We see a clear improvement in
>>> runtime when the partitioning is even (which happens when count is moved).
>>>
>>> Any pointers in figuring out this issue are much appreciated.
>>>
>>> Regards,
>>> Raghava.
>>>
>>> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:
>>>
>>>> Glad to hear that the problem was solvable! I have not seen delays of
>>>> this type for later stages in jobs run by spark-submit, but I do not
>>>> think it impossible if your stage has no lineage dependence on other
>>>> RDDs.
>>>>
>>>> I'm CC'ing the dev list to report other users observing load imbalance
>>>> caused by unusual initial task scheduling. I don't know of ways to
>>>> avoid this other than creating a dummy task to synchronize the
>>>> executors, but hopefully someone there can suggest other possibilities.
>>>>
>>>> Mike
>>>>
>>>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Mike,
>>>>>
>>>>> It turns out the executor delay, as you mentioned, is the cause. After
>>>>> we introduced a dummy stage, partitioning was working fine. Does this
>>>>> delay happen during later stages as well? We noticed the same behavior
>>>>> (partitioning happens on spark-shell but not through spark-submit) at
>>>>> a later stage also.
>>>>>
>>>>> Apart from introducing a dummy stage or running it from spark-shell,
>>>>> is there any other option to fix this?
>>>>>
>>>>> Regards,
>>>>> Raghava.
>>>>>
>>>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>>>>
>>>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>>>> receive tasks in the first stage. The delay does not persist once the
>>>>>> executors have been synchronized.
>>>>>>
>>>>>> When the tasks are very short, as may be your case (relatively small
>>>>>> data and a simple map task like you have described), the 8 tasks in
>>>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>>>> the second executor won't have responded to the master before the
>>>>>> first 4 tasks on the first executor have completed.
>>>>>>
>>>>>> To see if this is the cause in your particular case, you could try
>>>>>> the following to confirm:
>>>>>> 1. Examine the starting times of the tasks alongside their executor
>>>>>> 2. Make a "dummy" stage execute before your real stages to
>>>>>> synchronize the executors by creating and materializing any random RDD
>>>>>> 3. Make the tasks longer, i.e. with some silly computational work.
>>>>>>
>>>>>> Mike
>>>>>>
>>>>>> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>>>>> > Yes, it's the same data.
>>>>>> >
>>>>>> > 1) The number of partitions is the same (8, which is an argument to
>>>>>> > the HashPartitioner). In the first case, these partitions are spread
>>>>>> > across both the worker nodes. In the second case, all the partitions
>>>>>> > are on the same node.
>>>>>> > 2) What resources would be of interest here? The Scala shell takes
>>>>>> > the default parameters, since we use "bin/spark-shell --master
>>>>>> > <master-URL>" to run it. For the Scala program, we do set some
>>>>>> > configuration options such as driver memory (12GB); parallelism is
>>>>>> > set to 8 and we use the Kryo serializer.
>>>>>> >
>>>>>> > We are running this on Azure D3-v2 machines which have 4 cores and
>>>>>> > 14GB RAM. 1 executor runs on each worker node. The following
>>>>>> > configuration options are set for the Scala program -- perhaps we
>>>>>> > should move them to the spark config file.
>>>>>> >
>>>>>> > Driver memory and executor memory are set to 12GB
>>>>>> > parallelism is set to 8
>>>>>> > Kryo serializer is used
>>>>>> > Number of retainedJobs and retainedStages has been increased to
>>>>>> > check them in the UI.
>>>>>> >
>>>>>> > What information regarding the Spark Context would be of interest
>>>>>> > here?
>>>>>> >
>>>>>> > Regards,
>>>>>> > Raghava.
>>>>>> >
>>>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>>>>>> > wrote:
>>>>>> >
>>>>>> >> If the data file is the same, then it should have a similar
>>>>>> >> distribution of keys. A few queries-
>>>>>> >>
>>>>>> >> 1. Did you compare the number of partitions in both the cases?
>>>>>> >> 2. Did you compare the resource allocation for the Spark shell vs
>>>>>> >> the Scala program being submitted?
>>>>>> >>
>>>>>> >> Also, can you please share the details of the Spark Context,
>>>>>> >> Environment and Executors when you run via the Scala program?
>>>>>> >>
>>>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>>>> >> m.vijayaragh...@gmail.com> wrote:
>>>>>> >>
>>>>>> >>> Hello All,
>>>>>> >>>
>>>>>> >>> We are using HashPartitioner in the following way on a 3 node
>>>>>> >>> cluster (1 master and 2 worker nodes).
>>>>>> >>>
>>>>>> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>>>>>> >>>   .map[(Int, Int)](line => { line.split("\\|") match {
>>>>>> >>>     case Array(x, y) => (y.toInt, x.toInt) } })
>>>>>> >>>   .partitionBy(new HashPartitioner(8)).setName("u").persist()
>>>>>> >>>
>>>>>> >>> u.count()
>>>>>> >>>
>>>>>> >>> If we run this from the spark shell, the data (52 MB) is split
>>>>>> >>> across the two worker nodes. But if we put this in a Scala program
>>>>>> >>> and run it, then all the data goes to only one node. We have run
>>>>>> >>> it multiple times, but this behavior does not change. This seems
>>>>>> >>> strange.
>>>>>> >>>
>>>>>> >>> Is there some problem with the way we use HashPartitioner?
>>>>>> >>>
>>>>>> >>> Thanks in advance.
>>>>>> >>>
>>>>>> >>> Regards,
>>>>>> >>> Raghava.
--
Thanks,
Mike
taskinfo.py