We are testing with 52MB, but it will grow to 20GB and more later on. The
cluster size is also not static; we will be growing it. But the issue here
is the behavior of HashPartitioner -- from what I understand, it should
partition the data based on the hash of the key, irrespective of the RAM
size (which is more than adequate now). This behavior differs between
spark-shell and a standalone Scala program.
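The mapping HashPartitioner computes can be sketched in plain Scala (a
minimal sketch, not Spark's actual source; `partitionFor` is a hypothetical
helper written here just to illustrate the point):

```scala
// Sketch of how a hash partitioner assigns keys to partitions: the target
// partition depends only on the key's hashCode and the partition count,
// never on RAM size or cluster size.
def partitionFor(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod // keep the result non-negative
}

// Every node computes the same mapping for the same key:
(1 to 8).foreach(k => println(s"key $k -> partition ${partitionFor(k, 8)}"))
```

So the same key always lands in the same partition; which worker node ends
up hosting each partition is a separate scheduling question.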

We are not using YARN; it's the standalone version of Spark.
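One way to check where the records actually end up is to count them per
partition (a diagnostic sketch, assuming the RDD `u` from the snippet
quoted below and a live SparkContext):

```scala
// Diagnostic sketch: count records in each of the 8 partitions.
// Assumes `u` is the hash-partitioned, persisted RDD from the quoted code.
val perPartition = u
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()
perPartition.foreach { case (idx, n) =>
  println(s"partition $idx: $n records")
}
```

If the per-partition counts are similar in both runs, the partitioner is
doing its job and the difference is in where the partitions are scheduled,
not in how the keys are hashed.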

Regards,
Raghava.


On Mon, Apr 18, 2016 at 12:09 AM, Anuj Kumar <anujs...@gmail.com> wrote:

> A few params like spark.task.cpus and spark.cores.max will help. Also, for
> 52MB of data you need not have 12GB allocated to executors. Better to
> assign 512MB or so and increase the number of executors per worker node.
> Try reducing the executor memory to 512MB or so for this case.
>
> On Mon, Apr 18, 2016 at 9:07 AM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Yes, it's the same data.
>>
>> 1) The number of partitions is the same (8, which is the argument to the
>> HashPartitioner). In the first case, these partitions are spread across
>> both worker nodes. In the second case, all the partitions are on the
>> same node.
>> 2) What resources would be of interest here? The Scala shell takes the
>> default parameters, since we use "bin/spark-shell --master <master-URL>"
>> to launch it. For the Scala program, we do set some configuration options,
>> such as driver memory (12GB), parallelism (8), and the Kryo serializer.
>>
>> We are running this on Azure D3-v2 machines, which have 4 cores and 14GB
>> RAM. One executor runs on each worker node. The following configuration
>> options are set in the Scala program -- perhaps we should move them to
>> the Spark config file.
>>
>> Driver memory and executor memory are set to 12GB
>> parallelism is set to 8
>> Kryo serializer is used
>> The retainedJobs and retainedStages limits have been increased so we can
>> check them in the UI.
>>
>> What information regarding Spark Context would be of interest here?
>>
>> Regards,
>> Raghava.
>>
>> On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com> wrote:
>>
>>> If the data file is the same, then it should have a similar distribution
>>> of keys. A few queries:
>>>
>>> 1. Did you compare the number of partitions in both cases?
>>> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>> Program being submitted?
>>>
>>> Also, can you please share the details of the Spark Context, Environment
>>> and Executors when you run via the Scala program?
>>>
>>> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>> m.vijayaragh...@gmail.com> wrote:
>>>
>>>> Hello All,
>>>>
>>>> We are using HashPartitioner in the following way on a 3 node cluster
>>>> (1 master and 2 worker nodes).
>>>>
>>>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>>>>   .map[(Int, Int)](line => line.split("\\|") match {
>>>>     case Array(x, y) => (y.toInt, x.toInt)
>>>>   })
>>>>   .partitionBy(new HashPartitioner(8))
>>>>   .setName("u")
>>>>   .persist()
>>>>
>>>> u.count()
>>>>
>>>> If we run this from the spark-shell, the data (52 MB) is split across
>>>> the two worker nodes. But if we put this in a Scala program and run it,
>>>> then all the data goes to only one node. We have run it multiple times,
>>>> but this behavior does not change. This seems strange.
>>>>
>>>> Is there some problem with the way we use HashPartitioner?
>>>>
>>>> Thanks in advance.
>>>>
>>>> Regards,
>>>> Raghava.
>>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io
