Hi Raghava,

I'm terribly sorry about the end of my last email; that garbled
sentence was garbled because it wasn't meant to exist; I wrote it on
my phone, realized I wouldn't realistically have time to look into
another set of logs deeply enough, and then mistook myself for having
deleted it. Again, I'm very sorry for my error here.

I did peek at your code, though, and think you could try the following:
0. The actions in your main method are many, and it will be hard to
isolate a problem; I would recommend only examing *one* RDD at first,
rather than six.
1. There is a lot of repetition for reading RDDs from textfiles
sequentially; if you put those lines into two methods depending on RDD
type, you will at least have one entry point to work with once you
make a simplified test program.
2. In one part you persist, count, immediately unpersist, and then
count again an RDD.. I'm not acquainted with this idiom, and I don't
understand what that is to achieve. It strikes me suspect for
triggering unusual garbage collection, which would, I think, only
complicate your trace debugging.

I've attached a python script that dumps relevant info from the Spark
JSON logs into a CSV for easier analysis in you language of choice;
hopefully it can aid in finer grained debugging (the headers of the
fields it prints are listed in one of the functions).

Mike

On 4/25/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
> Mike,
>
> We ran our program with 16, 32 and 64 partitions. The behavior was same as
> before with 8 partitions. It was mixed -- for some RDDs we see an
> all-nothing skew, but for others we see them getting split across the 2
> worker nodes. In some cases, they start with even split and in later
> iterations it goes to all-nothing split. Please find the logs attached.
>
> our program source code:
> https://github.com/raghavam/sparkel/blob/275ecbb901a58592d8a70a8568dd95c839d46ecc/src/main/scala/org/daselab/sparkel/SparkELDAGAnalysis.scala
>
> We put in persist() statements for different RDDs just to check their skew.
>
> @Jeff, setting minRegisteredResourcesRatio did not help. Behavior was same
> as before.
>
> Thank you for your time.
>
> Regards,
> Raghava.
>
>
> On Sun, Apr 24, 2016 at 7:17 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Could you change numPartitions to {16, 32, 64} and run your program for
>> each to see how many partitions are allocated to each worker? Let's see
>> if
>> you experience an all-nothing imbalance that way; if so, my guess is that
>> something else is odd in your program logic or spark runtime environment,
>> but if not and your executors all receive at least *some* partitions,
>> then
>> I still wouldn't rule out effects of scheduling delay. It's a simple
>> test,
>> but it could give some insight.
>>
>> Mike
>>
>> his could still be a  scheduling  If only one has *all* partitions,  and
>> email me the log file? (If it's 10+ MB, just the first few thousand lines
>> are fine).
>> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
>> wrote:
>>
>>> Mike, All,
>>>
>>> It turns out that the second time we encountered the uneven-partition
>>> issue is not due to spark-submit. It was resolved with the change in
>>> placement of count().
>>>
>>> Case-1:
>>>
>>> val numPartitions = 8
>>> // read uAxioms from HDFS, use hash partitioner on it and persist it
>>> // read type1Axioms from HDFS, use hash partitioner on it and persist it
>>> currDeltaURule1 = type1Axioms.join(uAxioms)
>>>                              .values
>>>                              .distinct(numPartitions)
>>>                              .partitionBy(hashPartitioner)
>>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>>
>>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>>                                        .count()
>>>
>>> <more transformations and actions>
>>>
>>> currDeltaURule1 RDD results in all the data on one node (there are 2
>>> worker nodes). If we move count(), the uneven partition issue is
>>> resolved.
>>>
>>> Case-2:
>>>
>>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>>
>>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>>                                        <count not run here>
>>>
>>> <some more transformations that don't affect currDeltaURule1 rdd>
>>>
>>> <rdd.count()> -- this rdd depends on currDeltaURule1 and it gets
>>> executed. This resolved the uneven partitioning issue.
>>>
>>> I don't see why the moving of an action to a later part in the code
>>> would
>>> affect the partitioning. Are there other factors at play here that
>>> affect
>>> the partitioning?
>>>
>>> (Inconsistent) uneven partitioning leads to one machine getting over
>>> burdened (memory and number of tasks). We see a clear improvement in
>>> runtime when the partitioning is even (happens when count is moved).
>>>
>>> Any pointers in figuring out this issue is much appreciated.
>>>
>>> Regards,
>>> Raghava.
>>>
>>>
>>>
>>>
>>> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:
>>>
>>>> Glad to hear that the problem was solvable! I have not seen delays of
>>>> this type for later stages in jobs run by spark-submit, but I do not
>>>> think
>>>> it impossible if your stage has no lineage dependence on other RDDs.
>>>>
>>>> I'm CC'ing the dev list to report of other users observing load
>>>> imbalance caused by unusual initial task scheduling. I don't know of
>>>> ways
>>>> to avoid this other than creating a dummy task to synchronize the
>>>> executors, but hopefully someone from there can suggest other
>>>> possibilities.
>>>>
>>>> Mike
>>>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju"
>>>> <m.vijayaragh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Mike,
>>>>>
>>>>> It turns out the executor delay, as you mentioned, is the cause. After
>>>>> we introduced a dummy stage, partitioning was working fine. Does this
>>>>> delay
>>>>> happen during later stages as well? We noticed the same behavior
>>>>> (partitioning happens on spark-shell but not through spark-submit) at
>>>>> a
>>>>> later stage also.
>>>>>
>>>>> Apart from introducing a dummy stage or running it from spark-shell,
>>>>> is
>>>>> there any other option to fix this?
>>>>>
>>>>> Regards,
>>>>> Raghava.
>>>>>
>>>>>
>>>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>>>>
>>>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>>>> receive tasks in the first stage. The delay does not persist once the
>>>>>> executors have been synchronized.
>>>>>>
>>>>>> When the tasks are very short, as may be your case (relatively small
>>>>>> data and a simple map task like you have described), the 8 tasks in
>>>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>>>> the second executor won't have responded to the master before the
>>>>>> first 4 tasks on the first executor have completed.
>>>>>>
>>>>>> To see if this is the cause in your particular case, you could try
>>>>>> the
>>>>>> following to confirm:
>>>>>>         1. Examine the starting times of the tasks alongside their
>>>>>> executor
>>>>>>         2. Make a "dummy" stage execute before your real stages to
>>>>>> synchronize the executors by creating and materializing any random
>>>>>> RDD
>>>>>>         3. Make the tasks longer, i.e. with some silly computational
>>>>>> work.
>>>>>>
>>>>>> Mike
>>>>>>
>>>>>>
>>>>>> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>>>>> > Yes its the same data.
>>>>>> >
>>>>>> > 1) The number of partitions are the same (8, which is an argument
>>>>>> > to
>>>>>> the
>>>>>> > HashPartitioner). In the first case, these partitions are spread
>>>>>> across
>>>>>> > both the worker nodes. In the second case, all the partitions are
>>>>>> > on
>>>>>> the
>>>>>> > same node.
>>>>>> > 2) What resources would be of interest here? Scala shell takes the
>>>>>> default
>>>>>> > parameters since we use "bin/spark-shell --master <master-URL>" to
>>>>>> run the
>>>>>> > scala-shell. For the scala program, we do set some configuration
>>>>>> options
>>>>>> > such as driver memory (12GB), parallelism is set to 8 and we use
>>>>>> > Kryo
>>>>>> > serializer.
>>>>>> >
>>>>>> > We are running this on Azure D3-v2 machines which have 4 cores and
>>>>>> 14GB
>>>>>> > RAM.1 executor runs on each worker node. Following configuration
>>>>>> options
>>>>>> > are set for the scala program -- perhaps we should move it to the
>>>>>> spark
>>>>>> > config file.
>>>>>> >
>>>>>> > Driver memory and executor memory are set to 12GB
>>>>>> > parallelism is set to 8
>>>>>> > Kryo serializer is used
>>>>>> > Number of retainedJobs and retainedStages has been increased to
>>>>>> check them
>>>>>> > in the UI.
>>>>>> >
>>>>>> > What information regarding Spark Context would be of interest here?
>>>>>> >
>>>>>> > Regards,
>>>>>> > Raghava.
>>>>>> >
>>>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>>>>>> wrote:
>>>>>> >
>>>>>> >> If the data file is same then it should have similar distribution
>>>>>> >> of
>>>>>> >> keys.
>>>>>> >> Few queries-
>>>>>> >>
>>>>>> >> 1. Did you compare the number of partitions in both the cases?
>>>>>> >> 2. Did you compare the resource allocation for Spark Shell vs
>>>>>> >> Scala
>>>>>> >> Program being submitted?
>>>>>> >>
>>>>>> >> Also, can you please share the details of Spark Context,
>>>>>> Environment and
>>>>>> >> Executors when you run via Scala program?
>>>>>> >>
>>>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>>>> >> m.vijayaragh...@gmail.com> wrote:
>>>>>> >>
>>>>>> >>> Hello All,
>>>>>> >>>
>>>>>> >>> We are using HashPartitioner in the following way on a 3 node
>>>>>> cluster (1
>>>>>> >>> master and 2 worker nodes).
>>>>>> >>>
>>>>>> >>> val u =
>>>>>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>>>>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>>>>>> (y.toInt,
>>>>>> >>> x.toInt) } }).partitionBy(new
>>>>>> HashPartitioner(8)).setName("u").persist()
>>>>>> >>>
>>>>>> >>> u.count()
>>>>>> >>>
>>>>>> >>> If we run this from the spark shell, the data (52 MB) is split
>>>>>> across
>>>>>> >>> the
>>>>>> >>> two worker nodes. But if we put this in a scala program and run
>>>>>> it, then
>>>>>> >>> all the data goes to only one node. We have run it multiple
>>>>>> >>> times,
>>>>>> but
>>>>>> >>> this
>>>>>> >>> behavior does not change. This seems strange.
>>>>>> >>>
>>>>>> >>> Is there some problem with the way we use HashPartitioner?
>>>>>> >>>
>>>>>> >>> Thanks in advance.
>>>>>> >>>
>>>>>> >>> Regards,
>>>>>> >>> Raghava.
>>>>>> >>>
>>>>>> >>
>>>>>> >>
>>>>>> >
>>>>>> >
>>>>>> > --
>>>>>> > Regards,
>>>>>> > Raghava
>>>>>> > http://raghavam.github.io
>>>>>> >
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Mike
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Raghava
>>>>> http://raghavam.github.io
>>>>>
>>>>
>>>
>>>
>>> --
>>> Regards,
>>> Raghava
>>> http://raghavam.github.io
>>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>


-- 
Thanks,
Mike

Attachment: taskinfo.py
Description: application/py

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to