Re: Hi, Hive People urgent question about [Distribute By] function
> When applying [Distribute By] on Hive to the framework, the function
> should be partitionByHash on Flink. This is to spread out all the rows
> distributed by a hash key from Object Class in Java.

Hive does not use the Object hashCode - the identity hashCode is
inconsistent across JVM runs, so Object.hashCode() is never used for this.

ObjectInspectorUtils::hashCode() is the hashcode used by the DBY in Hive
(SORT BY uses a random number generator).

Cheers,
Gopal
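To see why the identity hash cannot drive a distribution, here is a
minimal JDK-only demonstration (not Hive code). Arrays do not override
hashCode, so their hashCode is the identity hash:

    // Plain JDK demo: the identity hash is tied to the object instance,
    // not its value, so two equal-valued rows would land in different
    // buckets - and differently on every JVM run.
    public final class IdentityHashDemo {
        public static void main(String[] args) {
            byte[] rowA = {1, 2, 3};
            byte[] rowB = {1, 2, 3};   // same value, different instance
            System.out.println(System.identityHashCode(rowA)); // varies per run
            System.out.println(System.identityHashCode(rowB)); // almost surely differs
        }
    }

Two rows carrying the same data hash to unrelated values, which is why
Hive hashes the deserialized value via ObjectInspectorUtils instead.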
Re: Hi, Hive People urgent question about [Distribute By] function
Thanks for your help. If we want to get the same result from Hive and from
Spark or another framework, how should we try this? Could you explain it in
detail?

Regards,
Philip

On Thu, Oct 22, 2015 at 6:25 PM, Gopal Vijayaraghavan wrote:

> > When applying [Distribute By] on Hive to the framework, the function
> > should be partitionByHash on Flink. This is to spread out all the rows
> > distributed by a hash key from Object Class in Java.
>
> Hive does not use the Object hashCode - the identity hashCode is
> inconsistent across JVM runs, so Object.hashCode() is never used for this.
>
> ObjectInspectorUtils::hashCode() is the hashcode used by the DBY in Hive
> (SORT BY uses a random number generator).
>
> Cheers,
> Gopal

--
==

*Hae Joon Lee*

Now, in Germany,
M.S. Candidate, interested in Distributed Systems, Iterative Processing
Dept. of Computer Science (Informatik in German), TUB
Technical University of Berlin

In Korea,
M.S. Candidate, Computer Architecture Laboratory
Dept. of Computer Science, KAIST

Rm# 4414 CS Dept. KAIST
373-1 Guseong-dong, Yuseong-gu, Daejon, South Korea (305-701)

Mobile) 49) 015-251-448-278 in Germany, no cellular in Korea

==
Re: Hi, Hive People urgent question about [Distribute By] function
> If we want to get the same result from Hive and from Spark or another
> framework, how should we try this?

There's a special backwards-compat slow codepath that gets triggered if
you do

    set mapred.reduce.tasks=199; (or any number)

This will produce the exact same hash-code as the Java hashCode for
Strings & Integers.

The bucket-id is determined by

    (hashCode & Integer.MAX_VALUE) % numberOfBuckets

but this also triggers a non-stable sort on an entirely empty key, which
will shuffle the data so the output file's order bears no resemblance to
the input file's order.

Even with that setting, the only consistent layout produced by Hive is
CLUSTER BY, which sorts on the same key used for distribution & uses the
Java hashCode if auto-parallelism is turned off by setting a fixed
reducer count.

Cheers,
Gopal
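The bucket-id rule above is small enough to show directly. A minimal Java
sketch (class and method names are made up for illustration); the
& Integer.MAX_VALUE clears the sign bit so the modulo always yields a
valid, non-negative bucket index:

    // Sketch of the legacy-compat bucket assignment described above.
    public final class LegacyBucketing {
        static int bucketOf(Object key, int numberOfBuckets) {
            // Compat path: plain Java hashCode for Strings & Integers.
            return (key.hashCode() & Integer.MAX_VALUE) % numberOfBuckets;
        }

        public static void main(String[] args) {
            System.out.println(bucketOf("abc", 199));
            System.out.println(bucketOf(Integer.valueOf(12345), 199));
        }
    }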
Re: Hi, Hive People urgent question about [Distribute By] function
Hello, the same question about DISTRIBUTE BY on Hive.

According to you, Hive does not use the hashCode of the Object class for
DBY, Distribute By.

I tried to understand how ObjectInspectorUtils works for the distribution,
but it involves a lot of Hive API and I could not follow it fully. I want
to override the partitionByHash function on Flink so that it works the
same way as DBY on Hive. I am working on implementing a benchmark system
for these two systems, which could be a contribution to Hive as well.

Could you tell me in detail how it works? Since you do not use the
hashCode of the Object class in Java, I assume you defined your own
partition function for DBY.

Regards,
Philip Lee

On Thu, Oct 22, 2015 at 7:13 PM, Gopal Vijayaraghavan wrote:

> > If we want to get the same result from Hive and from Spark or another
> > framework, how should we try this?
>
> There's a special backwards-compat slow codepath that gets triggered if
> you do
>
>     set mapred.reduce.tasks=199; (or any number)
>
> This will produce the exact same hash-code as the Java hashCode for
> Strings & Integers.
>
> The bucket-id is determined by
>
>     (hashCode & Integer.MAX_VALUE) % numberOfBuckets
>
> but this also triggers a non-stable sort on an entirely empty key, which
> will shuffle the data so the output file's order bears no resemblance to
> the input file's order.
>
> Even with that setting, the only consistent layout produced by Hive is
> CLUSTER BY, which sorts on the same key used for distribution & uses the
> Java hashCode if auto-parallelism is turned off by setting a fixed
> reducer count.
>
> Cheers,
> Gopal

--
==

*Hae Joon Lee*

Now, in Germany,
M.S. Candidate, interested in Distributed Systems, Iterative Processing
Dept. of Computer Science (Informatik in German), TUB
Technical University of Berlin

In Korea,
M.S. Candidate, Computer Architecture Laboratory
Dept. of Computer Science, KAIST

Rm# 4414 CS Dept. KAIST
373-1 Guseong-dong, Yuseong-gu, Daejon, South Korea (305-701)

Mobile) 49) 015-251-448-278 in Germany, no cellular in Korea

==
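On the Flink side, overriding partitionByHash itself is not necessary:
the DataSet API accepts a user-supplied Partitioner via partitionCustom.
A minimal sketch for a String key, assuming the legacy-compat Hive rule
from the quoted mail (the class name is hypothetical):

    import org.apache.flink.api.common.functions.Partitioner;

    // Hypothetical partitioner mirroring Hive's legacy-compat bucket
    // rule: (hashCode & Integer.MAX_VALUE) % numPartitions.
    public class HiveCompatPartitioner implements Partitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Usage: data.partitionCustom(new HiveCompatPartitioner(), keySelector);

This only matches Hive's placement when the compat mode is active and the
reducer count is fixed, per the quoted mail.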
Re: Hi, Hive People urgent question about [Distribute By] function
> I want to override the partitionByHash function on Flink so that it
> works the same way as DBY on Hive.
> I am working on implementing a benchmark system for these two systems,
> which could be a contribution to Hive as well.

I would be very disappointed if Flink fails to outperform Hive with a
DISTRIBUTE BY, because the Hive version is about 5-10x slower than it can
be with Tez.

MapReduce forces a full sort of the output data, so the Hive version will
be potentially O(N*LOG(N)) by default, while Flink should be able to do
O(N).

Assuming you don't turn on any of the compatibility modes, the hashCode
generated is a murmur hash computed after encoding the data into a byte[]
using BinarySortableSerDe, & the data is then sorted using
key = (murmur_hash(byte[]) % n-reducers). The reducers then pull the data
and merge-sort it using the disk, which is entirely wasted CPU.

If you or anyone else is interested in fixing this for Tez, I have a JIRA
open to fix the hash-only shuffle -
https://issues.apache.org/jira/browse/HIVE-11858

Cheers,
Gopal
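A rough sketch of that default path: serialize the key to bytes,
murmur-hash the bytes, take the result modulo the reducer count. The
UTF-8 encoding below stands in for BinarySortableSerDe, and the Murmur3
(x86_32 variant) is a stand-in for Hive's own implementation, so exact
bucket numbers will not match Hive's:

    import java.nio.charset.StandardCharsets;

    public final class MurmurPartitioning {

        // Reference Murmur3 x86_32, written out for self-containment.
        static int murmur3_32(byte[] data, int seed) {
            final int c1 = 0xcc9e2d51, c2 = 0x1b873593;
            int h = seed, i = 0;
            for (; i + 4 <= data.length; i += 4) {          // 4-byte blocks
                int k = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
                      | ((data[i + 2] & 0xff) << 16) | ((data[i + 3] & 0xff) << 24);
                k *= c1; k = Integer.rotateLeft(k, 15); k *= c2;
                h ^= k; h = Integer.rotateLeft(h, 13); h = h * 5 + 0xe6546b64;
            }
            int k = 0;                                       // 1-3 byte tail
            switch (data.length - i) {
                case 3: k ^= (data[i + 2] & 0xff) << 16;     // fall through
                case 2: k ^= (data[i + 1] & 0xff) << 8;      // fall through
                case 1: k ^= (data[i] & 0xff);
                        k *= c1; k = Integer.rotateLeft(k, 15); k *= c2;
                        h ^= k;
            }
            h ^= data.length;                                // finalization
            h ^= h >>> 16; h *= 0x85ebca6b;
            h ^= h >>> 13; h *= 0xc2b2ae35;
            h ^= h >>> 16;
            return h;
        }

        // key = (murmur_hash(byte[]) % n-reducers), kept non-negative.
        static int reducerFor(String key, int nReducers) {
            byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
            return (murmur3_32(bytes, 0) & Integer.MAX_VALUE) % nReducers;
        }

        public static void main(String[] args) {
            System.out.println(reducerFor("some-key", 199));
        }
    }

Note there is no sort anywhere in this path, which is exactly the O(N)
behaviour the mail argues Flink should be able to hit.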