Are you specifying your Spark master in the Scala program?
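
A common pitfall worth ruling out: a master URL hard-coded via
SparkConf.setMaster() takes precedence over the --master flag passed to
spark-submit, so a leftover local master would keep all work on one node.
A minimal sketch of the pattern to look for (the app name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MyApp")
// If a line like the one below is present, the whole job runs in a single
// local JVM regardless of what spark-submit is given -- matching the symptom:
// conf.setMaster("local[*]")
val sc = new SparkContext(conf)

If the master is not set in code, the --master value from spark-submit is used.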

Best Regards,
Sonal
Founder, Nube Technologies <http://www.nubetech.co>
Reifier at Strata Hadoop World <https://www.youtube.com/watch?v=eD3LkpPQIgM>
Reifier at Spark Summit 2015
<https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>

<http://in.linkedin.com/in/sonalgoyal>



On Mon, Apr 18, 2016 at 6:26 PM, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:

> Mike,
>
> We tried that. This map task is actually part of a larger set of
> operations. I pointed it out because it involves partitionBy(), and we
> always use partitionBy() whenever partition-unaware shuffle operations
> (such as distinct) are performed. In fact, the distribution does not
> change even after several unrelated stages have executed and a
> significant amount of time (nearly 10-15 minutes) has passed.
>
> I agree. We are not expecting partitions to go to specific nodes, nor do
> we expect a uniform distribution of keys across the cluster; there will
> be some skew. But it should not be the case that all the data ends up on
> one node and nothing on the other. And no, the keys are not all the same;
> they vary from 1 to around 55000 (integers). What makes this strange is
> that it seems to work fine in the Spark shell (REPL).
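>
> For what it's worth, a throwaway diagnostic like the following can show
> which host actually holds each partition (u is the persisted RDD from my
> original message; tasks over a cached RDD normally run where the
> partition lives, so the executor hostname is a reasonable proxy for
> placement):
>
> u.mapPartitionsWithIndex { (idx, it) =>
>   Iterator((idx, java.net.InetAddress.getLocalHost.getHostName, it.size))
> }.collect().foreach(println)  // (partition index, host, record count)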
>
> Regards,
> Raghava.
>
>
> On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> A HashPartitioner will indeed partition based on the key, but you
>> cannot know on *which* node that key will appear. Again, the RDD
>> partitions will not necessarily be distributed evenly across your
>> nodes because of the greedy scheduling of the first wave of tasks,
>> particularly if those tasks have durations less than the initial
>> executor delay. I recommend you look at your logs to verify whether this
>> is happening in your case.
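>>
>> To make the distinction concrete: the partitioner fixes only the
>> key -> partition-index mapping, never the partition -> node placement.
>> Essentially, HashPartitioner computes something like:
>>
>> // a non-negative hash modulo the partition count (sketch of the idea)
>> def partitionFor(key: Any, numPartitions: Int): Int = {
>>   val raw = key.hashCode % numPartitions
>>   if (raw < 0) raw + numPartitions else raw
>> }
>> // which node ends up hosting a given partition index is decided by the
>> // scheduler at runtime, not by the partitioner.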
>>
>> Mike
>>
>> On 4/18/16, Anuj Kumar <anujs...@gmail.com> wrote:
>> > Good point Mike +1
>> >
>> > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
>> >
>> >> When submitting a job with spark-submit, I've observed delays (up to
>> >> 1--2 seconds) for the executors to respond to the driver in order to
>> >> receive tasks in the first stage. The delay does not persist once the
>> >> executors have been synchronized.
>> >>
>> >> When the tasks are very short, as may be your case (relatively small
>> >> data and a simple map task like you have described), the 8 tasks in
>> >> your stage may be allocated to only 1 executor in 2 waves of 4, since
>> >> the second executor won't have responded to the master before the
>> >> first 4 tasks on the first executor have completed.
>> >>
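>> >> As an aside, if that turns out to be the cause, one mitigation is to
>> >> make the scheduler wait for executors to register before launching the
>> >> first tasks. The configuration keys below exist in Spark; the values
>> >> are only illustrative, and they must be set before the SparkContext is
>> >> created:
>> >>
>> >> val conf = new SparkConf()
>> >>   .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")
>> >>   .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
>> >>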
>> >> To see if this is the cause in your particular case, you could try the
>> >> following to confirm:
>> >>         1. Examine the starting times of the tasks alongside their
>> >>            executor.
>> >>         2. Make a "dummy" stage execute before your real stages to
>> >>            synchronize the executors by creating and materializing any
>> >>            random RDD (see the sketch below).
>> >>         3. Make the tasks longer, i.e. with some silly computational
>> >>            work.
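>> >>
>> >> A minimal warm-up sketch for point 2 (assuming sc is the SparkContext;
>> >> the sizes are arbitrary):
>> >>
>> >> // materialize a throwaway RDD with more tasks than total cores so that
>> >> // every executor must register and run work before the real job starts
>> >> sc.parallelize(1 to 1000000, 100).map(_ + 1).count()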
>> >>
>> >> Mike
>> >>
>> >>
>> >> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>> >> > Yes, it's the same data.
>> >> >
>> >> > 1) The number of partitions is the same (8, which is the argument to
>> >> > the HashPartitioner). In the first case, these partitions are spread
>> >> > across both worker nodes. In the second case, all the partitions are
>> >> > on the same node.
>> >> > 2) What resources would be of interest here? The Scala shell takes
>> >> > the default parameters, since we use "bin/spark-shell --master
>> >> > <master-URL>" to run it. For the Scala program, we do set some
>> >> > configuration options, such as driver memory (12GB), parallelism (8),
>> >> > and the Kryo serializer.
>> >> >
>> >> > We are running this on Azure D3-v2 machines, which have 4 cores and
>> >> > 14GB RAM. One executor runs on each worker node. The following
>> >> > configuration options are set for the Scala program (a code sketch
>> >> > follows the list) -- perhaps we should move them to the Spark config
>> >> > file:
>> >> >
>> >> > Driver memory and executor memory are set to 12GB
>> >> > Parallelism is set to 8
>> >> > The Kryo serializer is used
>> >> > The number of retainedJobs and retainedStages has been increased, to
>> >> > check them in the UI
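>> >> >
>> >> > In code, those options look roughly like the sketch below (the
>> >> > retained-UI values are illustrative; note also that driver memory
>> >> > normally has to be set before the driver JVM starts, e.g. via
>> >> > spark-submit or spark-defaults.conf, so setting it on the SparkConf
>> >> > may have no effect):
>> >> >
>> >> > val conf = new SparkConf()
>> >> >   .set("spark.executor.memory", "12g")
>> >> >   .set("spark.default.parallelism", "8")
>> >> >   .set("spark.serializer",
>> >> >        "org.apache.spark.serializer.KryoSerializer")
>> >> >   .set("spark.ui.retainedJobs", "1000")
>> >> >   .set("spark.ui.retainedStages", "1000")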
>> >> >
>> >> > What information regarding Spark Context would be of interest here?
>> >> >
>> >> > Regards,
>> >> > Raghava.
>> >> >
>> >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com> wrote:
>> >> >
>> >> >> If the data file is the same, then it should have a similar
>> >> >> distribution of keys. A few queries:
>> >> >>
>> >> >> 1. Did you compare the number of partitions in both cases?
>> >> >> 2. Did you compare the resource allocation for the Spark shell vs.
>> >> >> the submitted Scala program?
>> >> >>
>> >> >> Also, can you please share the details of the Spark Context,
>> >> >> Environment, and Executors when you run via the Scala program?
>> >> >>
>> >> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju
>> >> >> <m.vijayaragh...@gmail.com> wrote:
>> >> >>
>> >> >>> Hello All,
>> >> >>>
>> >> >>> We are using HashPartitioner in the following way on a 3-node
>> >> >>> cluster (1 master and 2 worker nodes).
>> >> >>>
>> >> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>> >> >>>   .map[(Int, Int)](line => line.split("\\|") match {
>> >> >>>     case Array(x, y) => (y.toInt, x.toInt)
>> >> >>>   })
>> >> >>>   .partitionBy(new HashPartitioner(8))
>> >> >>>   .setName("u")
>> >> >>>   .persist()
>> >> >>>
>> >> >>> u.count()
>> >> >>>
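>> >> >>> (A throwaway diagnostic such as
>> >> >>>
>> >> >>> u.glom().map(_.length).collect()  // one record count per partition
>> >> >>>
>> >> >>> makes it easy to inspect how the records are distributed.)
>> >> >>>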
>> >> >>> If we run this from the Spark shell, the data (52 MB) is split
>> >> >>> across the two worker nodes. But if we put this in a Scala program
>> >> >>> and run it, then all the data goes to only one node. We have run it
>> >> >>> multiple times, but this behavior does not change. This seems
>> >> >>> strange.
>> >> >>>
>> >> >>> Is there some problem with the way we use HashPartitioner?
>> >> >>>
>> >> >>> Thanks in advance.
>> >> >>>
>> >> >>> Regards,
>> >> >>> Raghava.
>> >> >>>
>> >> >>
>> >> >>
>> >> >
>> >> >
>> >> > --
>> >> > Regards,
>> >> > Raghava
>> >> > http://raghavam.github.io
>> >> >
>> >>
>> >>
>> >> --
>> >> Thanks,
>> >> Mike
>> >>
>> >
>>
>>
>> --
>> Thanks,
>> Mike
>>
>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>
