Author: gunther
Date: Sat Jun 28 21:53:56 2014
New Revision: 1606401

URL: http://svn.apache.org/r1606401
Log:
HIVE-7302: Allow Auto-reducer parallelism to be turned off by a logical optimizer (Gunther Hagleitner, reviewed by Gopal V and Vikram Dixit K)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1606401&r1=1606400&r2=1606401&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Sat Jun 28 21:53:56 2014
@@ -117,6 +117,10 @@ public class ReduceSinkMapJoinProc imple
     int numBuckets = -1;
     EdgeType edgeType = EdgeType.BROADCAST_EDGE;
     if (mapJoinOp.getConf().isBucketMapJoin()) {
+
+      // disable auto parallelism for bucket map joins
+      parentRS.getConf().setAutoParallel(false);
+
       numBuckets = (Integer) mapJoinOp.getConf().getBigTableBucketNumMapping().values().toArray()[0];
       if (mapJoinOp.getConf().getCustomBucketMapJoin()) {
         edgeType = EdgeType.CUSTOM_EDGE;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1606401&r1=1606400&r2=1606401&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Sat Jun 28 21:53:56 2014
@@ -87,7 +87,7 @@ public class ReduceSinkDesc extends Abst
   private float topNMemoryUsage = -1;
   private boolean mapGroupBy; // for group-by, values with same key on top-K should be forwarded
   private boolean skipTag; // Skip writing tags when feeding into mapjoin hashtable
-  private boolean autoParallel = false; // Is reducer parallelism automatic or fixed
+  private Boolean autoParallel = null; // Is reducer auto-parallelism enabled, disabled or unset
   private static transient Log LOG = LogFactory.getLog(ReduceSinkDesc.class);
 
   public ReduceSinkDesc() {
@@ -140,7 +140,7 @@ public class ReduceSinkDesc extends Abst
     desc.setBucketCols(bucketCols);
     desc.setStatistics(this.getStatistics());
     desc.setSkipTag(skipTag);
-    desc.setAutoParallel(autoParallel);
+    desc.autoParallel = autoParallel;
     return desc;
   }
 
@@ -344,10 +344,16 @@ public class ReduceSinkDesc extends Abst
   }
 
   public final boolean isAutoParallel() {
-    return autoParallel;
+    return (autoParallel != null) && autoParallel;
   }
 
   public final void setAutoParallel(final boolean autoParallel) {
-    this.autoParallel = autoParallel;
+    // we don't allow turning on auto parallel once it has been
+    // explicitly turned off. That is to avoid scenarios where
+    // auto parallelism could break assumptions about number of
+    // reducers or hash function.
+    if (this.autoParallel == null || this.autoParallel == true) {
+      this.autoParallel = autoParallel;
+    }
   }
 }