Could you change numPartitions to {16, 32, 64} and run your program for
each to see how many partitions are allocated to each worker? Let's see if
you experience an all-or-nothing imbalance that way; if so, my guess is that
something else is odd in your program logic or Spark runtime environment,
but if not, and your executors all receive at least *some* partitions, then
I still wouldn't rule out the effects of scheduling delay. It's a simple
test, but it could give some insight.
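
If it helps, here is a rough sketch (not taken from your code) of how that
per-worker tally could be collected; the helper name is made up, and
currDeltaURule1 just stands in for whichever persisted RDD you suspect is
skewed (tasks should run where the cached blocks live, so the hostnames
reflect placement):

import java.net.InetAddress
import org.apache.spark.rdd.RDD

// Emit one record per partition -- the hostname of the executor that ran
// it -- then count records per host to see how the partitions are spread.
def partitionsPerHost[T](rdd: RDD[T]): Array[(String, Int)] =
  rdd.mapPartitions(_ => Iterator((InetAddress.getLocalHost.getHostName, 1)))
     .reduceByKey(_ + _)
     .collect()

// e.g. after persisting and counting the RDD in question:
partitionsPerHost(currDeltaURule1).foreach { case (host, n) =>
  println(s"$host -> $n partitions")
}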

Mike

This could still be a scheduling issue. If only one executor has *all* the
partitions, could you email me the log file? (If it's 10+ MB, just the first
few thousand lines are fine.)
On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
wrote:

> Mike, All,
>
> It turns out that the second time we encountered the uneven-partition
> issue, it was not due to spark-submit. It was resolved by changing the
> placement of count().
>
> Case-1:
>
> val numPartitions = 8
> // read uAxioms from HDFS, use hash partitioner on it and persist it
> // read type1Axioms from HDFS, use hash partitioner on it and persist it
> currDeltaURule1 = type1Axioms.join(uAxioms)
>                              .values
>                              .distinct(numPartitions)
>                              .partitionBy(hashPartitioner)
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>                                  .count()
>
> <more transformations and actions>
>
> The currDeltaURule1 RDD ends up with all the data on one node (there are 2
> worker nodes). If we move count() to a later point, the uneven partition
> issue is resolved.
>
> Case-2:
>
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>                                  <count not run here>
>
> <some more transformations that don't affect currDeltaURule1 rdd>
>
> <rdd.count()> -- this RDD depends on currDeltaURule1 and gets executed here.
> This resolved the uneven partitioning issue.
>
> I don't see why moving an action to a later part of the code would
> affect the partitioning. Are there other factors at play here that affect
> the partitioning?
>
> (Inconsistent) uneven partitioning leads to one machine getting
> overburdened (memory and number of tasks). We see a clear improvement in
> runtime when the partitioning is even (happens when count is moved).
>
> Any pointers on figuring out this issue are much appreciated.
>
> Regards,
> Raghava.
>
>
>
>
> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Glad to hear that the problem was solvable! I have not seen delays of
>> this type for later stages in jobs run by spark-submit, but I do not think
>> it impossible if your stage has no lineage dependence on other RDDs.
>>
>> I'm CC'ing the dev list to report that other users are observing load
>> imbalance caused by unusual initial task scheduling. I don't know of ways to avoid
>> this other than creating a dummy task to synchronize the executors, but
>> hopefully someone from there can suggest other possibilities.
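>>
>> For illustration, a minimal sketch of such a dummy synchronization stage
>> might be something like the following, run before the real stages (the
>> partition count of 64 is an arbitrary choice, just larger than the total
>> number of cores so that every executor has to register and run tasks):
>>
>> // Warm-up stage: trivial work spread over many tasks, materialized with
>> // an action so it executes before the real job begins.
>> sc.parallelize(1 to 10000, 64).map(_ + 1).count()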
>>
>> Mike
>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
>> wrote:
>>
>>> Mike,
>>>
>>> It turns out the executor delay, as you mentioned, is the cause. After
>>> we introduced a dummy stage, partitioning was working fine. Does this delay
>>> happen during later stages as well? We noticed the same behavior
>>> (partitioning happens on spark-shell but not through spark-submit) at a
>>> later stage also.
>>>
>>> Apart from introducing a dummy stage or running it from spark-shell, is
>>> there any other option to fix this?
>>>
>>> Regards,
>>> Raghava.
>>>
>>>
>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>>
>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>> receive tasks in the first stage. The delay does not persist once the
>>>> executors have been synchronized.
>>>>
>>>> When the tasks are very short, as may be your case (relatively small
>>>> data and a simple map task like you have described), the 8 tasks in
>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>> the second executor won't have responded to the master before the
>>>> first 4 tasks on the first executor have completed.
>>>>
>>>> To see if this is the cause in your particular case, you could try the
>>>> following to confirm:
>>>>         1. Examine the starting times of the tasks alongside their
>>>> executors
>>>>         2. Make a "dummy" stage execute before your real stages to
>>>> synchronize the executors by creating and materializing any random RDD
>>>>         3. Make the tasks longer, i.e. with some silly computational
>>>> work (a rough sketch of this follows below).
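>>>>
>>>> As a rough sketch of 3. (the amount of busy work is arbitrary; it only
>>>> needs to outlast the executor registration delay), something like this
>>>> on the RDD u from your example:
>>>>
>>>> // Pad each task with throwaway computation before the real work.
>>>> val padded = u.mapPartitions { iter =>
>>>>   var acc = 0.0
>>>>   for (i <- 1 to 5000000) acc += math.sqrt(i) // arbitrary busy work
>>>>   iter
>>>> }
>>>> padded.count()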
>>>>
>>>> Mike
>>>>
>>>>
>>>> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>>> > Yes its the same data.
>>>> >
>>>> > 1) The number of partitions is the same (8, which is an argument to the
>>>> > HashPartitioner). In the first case, these partitions are spread across
>>>> > both the worker nodes. In the second case, all the partitions are on the
>>>> > same node.
>>>> > 2) What resources would be of interest here? The Scala shell takes the
>>>> > default parameters since we use "bin/spark-shell --master <master-URL>"
>>>> > to run it. For the Scala program, we do set some configuration options
>>>> > such as driver memory (12GB), parallelism set to 8, and the Kryo
>>>> > serializer.
>>>> >
>>>> > We are running this on Azure D3-v2 machines, which have 4 cores and 14GB
>>>> > RAM. One executor runs on each worker node. The following configuration
>>>> > options are set for the Scala program -- perhaps we should move them to
>>>> > the Spark config file.
>>>> >
>>>> > Driver memory and executor memory are set to 12GB
>>>> > parallelism is set to 8
>>>> > Kryo serializer is used
>>>> > Number of retainedJobs and retainedStages has been increased to check
>>>> > them in the UI.
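>>>> >
>>>> > For reference, a sketch of those settings expressed as a SparkConf (the
>>>> > keys are standard Spark config names; the retainedJobs/retainedStages
>>>> > values below are placeholders):
>>>> >
>>>> > import org.apache.spark.SparkConf
>>>> >
>>>> > val conf = new SparkConf()
>>>> >   // driver memory generally has to be set before the driver JVM starts,
>>>> >   // e.g. spark-submit --driver-memory 12g
>>>> >   .set("spark.executor.memory", "12g")
>>>> >   .set("spark.default.parallelism", "8")
>>>> >   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>> >   .set("spark.ui.retainedJobs", "1000")   // placeholder
>>>> >   .set("spark.ui.retainedStages", "1000") // placeholder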
>>>> >
>>>> > What information regarding Spark Context would be of interest here?
>>>> >
>>>> > Regards,
>>>> > Raghava.
>>>> >
>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>>>> wrote:
>>>> >
>>>> >> If the data file is the same, then it should have a similar
>>>> >> distribution of keys. A few queries:
>>>> >>
>>>> >> 1. Did you compare the number of partitions in both cases?
>>>> >> 2. Did you compare the resource allocation for the Spark shell vs. the
>>>> >> Scala program being submitted?
>>>> >>
>>>> >> Also, can you please share the details of the Spark Context,
>>>> >> Environment, and Executors when you run via the Scala program?
>>>> >>
>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>> >> m.vijayaragh...@gmail.com> wrote:
>>>> >>
>>>> >>> Hello All,
>>>> >>>
>>>> >>> We are using HashPartitioner in the following way on a 3-node cluster
>>>> >>> (1 master and 2 worker nodes).
>>>> >>>
>>>> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>>>> >>>   .map[(Int, Int)](line => line.split("\\|") match {
>>>> >>>     case Array(x, y) => (y.toInt, x.toInt)
>>>> >>>   })
>>>> >>>   .partitionBy(new HashPartitioner(8))
>>>> >>>   .setName("u")
>>>> >>>   .persist()
>>>> >>>
>>>> >>> u.count()
>>>> >>>
>>>> >>> If we run this from the Spark shell, the data (52 MB) is split across
>>>> >>> the two worker nodes. But if we put this in a Scala program and run it,
>>>> >>> then all the data goes to only one node. We have run it multiple times,
>>>> >>> but this behavior does not change. This seems strange.
>>>> >>>
>>>> >>> Is there some problem with the way we use HashPartitioner?
>>>> >>>
>>>> >>> Thanks in advance.
>>>> >>>
>>>> >>> Regards,
>>>> >>> Raghava.
>>>> >>>
>>>> >>
>>>> >>
>>>> >
>>>> >
>>>> > --
>>>> > Regards,
>>>> > Raghava
>>>> > http://raghavam.github.io
>>>> >
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Mike
>>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Raghava
>>> http://raghavam.github.io
>>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>
