Mike, All,

It turns out that the second time we hit the uneven-partition issue, it was
not due to spark-submit. It was resolved by changing the placement of
count().

Case-1:

val numPartitions = 8
// read uAxioms from HDFS, use hash partitioner on it and persist it
// read type1Axioms from HDFS, use hash partitioner on it and persist it
currDeltaURule1 = type1Axioms.join(uAxioms)
                             .values
                             .distinct(numPartitions)
                             .partitionBy(hashPartitioner)
currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
                                 .persist(StorageLevel.MEMORY_AND_DISK)
currDeltaURule1.count()

<more transformations and actions>

The currDeltaURule1 RDD ends up with all of its data on one node (there are
2 worker nodes). If we move the count() to a later point, as in Case-2, the
uneven-partition issue is resolved.

Case-2:

currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
                                 .persist(StorageLevel.MEMORY_AND_DISK)
<count() is not run here>

<some more transformations that do not affect the currDeltaURule1 RDD>

<rdd.count()> -- this rdd depends on currDeltaURule1, so currDeltaURule1 gets
materialized only at this point. This resolves the uneven-partitioning issue.
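
For reference, one quick way to see the per-partition record counts
programmatically is something like the sketch below (mapPartitionsWithIndex
is standard Spark API; the printout is only a diagnostic, not part of our
algorithm):

// Diagnostic sketch: print the record count of every partition of
// currDeltaURule1 (the persisted RDD from the cases above).
val partitionSizes = currDeltaURule1
  .mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size)))
  .collect()
partitionSizes.foreach { case (idx, size) =>
  println(s"partition $idx: $size records")
}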

I don't see why moving an action to a later point in the code would affect
the partitioning. Are there other factors at play here that influence the
partitioning?

(Inconsistent) uneven partitioning leads to one machine getting overburdened
(in memory and in number of tasks). We see a clear improvement in runtime
when the partitioning is even (which happens when count() is moved).
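
For reference, the dummy stage we introduced earlier (following Mike's
suggestion below) to synchronize the executors was along these lines -- a
minimal sketch, assuming the contents of the dummy RDD do not matter at all:

// Dummy synchronization stage: materialize a small throwaway RDD so that
// both executors have registered with the driver before the real stages
// are scheduled.
val dummy = sc.parallelize(1 to 1000, numPartitions)
dummy.count()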

Any pointers on figuring out this issue would be much appreciated.

Regards,
Raghava.




On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:

> Glad to hear that the problem was solvable! I have not seen delays of this
> type for later stages in jobs run by spark-submit, but I do not think it
> impossible if your stage has no lineage dependence on other RDDs.
>
> I'm CC'ing the dev list to report of other users observing load imbalance
> caused by unusual initial task scheduling. I don't know of ways to avoid
> this other than creating a dummy task to synchronize the executors, but
> hopefully someone from there can suggest other possibilities.
>
> Mike
> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
> wrote:
>
>> Mike,
>>
>> It turns out the executor delay, as you mentioned, is the cause. After we
>> introduced a dummy stage, partitioning was working fine. Does this delay
>> happen during later stages as well? We noticed the same behavior
>> (partitioning happens on spark-shell but not through spark-submit) at a
>> later stage also.
>>
>> Apart from introducing a dummy stage or running it from spark-shell, is
>> there any other option to fix this?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> When submitting a job with spark-submit, I've observed delays (up to
>>> 1--2 seconds) for the executors to respond to the driver in order to
>>> receive tasks in the first stage. The delay does not persist once the
>>> executors have been synchronized.
>>>
>>> When the tasks are very short, as may be your case (relatively small
>>> data and a simple map task like you have described), the 8 tasks in
>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>> the second executor won't have responded to the master before the
>>> first 4 tasks on the first executor have completed.
>>>
>>> To see if this is the cause in your particular case, you could try the
>>> following to confirm:
>>>         1. Examine the starting times of the tasks alongside their
>>>            executor
>>>         2. Make a "dummy" stage execute before your real stages to
>>>            synchronize the executors by creating and materializing any
>>>            random RDD
>>>         3. Make the tasks longer, i.e. with some silly computational work.
>>>
>>> Mike
>>>
>>>
>>> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>> > Yes its the same data.
>>> >
>>> > 1) The number of partitions are the same (8, which is an argument to the
>>> > HashPartitioner). In the first case, these partitions are spread across
>>> > both the worker nodes. In the second case, all the partitions are on the
>>> > same node.
>>> > 2) What resources would be of interest here? Scala shell takes the
>>> > default parameters since we use "bin/spark-shell --master <master-URL>"
>>> > to run the scala-shell. For the scala program, we do set some
>>> > configuration options such as driver memory (12GB), parallelism is set
>>> > to 8 and we use Kryo serializer.
>>> >
>>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>>> > RAM. 1 executor runs on each worker node. The following configuration
>>> > options are set for the scala program -- perhaps we should move them to
>>> > the spark config file.
>>> >
>>> > Driver memory and executor memory are set to 12GB
>>> > parallelism is set to 8
>>> > Kryo serializer is used
>>> > Number of retainedJobs and retainedStages has been increased to check
>>> > them in the UI.
>>> >
>>> > What information regarding Spark Context would be of interest here?
>>> >
>>> > Regards,
>>> > Raghava.
>>> >
>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>>> wrote:
>>> >
>>> >> If the data file is same then it should have similar distribution of
>>> >> keys.
>>> >> Few queries-
>>> >>
>>> >> 1. Did you compare the number of partitions in both the cases?
>>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>> >> Program being submitted?
>>> >>
>>> >> Also, can you please share the details of Spark Context, Environment
>>> >> and Executors when you run via Scala program?
>>> >>
>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>> >> m.vijayaragh...@gmail.com> wrote:
>>> >>
>>> >>> Hello All,
>>> >>>
>>> >>> We are using HashPartitioner in the following way on a 3 node cluster
>>> >>> (1 master and 2 worker nodes).
>>> >>>
>>> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>>> >>>   .map[(Int, Int)](line => { line.split("\\|") match {
>>> >>>     case Array(x, y) => (y.toInt, x.toInt) } })
>>> >>>   .partitionBy(new HashPartitioner(8))
>>> >>>   .setName("u")
>>> >>>   .persist()
>>> >>>
>>> >>> u.count()
>>> >>>
>>> >>> If we run this from the spark shell, the data (52 MB) is split across
>>> >>> the two worker nodes. But if we put this in a scala program and run
>>> >>> it, then all the data goes to only one node. We have run it multiple
>>> >>> times, but this behavior does not change. This seems strange.
>>> >>>
>>> >>> Is there some problem with the way we use HashPartitioner?
>>> >>>
>>> >>> Thanks in advance.
>>> >>>
>>> >>> Regards,
>>> >>> Raghava.
>>> >>>
>>> >>
>>> >>
>>> >
>>> >
>>> > --
>>> > Regards,
>>> > Raghava
>>> > http://raghavam.github.io
>>> >
>>>
>>>
>>> --
>>> Thanks,
>>> Mike
>>>
>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>


-- 
Regards,
Raghava
http://raghavam.github.io
