Re: Hi, Hive People urgent question about [Distribute By] function

2015-10-22 Thread Gopal Vijayaraghavan

> When applying [Distribute By] on Hive to the framework, the function
> should be partitionByHash on Flink. This is to spread out all the rows
> distributed by a hash key from Object Class in Java.

Hive does not use Object.hashCode(). By default that is the
identityHashCode, which is inconsistent between JVMs, so Object.hashCode()
is not usable for distributing rows.

ObjectInspectorUtils::hashCode() is the hash code used by DISTRIBUTE BY
(DBY) in Hive (SORT BY uses a random number generator).
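
To illustrate the point about inconsistency, here is a minimal Java sketch
(not Hive code; the class and method names are made up for illustration).
It contrasts a value-based hash, which depends only on the contents, with
the identity hash that Object.hashCode() falls back to when a class does
not override it.

```java
final class HashStabilitySketch {
    // Value-based hash: computed from the characters of the String,
    // so equal contents give the same result in every JVM.
    static int valueHash(String s) {
        return s.hashCode();
    }

    // Identity-based hash: what Object.hashCode() returns when it is not
    // overridden. It belongs to the object instance, not to its contents.
    static int identityHash(Object o) {
        return System.identityHashCode(o);
    }

    public static void main(String[] args) {
        String a = new String("abc");
        String b = new String("abc");
        // Equal contents, equal value hash.
        System.out.println(valueHash(a) == valueHash(b));
        // Two distinct instances usually report different identity hashes,
        // and the numbers can change from one run to the next.
        System.out.println(identityHash(a) + " vs " + identityHash(b));
    }
}
```

Only the value-based kind is a candidate for a partitioner that has to
place the same key in the same partition on every machine.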

Cheers,
Gopal



Re: Hi, Hive People urgent question about [Distribute By] function

2015-10-22 Thread Philip Lee
Thanks for your help.

If we want the same result from Hive and from Spark or another framework,
how could we achieve that?
Could you explain it in detail?

Regards,
Philip



-- 

==

*Hae Joon Lee*


Now, in Germany,

M.S. Candidate, Interested in Distributed System, Iterative Processing

Dept. of Computer Science, Informatik in German, TUB

Technical University of Berlin


In Korea,

M.S. Candidate, Computer Architecture Laboratory

Dept. of Computer Science, KAIST


Rm# 4414 CS Dept. KAIST

373-1 Guseong-dong, Yuseong-gu, Daejon, South Korea (305-701)


Mobile) 49) 015-251-448-278 in Germany, no cellular in Korea

==


Re: Hi, Hive People urgent question about [Distribute By] function

2015-10-22 Thread Gopal Vijayaraghavan

> so do you think if we want the same result from Hive and Spark or the
> other framework, how could we try this one ?

There's a special backwards-compatibility codepath (a slow one) that gets
triggered if you do

set mapred.reduce.tasks=199; (or any number)

This will produce exactly the same hash code as the Java hashCode for
Strings & Integers.

The bucket-id is determined by

(hashCode & Integer.MAX_VALUE) % numberOfBuckets

but this also triggers a non-stable sort on an entirely empty key, which
will shuffle the data so the output file's order bears no resemblance to
the input file's order.
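
As a minimal sketch of that bucket-id expression in plain Java (the class
and method names are made up for illustration, and this is not Hive's own
code): the mask with Integer.MAX_VALUE clears the sign bit, so a negative
hashCode still yields a non-negative bucket, which a bare % would not.

```java
final class BucketIdSketch {
    // (hashCode & Integer.MAX_VALUE) % numberOfBuckets, as given above.
    // The mask drops the sign bit so the result is always in
    // [0, numberOfBuckets).
    static int bucketId(int hashCode, int numberOfBuckets) {
        return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
    }

    public static void main(String[] args) {
        int buckets = 199;
        // String key: "abc".hashCode() is 96354
        System.out.println(bucketId("abc".hashCode(), buckets));
        // Integer key: Integer.hashCode() is the int value itself
        System.out.println(bucketId(Integer.valueOf(42).hashCode(), buckets));
        // Negative hash code: masked first, so the bucket is not negative
        System.out.println(bucketId(-1, buckets));
    }
}
```

A partitioner in another framework would only line up with this when the
Hive side is actually running the compatibility path described above.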


Even with that setting, the only consistent layout produced by Hive is the
one from CLUSTER BY, which sorts on the same key used for distribution and
uses the Java hashCode when auto-parallelism is turned off by setting a
fixed reducer count.

Cheers,
Gopal




Re: Hi, Hive People urgent question about [Distribute By] function

2015-10-24 Thread Philip Lee
Hello, this is the same question about DISTRIBUTE BY on Hive.

According to you, Hive does not use the hashCode of the Object class for
DBY (DISTRIBUTE BY).

I tried to understand how ObjectInspectorUtils works for distribution, but
it relies on a lot of Hive API and I could not follow it well.
I want to override the partitionByHash function on Flink so that it behaves
the same way as DBY on Hive.
I am working on implementing a benchmark system for these two systems,
which could be a contribution to Hive as well.

Could you tell me in detail how it works?
I assume that if you do not use the hashCode of the Object class in Java,
you defined your own partition function for DBY.

Regards,
Philip Lee





Re: Hi, Hive People urgent question about [Distribute By] function

2015-10-27 Thread Philip Lee
Hello, this is the same question about DISTRIBUTE BY on Hive.

According to you, Hive does not use the hashCode of the Object class for
DBY (DISTRIBUTE BY).

I tried to understand how ObjectInspectorUtils works for distribution, but
it relies on a lot of Hive API and I could not follow it well.
I want to override the partitionByHash function on Flink so that it behaves
the same way as DBY on Hive.
I am working on implementing a benchmark system for these two systems,
which could be a contribution to Hive as well.

Could you tell me in detail how it works?
I assume that if you do not use the hashCode of the Object class in Java,
you defined your own partition function for DBY.



Re: Hi, Hive People urgent question about [Distribute By] function

2015-10-27 Thread Gopal Vijayaraghavan

> I want to override partitionByHash function on Flink like the same way
> of DBY on Hive.
> I am working on implementing some benchmark system for these two system,
> which could be contribution to Hive as well.

I would be very disappointed if Flink fails to outperform Hive with a
DISTRIBUTE BY, because the Hive version is about 5-10x slower than it
could be with Tez.

MapReduce forces a full sort of the output data, so the Hive version will
potentially be O(N*log(N)) by default, while Flink should be able to do
O(N).
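
To make the O(N) case concrete, here is a minimal Java sketch of a
hash-only distribution: one pass over the rows, no sort. It is not Flink's
or Hive's implementation. The class and method names are made up, rows are
plain Strings, and String.hashCode() with the sign bit masked off is used
as a stand-in hash.

```java
import java.util.ArrayList;
import java.util.List;

final class HashOnlyPartitionSketch {
    // Assigns each row to a partition in a single pass: O(N) hash
    // computations and appends, with no comparison sort anywhere.
    static List<List<String>> partition(List<String> rows, int numPartitions) {
        List<List<String>> out = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            out.add(new ArrayList<>());
        }
        for (String row : rows) {
            // Stand-in hash; mask the sign bit so the index is non-negative.
            int p = (row.hashCode() & Integer.MAX_VALUE) % numPartitions;
            out.get(p).add(row);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        rows.add("a");
        rows.add("b");
        rows.add("a");
        rows.add("c");
        // Rows keep their input order inside each partition.
        System.out.println(partition(rows, 2));
    }
}
```

A sort-based shuffle does the same grouping but pays for ordering every
row on top of it, which is where the extra log(N) factor comes from.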


Assuming you don't turn on any of the compatibility modes, the hashCode
generated would be a murmur hash after encoding data into a byte[] using
BinarySortableSerDe & the data is then sorted using
key=(murmur_hash(byte[]) % n-reducers).
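
A rough, self-contained Java sketch of key=(murmur_hash(byte[]) %
n-reducers) follows. Several things in it are assumptions and not taken
from Hive: the variant (MurmurHash3 x86 32-bit), the seed (0), the masking
of the sign bit before the modulo, and all class and method names. The
byte[] in main is plain UTF-8, not BinarySortableSerDe output, so this
shows the shape of the computation and will not reproduce Hive's actual
reducer assignment.

```java
import java.nio.charset.StandardCharsets;

final class MurmurPartitionSketch {
    // MurmurHash3, x86 32-bit variant (assumed variant, see note above).
    static int murmur3_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h = seed;
        int nblocks = data.length / 4;
        // Body: mix one little-endian 4-byte block at a time.
        for (int i = 0; i < nblocks; i++) {
            int off = i * 4;
            int k = (data[off] & 0xff)
                  | ((data[off + 1] & 0xff) << 8)
                  | ((data[off + 2] & 0xff) << 16)
                  | ((data[off + 3] & 0xff) << 24);
            k *= c1;
            k = Integer.rotateLeft(k, 15);
            k *= c2;
            h ^= k;
            h = Integer.rotateLeft(h, 13);
            h = h * 5 + 0xe6546b64;
        }
        // Tail: the remaining 1 to 3 bytes (cases fall through on purpose).
        int tail = nblocks * 4;
        int k = 0;
        switch (data.length & 3) {
            case 3: k ^= (data[tail + 2] & 0xff) << 16;
            case 2: k ^= (data[tail + 1] & 0xff) << 8;
            case 1: k ^= (data[tail] & 0xff);
                    k *= c1;
                    k = Integer.rotateLeft(k, 15);
                    k *= c2;
                    h ^= k;
        }
        // Finalization: fold in the length and avalanche the bits.
        h ^= data.length;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Seed 0 and the sign-bit mask are assumptions made for this sketch.
    static int reducerFor(byte[] encodedKey, int numReducers) {
        return (murmur3_32(encodedKey, 0) & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        byte[] key = "foo".getBytes(StandardCharsets.UTF_8);
        System.out.println(reducerFor(key, 199));
    }
}
```

Matching Hive row for row would also require producing the same
BinarySortableSerDe bytes for each key, which this sketch does not attempt.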


The reducers then pull the data and merge-sort it using the disk, which is
entirely wasted CPU.

If you or anyone else is interested in fixing this for Tez, I have a JIRA
open to fix the hash-only shuffle -
https://issues.apache.org/jira/browse/HIVE-11858

Cheers,
Gopal