No. We specify it as a configuration option to the spark-submit. Does that
make a difference?

Regards,
Raghava.


On Mon, Apr 18, 2016 at 9:56 AM, Sonal Goyal <sonalgoy...@gmail.com> wrote:

> Are you specifying your spark master in the scala program?
>
> Best Regards,
> Sonal
> Founder, Nube Technologies <http://www.nubetech.co>
> Reifier at Strata Hadoop World
> <https://www.youtube.com/watch?v=eD3LkpPQIgM>
> Reifier at Spark Summit 2015
> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Mon, Apr 18, 2016 at 6:26 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Mike,
>>
>> We tried that. This map task is actually part of a larger set of
>> operations. I pointed out this map task since it involves partitionBy() and
>> we always use partitionBy() whenever partition-unaware shuffle operations
>> are performed (such as distinct). We in fact do not notice a change in the
>> distribution after several unrelated stages are executed and a significant
>> time has passed (nearly 10-15 minutes).
>>
>> I agree. We are not looking for partitions to go to specific nodes and
>> nor do we expect a uniform distribution of keys across the cluster. There
>> will be a skew. But it cannot be that all the data is on one node and
>> nothing on the other and no, the keys are not the same. They vary from 1 to
>> around 55000 (integers). What makes this strange is that it seems to work
>> fine on the spark shell (REPL).
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> A HashPartitioner will indeed partition based on the key, but you
>>> cannot know on *which* node that key will appear. Again, the RDD
>>> partitions will not necessarily be distributed evenly across your
>>> nodes because of the greedy scheduling of the first wave of tasks,
>>> particularly if those tasks have durations less than the initial
>>> executor delay. I recommend you look at your logs to verify if this is
>>> happening to you.
>>>
>>> Mike
>>>
>>> On 4/18/16, Anuj Kumar <anujs...@gmail.com> wrote:
>>> > Good point Mike +1
>>> >
>>> > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
>>> >
>>> >> When submitting a job with spark-submit, I've observed delays (up to
>>> >> 1--2 seconds) for the executors to respond to the driver in order to
>>> >> receive tasks in the first stage. The delay does not persist once the
>>> >> executors have been synchronized.
>>> >>
>>> >> When the tasks are very short, as may be your case (relatively small
>>> >> data and a simple map task like you have described), the 8 tasks in
>>> >> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>> >> the second executor won't have responded to the master before the
>>> >> first 4 tasks on the first executor have completed.
>>> >>
>>> >> To see if this is the cause in your particular case, you could try the
>>> >> following to confirm:
>>> >>         1. Examine the starting times of the tasks alongside their
>>> >> executor
>>> >>         2. Make a "dummy" stage execute before your real stages to
>>> >> synchronize the executors by creating and materializing any random RDD
>>> >>         3. Make the tasks longer, i.e. with some silly computational
>>> >> work.
>>> >>
>>> >> Mike
>>> >>
>>> >>
>>> >> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>> >> > Yes its the same data.
>>> >> >
>>> >> > 1) The number of partitions are the same (8, which is an argument to
>>> >> > the
>>> >> > HashPartitioner). In the first case, these partitions are spread
>>> across
>>> >> > both the worker nodes. In the second case, all the partitions are on
>>> >> > the
>>> >> > same node.
>>> >> > 2) What resources would be of interest here? Scala shell takes the
>>> >> default
>>> >> > parameters since we use "bin/spark-shell --master <master-URL>" to
>>> run
>>> >> the
>>> >> > scala-shell. For the scala program, we do set some configuration
>>> >> > options
>>> >> > such as driver memory (12GB), parallelism is set to 8 and we use
>>> Kryo
>>> >> > serializer.
>>> >> >
>>> >> > We are running this on Azure D3-v2 machines which have 4 cores and
>>> 14GB
>>> >> > RAM.1 executor runs on each worker node. Following configuration
>>> >> > options
>>> >> > are set for the scala program -- perhaps we should move it to the
>>> spark
>>> >> > config file.
>>> >> >
>>> >> > Driver memory and executor memory are set to 12GB
>>> >> > parallelism is set to 8
>>> >> > Kryo serializer is used
>>> >> > Number of retainedJobs and retainedStages has been increased to
>>> check
>>> >> them
>>> >> > in the UI.
>>> >> >
>>> >> > What information regarding Spark Context would be of interest here?
>>> >> >
>>> >> > Regards,
>>> >> > Raghava.
>>> >> >
>>> >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>>> >> > wrote:
>>> >> >
>>> >> >> If the data file is same then it should have similar distribution
>>> of
>>> >> >> keys.
>>> >> >> Few queries-
>>> >> >>
>>> >> >> 1. Did you compare the number of partitions in both the cases?
>>> >> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>> >> >> Program being submitted?
>>> >> >>
>>> >> >> Also, can you please share the details of Spark Context,
>>> Environment
>>> >> >> and
>>> >> >> Executors when you run via Scala program?
>>> >> >>
>>> >> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>> >> >> m.vijayaragh...@gmail.com> wrote:
>>> >> >>
>>> >> >>> Hello All,
>>> >> >>>
>>> >> >>> We are using HashPartitioner in the following way on a 3 node
>>> cluster
>>> >> (1
>>> >> >>> master and 2 worker nodes).
>>> >> >>>
>>> >> >>> val u =
>>> >> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>> >> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>>> >> >>> (y.toInt,
>>> >> >>> x.toInt) } }).partitionBy(new
>>> >> HashPartitioner(8)).setName("u").persist()
>>> >> >>>
>>> >> >>> u.count()
>>> >> >>>
>>> >> >>> If we run this from the spark shell, the data (52 MB) is split
>>> across
>>> >> >>> the
>>> >> >>> two worker nodes. But if we put this in a scala program and run
>>> it,
>>> >> then
>>> >> >>> all the data goes to only one node. We have run it multiple times,
>>> >> >>> but
>>> >> >>> this
>>> >> >>> behavior does not change. This seems strange.
>>> >> >>>
>>> >> >>> Is there some problem with the way we use HashPartitioner?
>>> >> >>>
>>> >> >>> Thanks in advance.
>>> >> >>>
>>> >> >>> Regards,
>>> >> >>> Raghava.
>>> >> >>>
>>> >> >>
>>> >> >>
>>> >> >
>>> >> >
>>> >> > --
>>> >> > Regards,
>>> >> > Raghava
>>> >> > http://raghavam.github.io
>>> >> >
>>> >>
>>> >>
>>> >> --
>>> >> Thanks,
>>> >> Mike
>>> >>
>>> >
>>>
>>>
>>> --
>>> Thanks,
>>> Mike
>>>
>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io

Reply via email to