Re: Aggregated table larger than expected

2016-06-29 Thread Matt Olson
In case someone else encounters this issue, it looks like this was due to
encoding differences between the hourly and daily table. The hourly table
often had the same values stored consecutively for certain columns, but the
group by on multiple columns caused them to be shuffled around to different
reducers and therefore written to different files when loaded to the daily
table.

This meant that for some columns the daily table ended up with a higher
proportion of distinct values stored next to each other, whereas in the hourly
table identical values tended to be clumped together. Runs of identical values
compress far better, which is why the hourly files come out smaller even though
they hold more rows.
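
For reference, a rough sketch of one way to get that clumping back on the daily
load is to add an extra distribute/sort stage on the affected columns before the
files are written. This is untested here, and col1/col2/val and the table names
are placeholders for the real schema and the real aggregation:

INSERT OVERWRITE TABLE daily_table PARTITION (dt = '2016-06-20')
SELECT col1, col2, val
FROM (
  -- the original de-duplicating aggregation
  SELECT col1, col2, MAX(val) AS val
  FROM hourly_table
  WHERE dt = '2016-06-20'
  GROUP BY col1, col2
) agg
-- send rows with the same col1 to the same reducer and keep them adjacent,
-- so the RCFile/LZO writer sees long runs of repeated values again
DISTRIBUTE BY col1
SORT BY col1, col2;

The extra reduce stage costs some time, but it should buy back most of the
compression the hourly files were getting for free.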


On Thu, Jun 23, 2016 at 2:37 PM, Matt Olson  wrote:

> Hi,
>
> I am working with an hourly table and a daily table in Hive 1.0.1. Both
> tables have the same schema except that the hourly table is partitioned by
> dt and hour, but the daily table is partitioned only by dt. At the end of
> each day, the records from the hourly table are aggregated into the daily
> table, but we do a group by to remove some duplicate records.
>
> There are 1,363,106,822 total rows in the hourly table on 2016-06-20, and
> 1,300,287,508 rows in the daily table since some are dropped in the group
> by. However, the total size of all files in the hourly table for that date
> is 135.9 GB, but the total size of files in the daily table is 158.8 GB.
> I'm wondering why the daily table would be significantly larger, since it
> has fewer records but all the same information for the records it does
> have. Both tables are stored as RCFile and
> use com.hadoop.compression.lzo.LzoCodec for compression. However, the
> hourly table contains about 66000 small files, whereas the daily one
> contains 494.
>
> If I remove the group by and max functions from the query and just insert
> all the records for 2016-06-20 from the hourly table into the daily table
> (so the job becomes map-only), the daily table turns out to be smaller than
> the hourly. It seems like the introduction of the group by/reduce phase is
> causing the output to be larger somehow. I have also tried storing the
> daily table as ORC rather than RCFile, since ORC is more space-efficient.
> The total size is 147.2 GB, so smaller than the RCFile version but still
> larger than the hourly table.
>
> I've attached the query inserting into the daily table, as well as the
> explain output of the query.
>
> Thanks for any help,
> Matt
>


RE: Implementing a custom StorageHandler

2016-06-29 Thread Lavelle, Shawn
I don’t have answers for you, except for #1 – org.apache.hadoop.mapreduce contains the 
newer API classes in Hadoop, from my understanding.  They’ve been out for a while, but 
the Hive storage handler API hasn’t been updated to make use of them.  Which leads me to 
my own, very related question: when might Hive provide a storage handler interface 
that uses the new classes, and if not, why not?

  Thanks,

~ Shawn M Lavelle

From: Long, Andrew [mailto:loand...@amazon.com]
Sent: Monday, June 27, 2016 5:59 PM
To: user 
Subject: Implementing a custom StorageHandler

Hello everyone,

I’m in the process of implementing a custom StorageHandler and I had some 
questions.


1)  What is the difference between org.apache.hadoop.mapred.InputFormat and 
org.apache.hadoop.mapreduce.InputFormat?

2)  How is numSplits calculated in 
org.apache.hadoop.mapred.InputFormat.getSplits(JobConf job, int numSplits)?

3)  Is there a way to enforce a maximum number of splits?  What would 
happen if I ignored numSplits and just returned an array of splits that was the 
actual maximum number of splits?

4)  How is InputSplit.getLocations() used?  If I’m accessing non-HDFS 
resources, what should I return?  Currently I’m just returning an empty 
array.
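
For reference, a handler like this ultimately gets attached to a table with
Hive's STORED BY syntax, roughly along these lines (the class name and property
keys below are just placeholders, not a real handler):

CREATE EXTERNAL TABLE example_table (id INT, payload STRING)
STORED BY 'com.example.hive.MyStorageHandler'
WITH SERDEPROPERTIES ('my.serde.option' = 'value')
TBLPROPERTIES ('my.handler.option' = 'value');

Hive then asks the handler for its InputFormat, OutputFormat and SerDe classes
when it plans a query against the table.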

Thanks for your time,
Andrew Long


RE: Query Performance Issue : Group By and Distinct and load on reducer

2016-06-29 Thread Markovitz, Dudu
1.
This is strange.
The negative numbers are due to overflow of the ‘int’ type, but for exactly that 
reason I cast the expressions in my code to ‘bigint’.
I tested this code before sending it to you and it worked fine, returning 
results that are beyond the range of the ‘int’ type.

Please try this:

select  *
       ,(floor(r*100) + 1) + (100L * (row_number () over (partition
        by (floor(r*100) + 1) order by null) - 1))  as ETL_ROW_ID

from    (select *, rand() as r from INTER_ETL) as t
;

2.
Great

3.
Sorry, hadn’t had the time to test it (nor the change I’m going to suggest 
now…☺)
Please check if the following code works and if so, replace the ‘a’ subquery 
code with it.



select  a1.group_id
       ,sum (a2.cnt) - a1.cnt   as accum_rows

from   (select  abs(hash(MyCol1,MyCol2))%1000  as group_id
               ,count (*)                      as cnt
        from    INTER_ETL
        group by abs(hash(MyCol1,MyCol2))%1000
       ) as a1

cross join
       (select  abs(hash(MyCol1,MyCol2))%1000  as group_id
               ,count (*)                      as cnt
        from    INTER_ETL
        group by abs(hash(MyCol1,MyCol2))%1000
       ) as a2

where   a2.group_id <= a1.group_id

group by a1.group_id, a1.cnt
;
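
For reference, once that subquery checks out, the whole statement would look
roughly like this (untested; MyCol1/MyCol2 are the same placeholders as before,
and the %1000 bucketing has to match between the outer expression and the
subquery):

create temporary table INTER_ETL_T as
select  t.*
       ,a.accum_rows + row_number () over (partition by
        abs(hash(t.MyCol1,t.MyCol2))%1000 order by null)  as ETL_ROW_ID
from    INTER_ETL as t
join   (select  a1.group_id
               ,sum (a2.cnt) - a1.cnt   as accum_rows
        from   (select  abs(hash(MyCol1,MyCol2))%1000  as group_id
                       ,count (*)                      as cnt
                from    INTER_ETL
                group by abs(hash(MyCol1,MyCol2))%1000
               ) as a1
        cross join
               (select  abs(hash(MyCol1,MyCol2))%1000  as group_id
                       ,count (*)                      as cnt
                from    INTER_ETL
                group by abs(hash(MyCol1,MyCol2))%1000
               ) as a2
        where   a2.group_id <= a1.group_id
        group by a1.group_id, a1.cnt
       ) as a
on      a.group_id = abs(hash(t.MyCol1,t.MyCol2))%1000
;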


From: @Sanjiv Singh [mailto:sanjiv.is...@gmail.com]
Sent: Wednesday, June 29, 2016 10:55 PM
To: Markovitz, Dudu 
Cc: user@hive.apache.org
Subject: Re: Query Performance Issue : Group By and Distinct and load on reducer

Hi Dudu,

I tried the same thing on the same table, which has 6357592675 rows. See the
responses for all three below.


I tried the 1st one; it's giving duplicate values for ROW_NUM.

> CREATE TEMPORARY TABLE INTER_ETL_T AS
select  *
,cast (floor(r*100) + 1 as bigint) + (100 * (row_number () over 
(partition by cast (floor(r*100) + 1 as bigint) order by null) - 1))  as 
ROW_NUM
from    (select *, rand() as r from INTER_ETL) as t ;


> select ROW_NUM, count(*) from INTER_ETL_T group by ROW_NUM having count(*) > 1 
> limit 10;

+--------------+------+
|   ROW_NUM    | _c1  |
+--------------+------+
| -2146932303  | 2    |
| -2146924922  | 2    |
| -2146922710  | 2    |
| -2146901450  | 2    |
| -2146897115  | 2    |
| -2146874805  | 2    |
| -2146869449  | 2    |
| -2146865918  | 2    |
| -2146864595  | 2    |
| -2146857688  | 2    |
+--------------+------+

On the 2nd one, it is not giving any duplicates and it was much faster than
ROW_NUMBER(), at least.

numRows=6357592675, totalSize=405516934422, rawDataSize=399159341747


And on the 3rd one, for consecutive numbers, the query is not compatible with Hive.

CREATE TEMPORARY TABLE INTER_ETL_T AS
select  *
       ,a.accum_rows + row_number () over (partition by
        abs(hash(t.m_d_key,t.s_g_key))%1 order by null) as ROW_NUM
from    INTER_ETL   as t
join    (select  abs(hash(m_d_key,s_g_key))%1   as group_id
                ,sum (count (*)) over (order by m_d_key,s_g_key rows between unbounded
                 preceding and 1 preceding) - count(*)   as accum_rows
         from    INTER_ETL
         group by abs(hash(m_d_key,s_g_key))%1
        ) as a
on      a.group_id  = abs(hash(t.m_d_key,t.s_g_key))%1
;

Error :

Error: Error while compiling statement: FAILED: SemanticException End of a 
WindowFrame cannot be UNBOUNDED PRECEDING (state=42000,code=4)



Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Tue, Jun 28, 2016 at 6:16 PM, @Sanjiv Singh 
mailto:sanjiv.is...@gmail.com>> wrote:
thanks a lot.
let me give it a try.

Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Tue, Jun 28, 2016 at 5:32 PM, Markovitz, Dudu 
mailto:dmarkov...@paypal.com>> wrote:
There’s a distributed algorithm for window functions that is based on the ORDER 
BY clause rather than the PARTITION BY clause.
I doubt it is implemented in Hive, but it’s worth a shot.

select  *
   ,row_number () over (order by rand()) as ETL_ROW_ID
from    INTER_ETL
;

For unique, not consecutive values you can try this:

select  *
   ,cast (floor(r*100) + 1 as bigint) + (100 * (row_number () 
over (partition by cast (floor(r*100) + 1 as bigint) order by null) - 1))  
as ETL_ROW_ID

from    (select *, rand() as r from INTER_ETL) as t
;

If you have in your table a column/combination of columns with a uniform 
distribution you can also do something like this:

select  *
   , (abs(hash(MyCol1,MyCol2))%100 + 1) + (row_number () over 
(partition by (abs(hash(MyCol1,MyCol2))%100 + 1) order by null) - 1) * 
100L  as ETL_ROW_ID

from    INTER_ETL
;

For consecutive values you can do something (ugly…) like this:

select  *
   ,a.accum_rows + row_number () over (partition by 
abs(hash(t.MyCol1,t.MyCol2))%1 order by null) as ETL_ROW_ID

from    INTER_ETL   as t

Re: Query Performance Issue : Group By and Distinct and load on reducer

2016-06-29 Thread @Sanjiv Singh
Hi Dudu,

I tried the same thing on the same table, which has 6357592675 rows. See the
responses for all three below.


*I tried the 1st one; it's giving duplicate values for ROW_NUM.*

> CREATE TEMPORARY TABLE INTER_ETL_T AS
> select  *
> ,cast (floor(r*100) + 1 as bigint) + (100 * (row_number () over
> (partition by cast (floor(r*100) + 1 as bigint) order by null) - 1))
>  as ROW_NUM
> from    (select *, rand() as r from INTER_ETL) as t ;



> select ROW_NUM, count(*) from INTER_ETL_T group by ROW_NUM having count(*) > 1
limit 10;

> +--------------+------+
> |   ROW_NUM    | _c1  |
> +--------------+------+
> | -2146932303  | 2    |
> | -2146924922  | 2    |
> | -2146922710  | 2    |
> | -2146901450  | 2    |
> | -2146897115  | 2    |
> | -2146874805  | 2    |
> | -2146869449  | 2    |
> | -2146865918  | 2    |
> | -2146864595  | 2    |
> | -2146857688  | 2    |
> +--------------+------+


On the 2nd one, it is not giving any duplicates and it was much faster than
ROW_NUMBER(), at least.

numRows=6357592675, totalSize=405516934422, rawDataSize=399159341747


*And on the 3rd one, for consecutive numbers, the query is not compatible with Hive.*

CREATE TEMPORARY TABLE INTER_ETL_T AS
> select  *
>        ,a.accum_rows + row_number () over (partition by
>         abs(hash(t.m_d_key,t.s_g_key))%1 order by null) as ROW_NUM
> from    INTER_ETL   as t
> join    (select  abs(hash(m_d_key,s_g_key))%1   as group_id
>                 ,sum (count (*)) over (order by m_d_key,s_g_key rows between unbounded
>                  preceding and 1 preceding) - count(*)   as accum_rows
>          from    INTER_ETL
>          group by abs(hash(m_d_key,s_g_key))%1
>         ) as a
> on      a.group_id  = abs(hash(t.m_d_key,t.s_g_key))%1
> ;


Error :

Error: Error while compiling statement: FAILED: SemanticException End of a
> WindowFrame cannot be UNBOUNDED PRECEDING (state=42000,code=4)




Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Tue, Jun 28, 2016 at 6:16 PM, @Sanjiv Singh 
wrote:

> thanks a lot.
> let me give it a try.
>
> Regards
> Sanjiv Singh
> Mob :  +091 9990-447-339
>
> On Tue, Jun 28, 2016 at 5:32 PM, Markovitz, Dudu 
> wrote:
>
>> There’s a distributed algorithm for window functions that is based on the
>> ORDER BY clause rather than the PARTITION BY clause.
>>
>> I doubt it is implemented in Hive, but it’s worth a shot.
>>
>>
>>
>> select  *
>>
>>,row_number () over (order by rand()) as ETL_ROW_ID
>>
>> from    INTER_ETL
>>
>> ;
>>
>>
>>
>> For unique, not consecutive values you can try this:
>>
>>
>>
>> select  *
>>
>>,cast (floor(r*100) + 1 as bigint) + (100 *
>> (row_number () over (partition by cast (floor(r*100) + 1 as bigint)
>> order by null) - 1))  as ETL_ROW_ID
>>
>>
>>
>> from    (select *, rand() as r from INTER_ETL) as t
>>
>> ;
>>
>>
>>
>> If you have in your table a column/combination of columns with a uniform
>> distribution you can also do something like this:
>>
>>
>>
>> select  *
>>
>>, (abs(hash(MyCol1,MyCol2))%100 + 1) + (row_number ()
>> over (partition by (abs(hash(MyCol1,MyCol2))%100 + 1) order by null)
>> - 1) * 100L  as ETL_ROW_ID
>>
>>
>>
>> from    INTER_ETL
>>
>> ;
>>
>>
>>
>> For consecutive values you can do something (ugly…) like this:
>>
>>
>>
>> select  *
>>
>>,a.accum_rows + row_number () over (partition by
>> abs(hash(t.MyCol1,t.MyCol2))%1 order by null) as ETL_ROW_ID
>>
>>
>>
>> from    INTER_ETL   as t
>>
>>
>>
>> join    (select
>> abs(hash(MyCol1,MyCol2))%1
>> as group_id
>>
>>,sum (count (*)) over (order by
>> MyCol1,MyCol2 rows between unbounded preceding and 1 preceding) -
>> count(*)   as accum_rows
>>
>>
>>
>> from    INTER_ETL
>>
>>
>>
>> group by abs(hash(MyCol1,MyCol2))%1
>>
>> )
>>
>> as a
>>
>>
>>
>> on  a.group_id  =
>> abs(hash(t.MyCol1,t.MyCol2))%1
>>
>>
>>
>> ;
>>
>>
>>
>>
>>
>>
>>
>> *From:* @Sanjiv Singh [mailto:sanjiv.is...@gmail.com]
>> *Sent:* Tuesday, June 28, 2016 11:52 PM
>>
>> *To:* Markovitz, Dudu 
>> *Cc:* user@hive.apache.org
>> *Subject:* Re: Query Performance Issue : Group By and Distinct and load
>> on reducer
>>
>>
>>
>> ETL_ROW_ID is supposed to be a consecutive number. I need to check whether
>> having just a unique number would break any logic.
>>
>>
>>
>> Considering a unique (not necessarily consecutive) number for the ETL_ROW_ID
>> column, what are the optimal options available?
>>
>> And what if it has to be a consecutive number only?
>>
>>
>>
>>
>>
>>
>> Regards
>> Sanjiv Singh
>> Mob :  +091 9990-447-339
>>
>>
>>
>> On Tue, Jun 28, 2016 at 4:17 PM, Markovitz, Dudu 
>> wrote:
>>
>> I’m guessing ETL_ROW_ID should be unique but not necessarily contain
>> only consecutive numbers?
>>
>>
>>
>> *From:* @Sanjiv Singh [mailto:sanjiv.is...@gmail.com]
>> *Sent:* Tuesday, June 28, 2016 10:57 PM
>> *To:* Markovitz, Dudu 
>> *Cc:* user@hive.apache.org
>> *Subject:* Re: Query Pe

Re: Hash table in map join - Hive

2016-06-29 Thread Ross Guth
Hi Gopal,

I saw the log files and the hash table information in them. Thanks.

Also, I enforced shuffle hash join. I had a couple of questions around it:

1. In the query plan, it still says Map Join Operator (I would have expected
it to be named as a reduce-side operator).
2. The edges in this query plan were named custom_simple_edge: does this point
to the fact that sorting of the mapper inputs is bypassed?
3. "hive.tez.auto.reducer.parallelism=true" is not taking effect for shuffle
hash join. With the same input tables, the merge join (shuffle sort-merge join)
took 1009 reducers without auto reducer parallelism turned on and 337 reducers
with it turned on, while shuffle hash join does not change from 1009 to 337. Is
there something else I need to do to get this optimization working in this
case?

I had a few general questions too:
1. What does hive.tez.auto.reducer.parallelism do? Does it only reduce the
number of reducers based on the actual size of the mapper output, or does it do
more? I ask because, as mentioned above, in the sort-merge join case, if I
manually set the number of reduce tasks to 337 (using the mapred.reduce.tasks
parameter), the execution time does not improve as much as when the auto
reducer parallelism setting picks it by itself.

2. I did not understand the intuition behind setting
hive.mapjoin.hybridgrace.hashtable=false (as mentioned in your previous reply).
Does "hybrid grace hashtable" mean the Hybrid Hybrid Grace Hash Join
implementation mentioned here?
If it is set to true, the hash table is created with multiple partitions; if it
is set to false, is the hash table created as a single hash table? Isn't the
true case better, since it can better handle the case where the hash table does
not fit in memory, and the lookups are against smaller partitions? I ran both
cases (with the grace hashtable set to true and to false) and did not see any
difference in execution time -- maybe because my input size was fairly small in
that case.

3. In general, for a map join in cluster mode, are these the actual steps
followed in Hive/Tez:
 a. *Hash table generation:* Partitioned hash tables of the small table are
created across multiple containers. Each container handles a part of the small
table and builds the hash table for that part in 16 partitions. If any
partition cannot fit in memory, it is spilled to disk (with only a disk file
and no match file, since no matching against the big table is happening yet).
b. *Broadcast of hash table*: All the partitions of all the parts of the small
table, including the ones spilled to disk, are serialized and sent to all of
the second-stage map containers.
c. *Join operator*: The big table is scanned in each second-stage mapper
against the entire hash table of the small table, and the result is produced.
Where does the rebuilding of the spilled hash table happen? Is it during the
second map phase, where the join against the bigger table happens?


Apologies for the long list of questions. But knowing this would be very
helpful to me.

Thanks in advance,
Ross

On Mon, Jun 27, 2016 at 7:25 PM, Gopal Vijayaraghavan 
wrote:

>
> > 1. OOM condition -- I get the following error when I force a map join in
> >hive/tez with low container size and heap size:"
> >java.lang.OutOfMemoryError: Java heap space". I was wondering what is the
> >condition which leads to this error.
>
> You are not modifying the noconditionaltasksize to match the Xmx at all.
>
> hive.auto.convert.join.noconditionaltask.size=(Xmx - io.sort.mb)/3.0;
>
>
> > 2.  Shuffle Hash Join -- I am using hive 2.0.1. What is the way to force
> >this join implementation? Is there any documentation regarding the same?
>
> 
>
>
> For full-fledged speed-mode, do
>
> set hive.vectorized.execution.reduce.enabled=true;
> set hive.optimize.dynamic.partition.hashjoin=true;
> set hive.vectorized.execution.mapjoin.native.fast.hashtable.enabled=true;
> set hive.mapjoin.hybridgrace.hashtable=false;
>
> > 3. Hash table size: I use "--hiveconf hive.root.logger=INFO,console" for
> >seeing logs. I do not see the hash table size in these logs.
>
> No, the hashtables are no longer built on the gateway nodes - that used to
> be a single point of failure when 20-25 users are connected via the same
> box.
>
> The hashtable logs are in the task side (in this case, I would guess Map
> 2's logs would have it). The output is from a log line which looks like
>
> yarn logs -applicationId  | grep Map.*metrics
>
> > Map 1  300
> >37.11 65,710  1,039 15,000,000
> >15,000,000
>
>
> So you have 15 million keys going into a single hashtable? The broadcast
> output rows are fed into the hashtable on the other side.
>
> The map-join sort of runs out of steam after about ~4 million entries - I
> would guess for your scenario setting the noconditional size to 8388608
> (

Upgrading Metastore schema 2.0.0->2.1.0

2016-06-29 Thread Jose Rozanec
Hi all,

Upgrading DB schema from 2.0.0 to 2.1.0 is causing an error. Did anyone
experience similar issues?

Below we leave the command and stacktrace.

Thanks,

*./schematool -dbType mysql -upgradeSchemaFrom 2.0.0*
Starting upgrade metastore schema from version 2.0.0 to 2.1.0
Upgrade script upgrade-2.0.0-to-2.1.0.mysql.sql
Error: Duplicate key name 'CONSTRAINTS_PARENT_TABLE_ID_INDEX'
Query is : CREATE INDEX `CONSTRAINTS_PARENT_TABLE_ID_INDEX` ON
KEY_CONSTRAINTS (`PARENT_TBL_ID`) USING BTREE (state=42000,code=1061)
org.apache.hadoop.hive.metastore.HiveMetaException: Upgrade FAILED!
Metastore state would be inconsistent !!
Underlying cause: java.io.IOException : Schema script failed, errorcode 2
Use --verbose for detailed stacktrace.
*** schemaTool failed ***
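
One guess (untested, and worth trying against a copy of the metastore first):
the index may already exist from an earlier, partially applied run of the
upgrade script. If so, dropping it in MySQL should let the CREATE INDEX
statement go through when schematool is re-run:

-- list the indexes on the metastore table and look for CONSTRAINTS_PARENT_TABLE_ID_INDEX
SHOW INDEX FROM KEY_CONSTRAINTS;

-- if it is already present, drop it so the upgrade script can recreate it
DROP INDEX `CONSTRAINTS_PARENT_TABLE_ID_INDEX` ON KEY_CONSTRAINTS;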