Thank you. For now, we plan to use spark-shell to submit jobs.

Regards,
Raghava.


On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:

> Glad to hear that the problem was solvable! I have not seen delays of this
> type in later stages of jobs run by spark-submit, but I do not think it is
> impossible if your stage has no lineage dependence on other RDDs.
>
> I'm CC'ing the dev list to report that other users are observing load
> imbalance caused by unusual initial task scheduling. I don't know of any
> way to avoid this other than creating a dummy task to synchronize the
> executors, but hopefully someone there can suggest other possibilities.
>
> Mike
> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
> wrote:
>
>> Mike,
>>
>> It turns out that the executor delay, as you mentioned, is the cause. After
>> we introduced a dummy stage, partitioning worked fine. Does this delay
>> happen during later stages as well? We noticed the same behavior
>> (partitioning happens in spark-shell but not through spark-submit) at a
>> later stage also.
>>
>> Apart from introducing a dummy stage or running it from spark-shell, is
>> there any other option to fix this?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> When submitting a job with spark-submit, I've observed delays (up to
>>> 1-2 seconds) for the executors to respond to the driver in order to
>>> receive tasks in the first stage. The delay does not persist once the
>>> executors have been synchronized.
>>>
>>> When the tasks are very short, as may be the case for you (relatively
>>> small data and a simple map task like you have described), the 8 tasks in
>>> your stage may all be allocated to only 1 executor, in 2 waves of 4, since
>>> the second executor won't have responded to the master before the
>>> first 4 tasks on the first executor have completed.
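>>>
>>> For reference, here is a rough, untested sketch of one way to log task
>>> launch times per executor with a SparkListener (illustrative only, but
>>> useful for checking this):
>>>
>>>   import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
>>>
>>>   // Print the executor and launch time of each task as it starts, so
>>>   // that scheduling "waves" show up in the driver's stdout.
>>>   sc.addSparkListener(new SparkListener {
>>>     override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
>>>       val info = taskStart.taskInfo
>>>       println(s"stage=${taskStart.stageId} task=${info.taskId} " +
>>>         s"executor=${info.executorId} launchTime=${info.launchTime}")
>>>     }
>>>   })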
>>>
>>> To see if this is the cause in your particular case, you could try the
>>> following to confirm:
>>>         1. Examine the starting times of the tasks alongside their
>>> executors (e.g., in the web UI, or with a listener as sketched above).
>>>         2. Make a "dummy" stage execute before your real stages to
>>> synchronize the executors by creating and materializing any random RDD
>>> (a sketch follows below).
>>>         3. Make the tasks longer, i.e. with some silly computational work.
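>>>
>>> For point 2, something like this untested sketch is what I mean (the
>>> slot count here is an assumption for 2 executors with 4 cores each):
>>>
>>>   // Dummy stage: materialize a trivial RDD with one partition per core
>>>   // so that every executor has registered with the driver before the
>>>   // first real stage is scheduled.
>>>   val numSlots = 8  // assumption: 2 executors x 4 cores
>>>   sc.parallelize(1 to numSlots, numSlots).count()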
>>>
>>> Mike
>>>
>>>
>>> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>> > Yes, it's the same data.
>>> >
>>> > 1) The number of partitions is the same (8, which is an argument to the
>>> > HashPartitioner). In the first case, these partitions are spread across
>>> > both the worker nodes. In the second case, all the partitions are on the
>>> > same node.
>>> > 2) What resources would be of interest here? The Scala shell takes the
>>> > default parameters, since we use "bin/spark-shell --master <master-URL>"
>>> > to run it. For the Scala program, we do set some configuration options,
>>> > such as driver memory (12GB) and parallelism (8), and we use the Kryo
>>> > serializer.
>>> >
>>> > We are running this on Azure D3-v2 machines, which have 4 cores and 14GB
>>> > RAM. One executor runs on each worker node. The following configuration
>>> > options are set for the Scala program -- perhaps we should move them to
>>> > the Spark config file:
>>> >
>>> > Driver memory and executor memory are set to 12GB.
>>> > Parallelism is set to 8.
>>> > The Kryo serializer is used.
>>> > The numbers of retainedJobs and retainedStages have been increased, to
>>> > check them in the UI.
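>>> >
>>> > Roughly, the setup looks like the following (a sketch; the property
>>> > names are the standard Spark config keys, and the retained-jobs/stages
>>> > values are placeholders):
>>> >
>>> >   import org.apache.spark.{SparkConf, SparkContext}
>>> >
>>> >   val conf = new SparkConf()
>>> >     .setAppName("hash-partitioner-test")  // hypothetical app name
>>> >     // Note: spark.driver.memory only takes effect if set before the
>>> >     // driver JVM starts (e.g., spark-submit --driver-memory 12g).
>>> >     .set("spark.executor.memory", "12g")
>>> >     .set("spark.default.parallelism", "8")
>>> >     .set("spark.serializer",
>>> >       "org.apache.spark.serializer.KryoSerializer")
>>> >     .set("spark.ui.retainedJobs", "1000")    // placeholder value
>>> >     .set("spark.ui.retainedStages", "1000")  // placeholder value
>>> >   val sc = new SparkContext(conf)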
>>> >
>>> > What information regarding Spark Context would be of interest here?
>>> >
>>> > Regards,
>>> > Raghava.
>>> >
>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>>> > wrote:
>>> >
>>> >> If the data file is the same, then it should have a similar
>>> >> distribution of keys. A few queries:
>>> >>
>>> >> 1. Did you compare the number of partitions in both cases?
>>> >> 2. Did you compare the resource allocation for the Spark shell vs. the
>>> >> Scala program being submitted?
>>> >>
>>> >> Also, can you please share the details of the Spark Context,
>>> >> Environment, and Executors when you run via the Scala program?
>>> >>
>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>> >> m.vijayaragh...@gmail.com> wrote:
>>> >>
>>> >>> Hello All,
>>> >>>
>>> >>> We are using HashPartitioner in the following way on a 3-node cluster
>>> >>> (1 master and 2 worker nodes).
>>> >>>
>>> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>>> >>>   .map[(Int, Int)](line => line.split("\\|") match {
>>> >>>     case Array(x, y) => (y.toInt, x.toInt)
>>> >>>   })
>>> >>>   .partitionBy(new HashPartitioner(8))
>>> >>>   .setName("u")
>>> >>>   .persist()
>>> >>>
>>> >>> u.count()
>>> >>>
>>> >>> If we run this from the spark shell, the data (52 MB) is split across
>>> >>> the two worker nodes. But if we put this in a Scala program and run it,
>>> >>> then all the data goes to only one node. We have run it multiple times,
>>> >>> but this behavior does not change. This seems strange.
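>>> >>>
>>> >>> In case it is useful, this is one way we can check the per-partition
>>> >>> distribution (an illustrative, untested snippet; the node placement
>>> >>> itself is visible on the Storage tab of the web UI for the persisted
>>> >>> RDD "u"):
>>> >>>
>>> >>>   // Count the records in each partition of u.
>>> >>>   val counts = u.mapPartitionsWithIndex(
>>> >>>     (idx, iter) => Iterator((idx, iter.size))).collect()
>>> >>>   counts.foreach { case (i, n) =>
>>> >>>     println(s"partition $i: $n records") }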
>>> >>>
>>> >>> Is there some problem with the way we use HashPartitioner?
>>> >>>
>>> >>> Thanks in advance.
>>> >>>
>>> >>> Regards,
>>> >>> Raghava.
>>> >>>
>>> >>
>>> >>
>>> >
>>> >
>>> > --
>>> > Regards,
>>> > Raghava
>>> > http://raghavam.github.io
>>> >
>>>
>>>
>>> --
>>> Thanks,
>>> Mike
>>>
>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>


-- 
Regards,
Raghava
http://raghavam.github.io
