Maybe this is due to the config spark.scheduler.minRegisteredResourcesRatio;
you could try setting it to 1 to see whether the behavior changes.


// Submit tasks only after (registered resources / total expected resources)

// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio =
  math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
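
For example, a minimal sketch of forcing the driver to wait for all expected
executors before the first stage (the app name is a placeholder;
spark.scheduler.maxRegisteredResourcesWaitingTime is the companion setting that
caps how long the driver will wait):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("partitioning-test")  // placeholder name
  // submit the first tasks only once 100% of the expected resources have registered
  .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")
  // upper bound on how long the driver waits for executors to register
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60s")
val sc = new SparkContext(conf)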


On Mon, Apr 25, 2016 at 7:17 AM, Mike Hynes <91m...@gmail.com> wrote:

> Could you change numPartitions to {16, 32, 64} and run your program for
> each to see how many partitions are allocated to each worker? Let's see if
> you experience an all-nothing imbalance that way; if so, my guess is that
> something else is odd in your program logic or spark runtime environment,
> but if not and your executors all receive at least *some* partitions, then
> I still wouldn't rule out effects of scheduling delay. It's a simple test,
> but it could give some insight.
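
As a rough sketch of that check (the pair RDD u comes from the code later in
this thread; the helper itself is mine, not part of the original program):

import org.apache.spark.{HashPartitioner, SparkEnv}
import org.apache.spark.rdd.RDD

// Tag each partition with the executor that processes it, then count the tags
// to get a rough picture of how partitions are spread across executors.
def partitionsPerExecutor[T](rdd: RDD[T]): Map[String, Long] =
  rdd.mapPartitions(_ => Iterator(SparkEnv.get.executorId), preservesPartitioning = true)
     .countByValue()
     .toMap

Seq(16, 32, 64).foreach { n =>
  val repartitioned = u.partitionBy(new HashPartitioner(n)).cache()
  repartitioned.count()  // materialize so the partitions are actually placed
  println(s"numPartitions=$n -> " + partitionsPerExecutor(repartitioned))
}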
>
> Mike
>
> This could still be a scheduling delay issue. If only one executor has *all*
> partitions, could you email me the log file? (If it's 10+ MB, just the first
> few thousand lines are fine.)
> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
> wrote:
>
>> Mike, All,
>>
>> It turns out that the second time we encountered the uneven-partition
>> issue is not due to spark-submit. It was resolved with the change in
>> placement of count().
>>
>> Case-1:
>>
>> val numPartitions = 8
>> // read uAxioms from HDFS, use hash partitioner on it and persist it
>> // read type1Axioms from HDFS, use hash partitioner on it and persist it
>> currDeltaURule1 = type1Axioms.join(uAxioms)
>>                              .values
>>                              .distinct(numPartitions)
>>                              .partitionBy(hashPartitioner)
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>> currDeltaURule1.count()
>>
>> <more transformations and actions>
>>
>> The currDeltaURule1 RDD ends up with all of its data on one node (there are 2
>> worker nodes). If we move the count() later (Case-2), the uneven partitioning
>> issue is resolved.
>>
>> Case-2:
>>
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>                                  .persist(StorageLevel.MEMORY_AND_DISK)
>> <count() not run here>
>>
>> <some more transformations that don't affect currDeltaURule1 rdd>
>>
>> <rdd.count()> -- this RDD depends on currDeltaURule1, and the count() here is
>> what triggers its evaluation. This resolved the uneven partitioning issue.
>>
>> I don't see why moving an action to a later point in the code would affect
>> the partitioning. Are there other factors at play here that affect the
>> partitioning?
>>
>> This (inconsistent) uneven partitioning leads to one machine getting
>> overburdened (in memory and number of tasks). We see a clear improvement in
>> runtime when the partitioning is even (which happens when count() is moved).
>>
>> Any pointers on figuring out this issue are much appreciated.
>>
>> Regards,
>> Raghava.
>>
>>
>>
>>
>> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> Glad to hear that the problem was solvable! I have not seen delays of
>>> this type for later stages in jobs run by spark-submit, but I do not think
>>> it impossible if your stage has no lineage dependence on other RDDs.
>>>
>>> I'm CC'ing the dev list to report that other users are observing load
>>> imbalance caused by unusual initial task scheduling. I don't know of ways
>>> to avoid this other than creating a dummy task to synchronize the
>>> executors, but hopefully someone there can suggest other possibilities.
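
A minimal sketch of that dummy-stage workaround (the element and partition
counts are arbitrary; the point is only to run a throwaway job so the real
first stage is scheduled after the executors have had time to register):

// Throwaway job executed before the real work; tune the sizes as needed.
sc.parallelize(1 to 10000, numSlices = 100).count()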
>>>
>>> Mike
>>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
>>> wrote:
>>>
>>>> Mike,
>>>>
>>>> It turns out that the executor delay, as you mentioned, is the cause. After
>>>> we introduced a dummy stage, the partitioning worked fine. Does this delay
>>>> happen during later stages as well? We noticed the same behavior (even
>>>> partitioning from spark-shell but not through spark-submit) at a later
>>>> stage also.
>>>>
>>>> Apart from introducing a dummy stage or running it from spark-shell, is
>>>> there any other option to fix this?
>>>>
>>>> Regards,
>>>> Raghava.
>>>>
>>>>
>>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>>>
>>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>>> receive tasks in the first stage. The delay does not persist once the
>>>>> executors have been synchronized.
>>>>>
>>>>> When the tasks are very short, as may be your case (relatively small
>>>>> data and a simple map task like you have described), the 8 tasks in
>>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>>> the second executor won't have responded to the master before the
>>>>> first 4 tasks on the first executor have completed.
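
One way to guard against this at the start of a job (my own sketch, not
something from this thread) is to poll SparkContext.getExecutorMemoryStatus,
which reports one entry per registered block manager (the driver plus each
executor), until the expected number of executors has shown up:

import org.apache.spark.SparkContext

// Block until `expected` executors have registered or the timeout expires.
// getExecutorMemoryStatus includes the driver's block manager, hence the "+ 1".
def waitForExecutors(sc: SparkContext, expected: Int, timeoutMs: Long = 30000L): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (sc.getExecutorMemoryStatus.size < expected + 1 &&
         System.currentTimeMillis() < deadline) {
    Thread.sleep(100)
  }
  sc.getExecutorMemoryStatus.size >= expected + 1
}

// e.g. waitForExecutors(sc, expected = 2)  // 2 worker nodes in this setup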
>>>>>
>>>>> To see if this is the cause in your particular case, you could try the
>>>>> following to confirm:
>>>>>         1. Examine the starting times of the tasks alongside their
>>>>>            executor (see the sketch after this list)
>>>>>         2. Make a "dummy" stage execute before your real stages to
>>>>>            synchronize the executors by creating and materializing any
>>>>>            random RDD
>>>>>         3. Make the tasks longer, i.e. with some silly computational
>>>>>            work.
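
For step 1, a rough sketch (my own helper, not from the thread) that logs each
task's executor and launch time as tasks start, so an all-on-one-executor first
wave is easy to spot in the driver log:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

sc.addSparkListener(new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    val info = taskStart.taskInfo
    // launchTime is in epoch milliseconds
    println(s"stage=${taskStart.stageId} task=${info.taskId} " +
            s"executor=${info.executorId} launchTime=${info.launchTime}")
  }
})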
>>>>>
>>>>> Mike
>>>>>
>>>>>
>>>>> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>>>> > Yes its the same data.
>>>>> >
>>>>> > 1) The number of partitions is the same (8, which is an argument to the
>>>>> > HashPartitioner). In the first case, these partitions are spread across
>>>>> > both the worker nodes. In the second case, all the partitions are on the
>>>>> > same node.
>>>>> > 2) What resources would be of interest here? The Scala shell takes the
>>>>> > default parameters, since we use "bin/spark-shell --master <master-URL>"
>>>>> > to run it. For the scala program, we do set some configuration options
>>>>> > such as driver memory (12GB); parallelism is set to 8 and we use the
>>>>> > Kryo serializer.
>>>>> >
>>>>> > We are running this on Azure D3-v2 machines, which have 4 cores and
>>>>> > 14GB RAM. 1 executor runs on each worker node. The following
>>>>> > configuration options are set for the scala program -- perhaps we
>>>>> > should move them to the spark config file.
>>>>> >
>>>>> > Driver memory and executor memory are set to 12GB
>>>>> > parallelism is set to 8
>>>>> > Kryo serializer is used
>>>>> > Number of retainedJobs and retainedStages has been increased to check
>>>>> > them in the UI.
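
If these do move into the config file, a sketch of the corresponding
spark-defaults.conf entries could look like the following (the retained
jobs/stages values are illustrative; only the property names are standard
Spark settings):

spark.driver.memory        12g
spark.executor.memory      12g
spark.default.parallelism  8
spark.serializer           org.apache.spark.serializer.KryoSerializer
# the retained counts below are illustrative values
spark.ui.retainedJobs      5000
spark.ui.retainedStages    5000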
>>>>> >
>>>>> > What information regarding Spark Context would be of interest here?
>>>>> >
>>>>> > Regards,
>>>>> > Raghava.
>>>>> >
>>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> >> If the data file is the same, then it should have a similar distribution
>>>>> >> of keys.
>>>>> >> A few queries:
>>>>> >>
>>>>> >> 1. Did you compare the number of partitions in both cases?
>>>>> >> 2. Did you compare the resource allocation for the Spark shell vs. the
>>>>> >> Scala program being submitted?
>>>>> >>
>>>>> >> Also, can you please share the details of Spark Context, Environment and
>>>>> >> Executors when you run via the Scala program?
>>>>> >>
>>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>>> >> m.vijayaragh...@gmail.com> wrote:
>>>>> >>
>>>>> >>> Hello All,
>>>>> >>>
>>>>> >>> We are using HashPartitioner in the following way on a 3-node cluster
>>>>> >>> (1 master and 2 worker nodes).
>>>>> >>>
>>>>> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>>>>> >>>   .map[(Int, Int)](line => { line.split("\\|") match {
>>>>> >>>     case Array(x, y) => (y.toInt, x.toInt) } })
>>>>> >>>   .partitionBy(new HashPartitioner(8)).setName("u").persist()
>>>>> >>>
>>>>> >>> u.count()
>>>>> >>>
>>>>> >>> If we run this from the spark shell, the data (52 MB) is split across
>>>>> >>> the two worker nodes. But if we put this in a scala program and run
>>>>> >>> it, then all the data goes to only one node. We have run it multiple
>>>>> >>> times, but this behavior does not change. This seems strange.
>>>>> >>>
>>>>> >>> Is there some problem with the way we use HashPartitioner?
>>>>> >>>
>>>>> >>> Thanks in advance.
>>>>> >>>
>>>>> >>> Regards,
>>>>> >>> Raghava.
>>>>> >>>
>>>>> >>
>>>>> >>
>>>>> >
>>>>> >
>>>>> > --
>>>>> > Regards,
>>>>> > Raghava
>>>>> > http://raghavam.github.io
>>>>> >
>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Mike
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Raghava
>>>> http://raghavam.github.io
>>>>
>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>


-- 
Best Regards

Jeff Zhang
