Re: Joining a compressed ORC table with a non compressed text table

2016-06-29 Thread Michael Segel
Hi, 

I’m not sure I understand your initial question… 

Depending on the compression algo, you may or may not be able to split the 
file. 
So if it's not splittable, you have a single long-running thread. 

My guess is that you end up with a single very long partition. 
If so, repartitioning may give you better performance in the join. 
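
A minimal sketch of that idea (reusing the table and column names from later 
in the thread; the partition count of 200 is a placeholder to tune, not a 
recommendation):

  import org.apache.spark.sql.functions.desc

  // Spread the (possibly single-partition) input across more partitions
  // before the shuffle join.
  val s2 = HiveContext.table("sales2").select("PROD_ID").repartition(200)
  val s  = HiveContext.table("sales_staging").select("PROD_ID")
  s2.join(s, "prod_id").sort(desc("prod_id")).take(5).foreach(println)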

I see that you’re using a HiveContext. 

Have you tried doing this manually using just DataFrames, and compared that 
DAG to the SQL DAG? 
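
Something along these lines would show both sides of that comparison; 
explain(true) prints the logical and physical plans (again reusing names from 
the thread):

  // DataFrame side of the comparison
  val s2 = HiveContext.table("sales2").select("PROD_ID")
  val s  = HiveContext.table("sales_staging").select("PROD_ID")
  s2.join(s, "prod_id").explain(true)

  // SQL side of the comparison
  HiveContext.sql(
    "SELECT a.prod_id FROM sales2 a JOIN sales_staging b ON a.prod_id = b.prod_id"
  ).explain(true)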

HTH

-Mike

> On Jun 29, 2016, at 9:14 AM, Mich Talebzadeh wrote:
> 
> Hi all,
> 
> It finished in 2 hours 18 minutes!
> 
> Started at
> [29/06/2016 10:25:27.27]
> [148]
> [148]
> [148]
> [148]
> [148]
> Finished at
> [29/06/2016 12:43:33.33]
> 
> I need to dig in more. 
> 
> Dr Mich Talebzadeh
> 
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> 
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
> 
> On 29 June 2016 at 10:42, Mich Talebzadeh wrote:
> Focusing on the Spark job: as I mentioned before, Spark is running in local 
> mode with 8GB of memory for both the driver and the executor.
> 
> However, I still see this enormous Duration time, which indicates something 
> is badly wrong!
> 
> I also got rid of the groupBy:
> 
>   val s2 = HiveContext.table("sales2").select("PROD_ID")
>   val s = HiveContext.table("sales_staging").select("PROD_ID")
>   val rs = s2.join(s,"prod_id").sort(desc("prod_id")).take(5).foreach(println)
> 
> Dr Mich Talebzadeh
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com

Re: Joining a compressed ORC table with a non compressed text table

2016-06-29 Thread Jörn Franke
Does the same happen if all the tables are in ORC format? It might be simpler 
just to convert the text table to ORC, since it is rather small.
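
A minimal sketch of that conversion (the target table name sales_staging_orc 
is hypothetical, and this assumes the HiveContext used elsewhere in the 
thread):

  // Copy the small text table into an ORC-backed table via CTAS.
  HiveContext.sql(
    """CREATE TABLE sales_staging_orc STORED AS ORC
      |AS SELECT * FROM sales_staging""".stripMargin)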


Re: Joining a compressed ORC table with a non compressed text table

2016-06-29 Thread Jörn Franke

I think the Tez engine is much better maintained than the MR engine with 
respect to optimizations related to ORC, Hive, vectorization and querying. It 
will definitely be better to use it.
MR is also deprecated in Hive 2.0.
For me it does not make sense to use MR with Hive versions later than 1.1.

As I said, ORDER BY might be inefficient to use (not sure if this has changed). 
You may want to use SORT BY.

That being said, there are many optimization methods.
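
To illustrate the ORDER BY versus SORT BY point (a sketch, not a drop-in 
replacement: SORT BY only orders rows within each reducer, so a global order 
needs DISTRIBUTE BY on the same key or a final merge; Spark SQL accepts both 
clauses through HiveContext):

  // Per-reducer sort avoids funnelling every row through a single reducer.
  HiveContext.sql(
    """SELECT a.prod_id
      |FROM sales2 a JOIN sales_staging b ON a.prod_id = b.prod_id
      |DISTRIBUTE BY a.prod_id SORT BY a.prod_id""".stripMargin).show(5)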


Re: Joining a compressed ORC table with a non compressed text table

2016-06-28 Thread Timur Shenkao
Hi, guys!

As far as I remember, Spark does not use all the peculiarities and
optimizations of ORC. Moreover, the ability to read ORC files appeared in
Spark only relatively recently.

So, despite the "victorious" results announced in
http://hortonworks.com/blog/bringing-orc-support-into-apache-spark/,
there are a lot of "nuisances", like
https://issues.apache.org/jira/browse/SPARK-11087 or
https://issues.apache.org/jira/browse/SPARK-14286

This may also be useful:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization
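
One technique from that join-optimization page maps directly onto the shape 
of this thread (a large fact table joined to a table of under a million 
rows): a map-side, i.e. broadcast, join. A sketch on the Spark side, reusing 
the table names from the thread and assuming a Spark version that has 
org.apache.spark.sql.functions.broadcast (1.5+):

  import org.apache.spark.sql.functions.broadcast

  // Ship the small staging table to every executor so the join happens
  // map-side, avoiding the full shuffle that a SortMergeJoin requires.
  val big   = HiveContext.table("sales2").select("PROD_ID")
  val small = HiveContext.table("sales_staging").select("PROD_ID")
  big.join(broadcast(small), "prod_id").take(5).foreach(println)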


Re: Joining a compressed ORC table with a non compressed text table

2016-06-28 Thread Mich Talebzadeh
This is what I am getting in the container log for the MR job:

2016-06-28 23:25:53,808 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: Writing to temp file: FS
hdfs://rhes564:9000/tmp/hive/hduser/71a6beee-ac0d-423a-a14e-6ce51667a441/hive_2016-06-28_23-23-42_929_6384631032208608956-1/_task_tmp.-mr-10004/_tmp.000000_0
2016-06-28 23:25:53,808 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: New Final Path: FS
hdfs://rhes564:9000/tmp/hive/hduser/71a6beee-ac0d-423a-a14e-6ce51667a441/hive_2016-06-28_23-23-42_929_6384631032208608956-1/_tmp.-mr-10004/000000_0
2016-06-28 23:25:53,836 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 1
2016-06-28 23:25:53,837 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 10
2016-06-28 23:25:53,837 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 100
2016-06-28 23:25:53,844 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 1000
2016-06-28 23:25:53,875 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 10000
2016-06-28 23:25:53,954 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 100000
2016-06-28 23:25:55,072 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 1000000
2016-06-28 23:26:56,236 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 10000000
2016-06-28 23:27:58,499 WARN [ResponseProcessor for block
BP-1648199869-50.140.197.217-1462266926537:blk_1074784072_1043287]
org.apache.hadoop.hdfs.DFSClient: Slow ReadProcessor read fields took
35556ms (threshold=30000ms); ack: seqno: 6815 status: SUCCESS status:
SUCCESS downstreamAckTimeNanos: 35566795000, targets: [50.140.197.217:50010,
50.140.197.216:50010]
2016-06-28 23:31:38,437 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 100000000
2016-06-28 23:35:27,631 WARN [ResponseProcessor for block
BP-1648199869-50.140.197.217-1462266926537:blk_1074784086_1043301]
org.apache.hadoop.hdfs.DFSClient: Slow ReadProcessor read fields took
31118ms (threshold=30000ms); ack: seqno: 36303 status: SUCCESS status:
SUCCESS downstreamAckTimeNanos: 31128701000, targets: [50.140.197.217:50010,
50.140.197.216:50010]




Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com




Re: Joining a compressed ORC table with a non compressed text table

2016-06-28 Thread Mich Talebzadeh
That is a good point.

The ORC table properties are as follows:

TBLPROPERTIES ( "orc.compress"="SNAPPY",
"orc.stripe.size"="268435456",
"orc.row.index.stride"="10000")

which puts each stripe at 256MB.

Just to clarify, this is Spark running against Hive tables. I don't think the
choice of Tez, MR or Spark as the execution engine is going to make any
difference?
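
The properties a table actually carries can be double-checked from the
metastore; a sketch (DESCRIBE FORMATTED is standard HiveQL, and most Spark
1.x HiveContext builds pass it through to Hive; failing that it can be run
in the Hive CLI):

  // Print the table's full metadata, including TBLPROPERTIES.
  HiveContext.sql("DESCRIBE FORMATTED sales2").collect().foreach(println)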

This is the same query with Hive on MR

select a.prod_id from sales2 a, sales_staging b where a.prod_id = b.prod_id
order by a.prod_id;

2016-06-28 23:23:51,203 Stage-1 map = 0%,  reduce = 0%
2016-06-28 23:23:59,480 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU
7.32 sec
2016-06-28 23:24:08,771 Stage-1 map = 55%,  reduce = 0%, Cumulative CPU
18.21 sec
2016-06-28 23:24:11,860 Stage-1 map = 58%,  reduce = 0%, Cumulative CPU
22.34 sec
2016-06-28 23:24:18,021 Stage-1 map = 62%,  reduce = 0%, Cumulative CPU
30.33 sec
2016-06-28 23:24:21,101 Stage-1 map = 64%,  reduce = 0%, Cumulative CPU
33.45 sec
2016-06-28 23:24:24,181 Stage-1 map = 66%,  reduce = 0%, Cumulative CPU
37.5 sec
2016-06-28 23:24:27,270 Stage-1 map = 69%,  reduce = 0%, Cumulative CPU
42.0 sec
2016-06-28 23:24:30,349 Stage-1 map = 70%,  reduce = 0%, Cumulative CPU
45.62 sec
2016-06-28 23:24:33,441 Stage-1 map = 73%,  reduce = 0%, Cumulative CPU
49.69 sec
2016-06-28 23:24:36,521 Stage-1 map = 75%,  reduce = 0%, Cumulative CPU
52.92 sec
2016-06-28 23:24:39,605 Stage-1 map = 77%,  reduce = 0%, Cumulative CPU
56.78 sec
2016-06-28 23:24:42,686 Stage-1 map = 80%,  reduce = 0%, Cumulative CPU
60.36 sec
2016-06-28 23:24:45,767 Stage-1 map = 81%,  reduce = 0%, Cumulative CPU
63.68 sec
2016-06-28 23:24:48,842 Stage-1 map = 83%,  reduce = 0%, Cumulative CPU
66.92 sec
2016-06-28 23:24:51,918 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU
70.18 sec
2016-06-28 23:25:52,354 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU
127.99 sec
2016-06-28 23:25:57,494 Stage-1 map = 100%,  reduce = 67%, Cumulative CPU
134.64 sec
2016-06-28 23:26:57,847 Stage-1 map = 100%,  reduce = 67%, Cumulative CPU
141.01 sec

which basically sits at 67% all day





Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com




Re: Joining a compressed ORC table with a non compressed text table

2016-06-28 Thread Jörn Franke


Bzip2 is splittable for text files.

Btw, in ORC the question of splittability does not matter, because each stripe 
is compressed individually.

Have you tried Tez? As far as I recall (at least it was the case in the first 
versions of Hive), MR uses a single reducer for ORDER BY, which is a 
bottleneck.

Do you see any errors in the log file?



Joining a compressed ORC table with a non compressed text table

2016-06-28 Thread Mich Talebzadeh
Hi,


I have a simple join between table sales2, a compressed (Snappy) ORC table
with 22 million rows, and another simple table, sales_staging, of under a
million rows, stored as a text file with no compression.

The join is very simple:

  val s2 = HiveContext.table("sales2").select("PROD_ID")
  val s = HiveContext.table("sales_staging").select("PROD_ID")

  val rs = s2.join(s,"prod_id").orderBy("prod_id").sort(desc("prod_id")).take(5).foreach(println)


Now what is happening is that it is sitting on the SortMergeJoin operation
on ZippedPartitionRDD, as shown in the DAG diagram below.

[image: DAG diagram of the SortMergeJoin stage; not preserved in the archive]


And at this rate only 10% is done; it will take forever to finish :(

Stage 3:==> (10 + 2) / 200]

OK, I understand that zipped files cannot be broken into blocks, so operations
on them cannot be parallelized.

Having said that, what are the alternatives? Never use compression and live
with it? I emphasise that any operation on the compressed table by itself is
pretty fast, as it is a simple table scan. However, a join between two tables
on a column, as above, seems to be problematic?
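
Note that in the DataFrame API orderBy is an alias for sort, so the chained
orderBy("prod_id").sort(desc("prod_id")) above requests two sorts, of which
only the last takes effect. A cleaned-up sketch of the same query:

  import org.apache.spark.sql.functions.desc

  val s2 = HiveContext.table("sales2").select("PROD_ID")
  val s  = HiveContext.table("sales_staging").select("PROD_ID")

  // One descending sort is enough; orderBy and sort are aliases.
  s2.join(s, "prod_id").sort(desc("prod_id")).take(5).foreach(println)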

Thanks

P.S. The same is happening using Hive with MR:

select a.prod_id from sales2 a inner join sales_staging b on a.prod_id =
b.prod_id order by a.prod_id;

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com