Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread yeshwanth kumar
Hi Ayan,

Thanks for the explanation.
I am aware of compression codecs.

How is the locality level set?
Is it done by Spark or by YARN?

Please let me know.



Thanks,
Yesh




On Nov 22, 2016 5:13 PM, "ayan guha" <guha.a...@gmail.com> wrote:

Hi

RACK_LOCAL = the task runs on the same rack as the data, but not on the
node where the data is stored.
NODE_LOCAL = the task and the data are co-located on the same node.
Probably you were looking for this one?
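
A minimal sketch of how the two levels relate, in plain Scala with no Spark
dependency. It is not Spark's scheduler code: `classify`, `rackOf` and the
other names are hypothetical. It rests on two assumptions. First, that the
single split of a non-splittable file advertises only the hosts of its first
block, as Hadoop's FileInputFormat does for unsplittable files (Hive's own
input formats may behave differently). Second, that with the default rack
topology every host resolves to the same rack. As far as I know, the locality
level itself is chosen by Spark's task scheduler from the preferred locations
of each split, while YARN supplies the containers and the rack mapping.

```scala
object LocalitySketch {
  sealed trait Locality
  case object NodeLocal extends Locality
  case object RackLocal extends Locality
  case object AnyHost extends Locality

  // Hypothetical helper: classify where a task ran relative to the hosts
  // its input split prefers.
  def classify(executorHost: String,
               preferredHosts: Set[String],
               rackOf: String => String): Locality =
    if (preferredHosts.contains(executorHost)) NodeLocal
    else if (preferredHosts.map(rackOf).contains(rackOf(executorHost))) RackLocal
    else AnyHost

  def main(args: Array[String]): Unit = {
    // Default topology (assumption): every host maps to one rack.
    val defaultRack: String => String = _ => "/default-rack"
    // Assumption: only the first block's replica hosts are advertised.
    val firstBlockHosts = Set("10.11.0.226", "10.11.0.227", "10.11.0.228")

    println(classify("10.11.0.225", firstBlockHosts, defaultRack)) // RackLocal
    println(classify("10.11.0.226", firstBlockHosts, defaultRack)) // NodeLocal
  }
}
```

Under these assumptions, an executor on 10.11.0.225 is in the same rack as
every preferred host without being one of them, which would be reported as
RACK_LOCAL.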

GZIP - The read goes through the GZIP codec, but because GZIP is not
splittable, at most one task can read a gzip file. The contents of the gzip
file may still be spread across multiple nodes. Example: a GZIP file of
1 GB with a block size of 256 MB (i.e. 4 blocks). Assume that not all 4
blocks are on the same data node.

When you start reading the gzip file, 1 task will be assigned. It will read
local blocks where available and stream the remaining blocks from remote
nodes. While reading the stream, the gzip codec decompresses the data.
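
The arithmetic behind this can be sketched as follows. This is a simplified
estimate in plain Scala, not Hadoop's actual split logic (which also applies
minimum/maximum split sizes and a small slack factor); `estimateSplits` is a
hypothetical name.

```scala
object SplitCountSketch {
  // Simplified estimate: one split per block when the codec is splittable,
  // otherwise a single split covering the whole file.
  def estimateSplits(fileBytes: Long, blockBytes: Long, splittable: Boolean): Long = {
    require(fileBytes > 0 && blockBytes > 0, "sizes must be positive")
    if (!splittable) 1L
    else (fileBytes + blockBytes - 1) / blockBytes // ceiling division
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // 1 GB file, 256 MB blocks, as in the example above.
    println(estimateSplits(1024 * mb, 256 * mb, splittable = true))  // 4
    println(estimateSplits(1024 * mb, 256 * mb, splittable = false)) // 1
  }
}
```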

This is really not a Spark issue, but a Hadoop input format discussion.

HTH

On Wed, Nov 23, 2016 at 10:00 AM, yeshwanth kumar <yeshwant...@gmail.com>
wrote:

> Hi Ayan,
>
> We have the default rack topology.
>
>
>
> -Yeshwanth
> Can you Imagine what I would do if I could do all I can - Art of War
>
> On Tue, Nov 22, 2016 at 6:37 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Because Snappy is not splittable, a single task makes sense.
>>
>> Are you sure about the rack topology? I.e., is 225 in a different rack
>> than 227 or 228? What does your topology file say?
>> On 22 Nov 2016 10:14, "yeshwanth kumar" <yeshwant...@gmail.com> wrote:
>>
>>> Thanks for your reply.
>>>
>>> I can definitely change the underlying compression format,
>>> but I am trying to understand the locality level:
>>> why did the executor run on a different node, where the blocks are not
>>> present, when the locality level is RACK_LOCAL?
>>>
>>> Can you shed some light on this?
>>>
>>>
>>> Thanks,
>>> Yesh
>>>
>>>
>>> -Yeshwanth
>>> Can you Imagine what I would do if I could do all I can - Art of War
>>>
>>> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke <jornfra...@gmail.com>
>>> wrote:
>>>
>>>> Use ORC, Parquet or Avro as the file format, because they support any
>>>> compression type with parallel processing. Alternatively, split your
>>>> file into several smaller ones. Another alternative would be bzip2 (but
>>>> slower in general) or LZO (usually not included by default in many
>>>> distributions).
>>>
>


-- 
Best Regards,
Ayan Guha


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-22 Thread yeshwanth kumar
Hi Ayan,

We have the default rack topology.



-Yeshwanth
Can you Imagine what I would do if I could do all I can - Art of War

On Tue, Nov 22, 2016 at 6:37 AM, ayan guha <guha.a...@gmail.com> wrote:

> Because Snappy is not splittable, a single task makes sense.
>
> Are you sure about the rack topology? I.e., is 225 in a different rack
> than 227 or 228? What does your topology file say?
> On 22 Nov 2016 10:14, "yeshwanth kumar" <yeshwant...@gmail.com> wrote:
>
>> Thanks for your reply.
>>
>> I can definitely change the underlying compression format,
>> but I am trying to understand the locality level:
>> why did the executor run on a different node, where the blocks are not
>> present, when the locality level is RACK_LOCAL?
>>
>> Can you shed some light on this?
>>
>>
>> Thanks,
>> Yesh
>>
>>
>> -Yeshwanth
>> Can you Imagine what I would do if I could do all I can - Art of War
>>
>> On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke <jornfra...@gmail.com>
>> wrote:
>>
>>> Use ORC, Parquet or Avro as the file format, because they support any
>>> compression type with parallel processing. Alternatively, split your
>>> file into several smaller ones. Another alternative would be bzip2 (but
>>> slower in general) or LZO (usually not included by default in many
>>> distributions).
>>


Re: RDD Partitions on HDFS file in Hive on Spark Query

2016-11-21 Thread yeshwanth kumar
Thanks for your reply.

I can definitely change the underlying compression format,
but I am trying to understand the locality level:
why did the executor run on a different node, where the blocks are not
present, when the locality level is RACK_LOCAL?

Can you shed some light on this?


Thanks,
Yesh


-Yeshwanth
Can you Imagine what I would do if I could do all I can - Art of War

On Mon, Nov 21, 2016 at 4:59 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> Use ORC, Parquet or Avro as the file format, because they support any
> compression type with parallel processing. Alternatively, split your file
> into several smaller ones. Another alternative would be bzip2 (but slower
> in general) or LZO (usually not included by default in many distributions).


RDD Partitions on HDFS file in Hive on Spark Query

2016-11-21 Thread yeshwanth kumar
Hi,

We are running Hive on Spark. We have an external table over a
Snappy-compressed CSV file of size 917.4 MB.
The HDFS block size is set to 256 MB.

As per my understanding, if I run a query over that external table, it
should launch 4 tasks, one for each block,
but I am seeing one executor and one task processing the whole file.

I am trying to understand the reason behind this.

I went one step further to understand the block locality.
When I get the block locations for that file, I find:

[DatanodeInfoWithStorage[10.11.0.226:50010,DS-bf39d33d-48e1-4a8f-be48-b0953fdaad37,DISK],
 DatanodeInfoWithStorage[10.11.0.227:50010,DS-a760c1c8-ce0c-4eb8-8183-8d8ff5f24115,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-0e5427e2-b030-43f8-91c9-d8517e68414a,DISK]]

[DatanodeInfoWithStorage[10.11.0.226:50010,DS-f50ddf2f-b827-4845-b043-8b91ae4017c0,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-e8c9785f-c352-489b-8209-4307f3296211,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-6f6a3ffd-334b-45fd-ae0f-cc6eb268b0d2,DISK]]

[DatanodeInfoWithStorage[10.11.0.226:50010,DS-f8bea6a8-a433-4601-8070-f6c5da840e09,DISK],
 DatanodeInfoWithStorage[10.11.0.227:50010,DS-8aa3f249-790e-494d-87ee-bcfff2182a96,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-d06714f4-2fbb-48d3-b858-a023b5c44e9c,DISK]]

[DatanodeInfoWithStorage[10.11.0.226:50010,DS-b3a00781-c6bd-498c-a487-5ce6aaa66f48,DISK],
 DatanodeInfoWithStorage[10.11.0.228:50010,DS-fa5aa339-e266-4e20-a360-e7cdad5dacc3,DISK],
 DatanodeInfoWithStorage[10.11.0.225:50010,DS-9d597d3f-cd4f-4c8f-8a13-7be37ce769c9,DISK]]

In the Spark UI I see that the Locality Level is RACK_LOCAL for that task.

If it is RACK_LOCAL, then it should run on either node 10.11.0.226 or
10.11.0.228, because these 2 nodes have all four blocks needed for the
computation,
but the executor is running on 10.11.0.225.
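
The claim about which nodes hold every block can be checked mechanically. The
sketch below is plain Scala with the replica hosts transcribed from the block
locations above; `hostsWithAllBlocks` and `blocksPerHost` are hypothetical
helper names, not part of any Hadoop or Spark API.

```scala
object BlockLocationSketch {
  // Replica hosts per block, in the order the blocks were listed.
  val blocks: Seq[Set[String]] = Seq(
    Set("10.11.0.226", "10.11.0.227", "10.11.0.228"),
    Set("10.11.0.226", "10.11.0.228", "10.11.0.225"),
    Set("10.11.0.226", "10.11.0.227", "10.11.0.228"),
    Set("10.11.0.226", "10.11.0.228", "10.11.0.225")
  )

  // Hosts that hold a replica of every block.
  def hostsWithAllBlocks(bs: Seq[Set[String]]): Set[String] =
    bs.reduceOption(_ intersect _).getOrElse(Set.empty[String])

  // Number of blocks for which each host holds a replica.
  def blocksPerHost(bs: Seq[Set[String]]): Map[String, Int] =
    bs.flatten.groupBy(identity).map { case (host, xs) => host -> xs.size }

  def main(args: Array[String]): Unit = {
    println(hostsWithAllBlocks(blocks).toList.sorted.mkString(", "))
    // 10.11.0.226, 10.11.0.228
    println(blocksPerHost(blocks)("10.11.0.225")) // 2
  }
}
```

So 10.11.0.226 and 10.11.0.228 do hold all four blocks, while 10.11.0.225
holds replicas of two of them (the second and the fourth).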

My theory does not apply anywhere.

Please help me understand how Spark/YARN calculates the number of
executors/tasks.

Thanks,
-Yeshwanth


How to generate a sequential key in rdd across executors

2016-07-23 Thread yeshwanth kumar
Hi,

I am doing a bulk load to HBase using Spark,
in which I need to generate a sequential key for each record.
The key should be sequential across all the executors.

I tried zipWithIndex, but it didn't work, because zipWithIndex gives an
index per executor, not across all executors.

I am looking for some suggestions.


Thanks,
-Yeshwanth
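
For reference, Spark's documentation describes RDD.zipWithIndex as ordering
indices first by partition index and then by position within each partition,
so the indices are meant to be unique across the whole RDD rather than per
executor (RDD.zipWithUniqueId, by contrast, gives unique but non-consecutive
ids). If the indices looked per-executor, the cause may lie elsewhere in the
job. The sketch below is plain Scala, not Spark code: it simulates partitions
with nested Seqs to show how per-partition counts turn into global offsets.
`zipWithGlobalIndex` is a hypothetical name.

```scala
object GlobalIndexSketch {
  // Assign a gap-free global index to every element of a partitioned
  // collection, ordered by partition and then by position in the partition.
  def zipWithGlobalIndex[T](partitions: Seq[Seq[T]]): Seq[Seq[(T, Long)]] = {
    // Start offset of each partition = number of elements in all earlier ones.
    val offsets = partitions.map(_.size.toLong).scanLeft(0L)(_ + _)
    partitions.zip(offsets).map { case (part, start) =>
      part.zipWithIndex.map { case (x, i) => (x, start + i) }
    }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e", "f"))
    val indexed = zipWithGlobalIndex(parts)
    println(indexed.flatten.map(_._2).mkString(",")) // 0,1,2,3,4,5
  }
}
```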


Spark HBase bulk load using hfile format

2016-07-13 Thread yeshwanth kumar
Hi, I am doing a bulk load into HBase in HFile format, using
saveAsNewAPIHadoopFile.

When I try to write, I get an exception:

 java.io.IOException: Added a key not lexically larger than previous.

The following is the code snippet:

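import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

// One HFile cell together with the row key it belongs to.
case class HBaseRow(rowKey: ImmutableBytesWritable, kv: KeyValue)

// args(0): Avro input path, args(1): HFile output path.
val kAvroDF = sqlContext.read.format("com.databricks.spark.avro").load(args(0))
val kRDD = kAvroDF.select("seqid", "mi", "moc", "FID", "WID").rdd
// Columns 1 ("mi") and 3 ("FID") are passed to preparePUT.
val trRDD = kRDD.map(a => preparePUT(a(1).asInstanceOf[String], a(3).asInstanceOf[String]))
// Flatten the per-record lists into (rowKey, KeyValue) pairs.
val kvRDD = trRDD.flatMap(a => a).map(a => (a.rowKey, a.kv))
saveAsHFile(kvRDD, args(1))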


preparePUT returns a list of HBaseRow(ImmutableBytesWritable, KeyValue)
sorted on KeyValue. I do a flatMap on the RDD to
prepare an RDD[(ImmutableBytesWritable, KeyValue)] and pass it to saveAsHFile.

Does the flatMap operation on an RDD change the sorted order?


Can anyone tell me how to resolve this issue?

Thanks,
-Yeshwanth
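
A note on the ordering question, with a sketch. flatMap keeps the order of
elements within a partition, but concatenating lists that are each sorted on
their own does not yield a sequence that is sorted overall, which is the
order the HFile writer checks. The example below is plain Scala with no Spark
or HBase dependency; `byteOrdering` and `isSorted` are hypothetical helpers,
and the row keys are made up for illustration. In a Spark job the usual
remedy would be to sort the pair RDD by key (for example with sortByKey or
repartitionAndSortWithinPartitions) before writing; whether that fits here
depends on how saveAsHFile is implemented, which the post does not show.

```scala
object HFileOrderSketch {
  // Unsigned lexicographic comparison of byte arrays, the order in which
  // HBase compares row keys.
  val byteOrdering: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
    def compare(a: Array[Byte], b: Array[Byte]): Int = {
      val n = math.min(a.length, b.length)
      var i = 0
      while (i < n) {
        val c = (a(i) & 0xff) - (b(i) & 0xff)
        if (c != 0) return c
        i += 1
      }
      a.length - b.length
    }
  }

  // True when the keys are in non-decreasing order.
  def isSorted(keys: Seq[Array[Byte]]): Boolean =
    keys.sliding(2).forall {
      case Seq(x, y) => byteOrdering.compare(x, y) <= 0
      case _         => true
    }

  def main(args: Array[String]): Unit = {
    // Two records, each producing its own sorted list of keys (made-up data).
    val perRecord = Seq(Seq("row5-a", "row5-b"), Seq("row2-a", "row2-b"))
    val flat = perRecord.flatten.map(_.getBytes("UTF-8"))

    println(isSorted(flat))                      // false
    println(isSorted(flat.sorted(byteOrdering))) // true
  }
}
```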