Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Thu Oct 30 16:22:33 2014 @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.optimi import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -28,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.Stack; +import java.util.TreeMap; import java.util.regex.Pattern; import org.apache.commons.logging.Log; @@ -67,7 +69,6 @@ import org.apache.hadoop.hive.ql.plan.Ex import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; @@ -121,7 +122,11 @@ import org.apache.hadoop.hive.serde2.Des import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.util.ReflectionUtils; public class Vectorizer implements PhysicalPlanResolver { @@ -282,13 +287,13 @@ public class Vectorizer implements Physi private PhysicalContext pctx; - private int keyColCount; - private int valueColCount; + private List<String> reduceColumnNames; + private List<TypeInfo> reduceTypeInfos; public VectorizationDispatcher(PhysicalContext pctx) { this.pctx = pctx; - keyColCount = 0; - valueColCount = 0; + reduceColumnNames = null; + reduceTypeInfos = null; } @Override @@ -385,14 +390,13 @@ public class Vectorizer implements Physi HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>(); ogw.startWalking(topNodes, nodeOutput); - Map<String, Map<Integer, String>> columnVectorTypes = vnp.getScratchColumnVectorTypes(); - mapWork.setScratchColumnVectorTypes(columnVectorTypes); - Map<String, Map<String, Integer>> columnMap = vnp.getScratchColumnMap(); - mapWork.setScratchColumnMap(columnMap); + Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps = vnp.getAllScratchColumnVectorTypeMaps(); + mapWork.setAllScratchColumnVectorTypeMaps(allScratchColumnVectorTypeMaps); + Map<String, Map<String, Integer>> allColumnVectorMaps = vnp.getAllColumnVectorMaps(); + mapWork.setAllColumnVectorMaps(allColumnVectorMaps); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("vectorTypes: %s", columnVectorTypes.toString())); - LOG.debug(String.format("columnMap: %s", columnMap.toString())); + debugDisplayAllMaps(allColumnVectorMaps, allScratchColumnVectorTypeMaps); } return; @@ 
-413,7 +417,7 @@ public class Vectorizer implements Physi return false; } StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector; - keyColCount = keyStructObjectInspector.getAllStructFieldRefs().size(); + List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs(); // Tez doesn't use tagging... if (reduceWork.getNeedsTagging()) { @@ -426,9 +430,20 @@ public class Vectorizer implements Physi !(valueObjectInspector instanceof StructObjectInspector)) { return false; } - StructObjectInspector valueStructObjectInspector = - (StructObjectInspector)valueObjectInspector; - valueColCount = valueStructObjectInspector.getAllStructFieldRefs().size(); + StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector; + List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs(); + + reduceColumnNames = new ArrayList<String>(); + reduceTypeInfos = new ArrayList<TypeInfo>(); + + for (StructField field: keyFields) { + reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName()); + reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName())); + } + for (StructField field: valueFields) { + reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName()); + reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName())); + } } catch (Exception e) { throw new SemanticException(e); } @@ -478,7 +493,7 @@ public class Vectorizer implements Physi // VectorizationContext... Do we use PreOrderWalker instead of DefaultGraphWalker. Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); ReduceWorkVectorizationNodeProcessor vnp = - new ReduceWorkVectorizationNodeProcessor(reduceWork, keyColCount, valueColCount); + new ReduceWorkVectorizationNodeProcessor(reduceColumnNames); addReduceWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new PreOrderWalker(disp); @@ -495,18 +510,17 @@ public class Vectorizer implements Physi Operator<? extends OperatorDesc> reducer = reduceWork.getReducer(); if (reducer.getType().equals(OperatorType.EXTRACT)) { - ((VectorExtractOperator)reducer).setKeyAndValueColCounts(keyColCount, valueColCount); + ((VectorExtractOperator)reducer).setReduceTypeInfos(reduceTypeInfos); } - Map<String, Map<Integer, String>> columnVectorTypes = vnp.getScratchColumnVectorTypes(); - reduceWork.setScratchColumnVectorTypes(columnVectorTypes); - Map<String, Map<String, Integer>> columnMap = vnp.getScratchColumnMap(); - reduceWork.setScratchColumnMap(columnMap); + Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps = vnp.getAllScratchColumnVectorTypeMaps(); + reduceWork.setAllScratchColumnVectorTypeMaps(allScratchColumnVectorTypeMaps); + Map<String, Map<String, Integer>> allColumnVectorMaps = vnp.getAllColumnVectorMaps(); + reduceWork.setAllColumnVectorMaps(allColumnVectorMaps); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("vectorTypes: %s", columnVectorTypes.toString())); - LOG.debug(String.format("columnMap: %s", columnMap.toString())); + debugDisplayAllMaps(allColumnVectorMaps, allScratchColumnVectorTypeMaps); } } } @@ -571,26 +585,26 @@ public class Vectorizer implements Physi protected final Set<Operator<? extends OperatorDesc>> opsDone = new HashSet<Operator<? 
extends OperatorDesc>>(); - public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() { - Map<String, Map<Integer, String>> scratchColumnVectorTypes = + public Map<String, Map<Integer, String>> getAllScratchColumnVectorTypeMaps() { + Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps = new HashMap<String, Map<Integer, String>>(); for (String onefile : scratchColumnContext.keySet()) { VectorizationContext vc = scratchColumnContext.get(onefile); - Map<Integer, String> cmap = vc.getOutputColumnTypeMap(); - scratchColumnVectorTypes.put(onefile, cmap); + Map<Integer, String> cmap = vc.getScratchColumnTypeMap(); + allScratchColumnVectorTypeMaps.put(onefile, cmap); } - return scratchColumnVectorTypes; + return allScratchColumnVectorTypeMaps; } - public Map<String, Map<String, Integer>> getScratchColumnMap() { - Map<String, Map<String, Integer>> scratchColumnMap = + public Map<String, Map<String, Integer>> getAllColumnVectorMaps() { + Map<String, Map<String, Integer>> allColumnVectorMaps = new HashMap<String, Map<String, Integer>>(); for(String oneFile: scratchColumnContext.keySet()) { VectorizationContext vc = scratchColumnContext.get(oneFile); - Map<String, Integer> cmap = vc.getColumnMap(); - scratchColumnMap.put(oneFile, cmap); + Map<String, Integer> cmap = vc.getProjectionColumnMap(); + allColumnVectorMaps.put(oneFile, cmap); } - return scratchColumnMap; + return allColumnVectorMaps; } public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack, @@ -676,10 +690,7 @@ public class Vectorizer implements Physi vContext.setFileKey(onefile); scratchColumnContext.put(onefile, vContext); if (LOG.isDebugEnabled()) { - LOG.debug("Vectorized MapWork operator " + op.getName() + - " with vectorization context key=" + vContext.getFileKey() + - ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() + - ", columnMap: " + vContext.getColumnMap().toString()); + LOG.debug("Vectorized MapWork operator " + op.getName() + " vectorization context " + vContext.toString()); } break; } @@ -710,17 +721,11 @@ public class Vectorizer implements Physi Operator<? 
extends OperatorDesc> vectorOp = doVectorize(op, vContext); if (LOG.isDebugEnabled()) { - LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + - " with vectorization context key=" + vContext.getFileKey() + - ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() + - ", columnMap: " + vContext.getColumnMap().toString()); + LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + " vectorization context " + vContext.toString()); if (vectorOp instanceof VectorizationContextRegion) { VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp; VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext(); - LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + - " added new vectorization context key=" + vOutContext.getFileKey() + - ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() + - ", columnMap: " + vOutContext.getColumnMap().toString()); + LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + " added vectorization context " + vContext.toString()); } } @@ -730,10 +735,7 @@ public class Vectorizer implements Physi class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor { - private final ReduceWork rWork; - private int keyColCount; - private int valueColCount; - private Map<String, Integer> reduceColumnNameMap; + private List<String> reduceColumnNames; private VectorizationContext reduceShuffleVectorizationContext; @@ -743,12 +745,8 @@ public class Vectorizer implements Physi return rootVectorOp; } - public ReduceWorkVectorizationNodeProcessor(ReduceWork rWork, int keyColCount, - int valueColCount) { - this.rWork = rWork; - reduceColumnNameMap = rWork.getReduceColumnNameMap(); - this.keyColCount = keyColCount; - this.valueColCount = valueColCount; + public ReduceWorkVectorizationNodeProcessor(List<String> reduceColumnNames) { + this.reduceColumnNames = reduceColumnNames; rootVectorOp = null; reduceShuffleVectorizationContext = null; } @@ -766,17 +764,16 @@ public class Vectorizer implements Physi boolean saveRootVectorOp = false; if (op.getParentOperators().size() == 0) { - vContext = getReduceVectorizationContext(reduceColumnNameMap); + LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + reduceColumnNames.toString()); + + vContext = new VectorizationContext(reduceColumnNames); vContext.setFileKey("_REDUCE_SHUFFLE_"); scratchColumnContext.put("_REDUCE_SHUFFLE_", vContext); reduceShuffleVectorizationContext = vContext; saveRootVectorOp = true; if (LOG.isDebugEnabled()) { - LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context key=" + - vContext.getFileKey() + - ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() + - ", columnMap: " + vContext.getColumnMap().toString()); + LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context " + vContext.toString()); } } else { vContext = walkStackToFindVectorizationContext(stack, op); @@ -802,17 +799,11 @@ public class Vectorizer implements Physi Operator<? 
extends OperatorDesc> vectorOp = doVectorize(op, vContext); if (LOG.isDebugEnabled()) { - LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + - " with vectorization context key=" + vContext.getFileKey() + - ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() + - ", columnMap: " + vContext.getColumnMap().toString()); + LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + " vectorization context " + vContext.toString()); if (vectorOp instanceof VectorizationContextRegion) { VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp; VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext(); - LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + - " added new vectorization context key=" + vOutContext.getFileKey() + - ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() + - ", columnMap: " + vOutContext.getColumnMap().toString()); + LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + " added vectorization context " + vContext.toString()); } } if (vectorOp instanceof VectorGroupByOperator) { @@ -830,7 +821,7 @@ public class Vectorizer implements Physi private static class ValidatorVectorizationContext extends VectorizationContext { private ValidatorVectorizationContext() { - super(null, -1); + super(); } @Override @@ -918,7 +909,12 @@ public class Vectorizer implements Physi } break; case GROUPBY: - ret = validateGroupByOperator((GroupByOperator) op, true, true); + if (HiveConf.getBoolVar(physicalContext.getConf(), + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED)) { + ret = validateGroupByOperator((GroupByOperator) op, true, true); + } else { + ret = false; + } break; case FILTER: ret = validateFilterOperator((FilterOperator) op); @@ -1026,12 +1022,10 @@ public class Vectorizer implements Physi if (!ret) { return false; } - boolean isVectorOutput = isTez && aggregatorsOutputIsPrimitive(desc.getAggregators(), isReduce); - vectorDesc.setVectorOutput(isVectorOutput); if (isReduce) { if (desc.isDistinct()) { LOG.info("Distinct not supported in reduce vector mode"); - return false; + return false; } // Sort-based GroupBy? 
if (desc.getMode() != GroupByDesc.Mode.COMPLETE && @@ -1044,21 +1038,24 @@ public class Vectorizer implements Physi LOG.info("Reduce GROUP BY mode is " + desc.getMode().name()); if (desc.getGroupKeyNotReductionKey()) { LOG.info("Reduce vector mode not supported when group key is not reduction key"); - return false; + return false; } - if (!isVectorOutput) { + if (!aggregatorsOutputIsPrimitive(desc.getAggregators(), isReduce)) { LOG.info("Reduce vector mode only supported when aggregate outputs are primitive types"); - return false; + return false; } if (desc.getKeys().size() > 0) { + if (op.getParentOperators().size() > 0) { + LOG.info("Reduce vector mode can only handle a key group GROUP BY operator when it is fed by reduce-shuffle"); + return false; + } LOG.info("Reduce-side GROUP BY will process key groups"); vectorDesc.setVectorGroupBatches(true); } else { LOG.info("Reduce-side GROUP BY will do global aggregation"); } + vectorDesc.setVectorOutput(true); vectorDesc.setIsReduce(true); - } else { - LOG.info("Downstream operators of map-side GROUP BY will be vectorized: " + isVectorOutput); } return true; } @@ -1227,21 +1224,17 @@ public class Vectorizer implements Physi PhysicalContext pctx) { RowSchema rs = op.getSchema(); - Map<String, Integer> cmap = new HashMap<String, Integer>(); - int columnCount = 0; + // Add all non-virtual columns to make a vectorization context for + // the TableScan operator. + VectorizationContext vContext = new VectorizationContext(); for (ColumnInfo c : rs.getSignature()) { // Earlier, validation code should have eliminated virtual columns usage (HIVE-5560). if (!isVirtualColumn(c)) { - cmap.put(c.getInternalName(), columnCount++); + vContext.addInitialColumn(c.getInternalName()); } } - - return new VectorizationContext(cmap, columnCount); - } - - private VectorizationContext getReduceVectorizationContext( - Map<String, Integer> reduceColumnNameMap) { - return new VectorizationContext(reduceColumnNameMap, reduceColumnNameMap.size()); + vContext.finishedAddingInitialColumns(); + return vContext; } private void fixupParentChildOperators(Operator<? extends OperatorDesc> op, @@ -1297,4 +1290,41 @@ public class Vectorizer implements Physi } return false; } + + public void debugDisplayAllMaps(Map<String, Map<String, Integer>> allColumnVectorMaps, + Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps) { + + // Context keys grow in length since they are a path... 
+ Comparator<String> comparerShorterString = new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + Integer length1 = o1.length(); + Integer length2 = o2.length(); + return length1.compareTo(length2); + }}; + + Comparator<Integer> comparerInteger = new Comparator<Integer>() { + @Override + public int compare(Integer o1, Integer o2) { + return o1.compareTo(o2); + }}; + + Map<String, Map<Integer, String>> sortedAllColumnVectorMaps = new TreeMap<String, Map<Integer, String>>(comparerShorterString); + for (Map.Entry<String, Map<String, Integer>> entry : allColumnVectorMaps.entrySet()) { + Map<Integer, String> sortedColumnMap = new TreeMap<Integer, String>(comparerInteger); + for (Map.Entry<String, Integer> innerEntry : entry.getValue().entrySet()) { + sortedColumnMap.put(innerEntry.getValue(), innerEntry.getKey()); + } + sortedAllColumnVectorMaps.put(entry.getKey(), sortedColumnMap); + } + LOG.debug("sortedAllColumnVectorMaps " + sortedAllColumnVectorMaps); + + Map<String, Map<Integer, String>> sortedAllScratchColumnVectorTypeMap = new TreeMap<String, Map<Integer, String>>(comparerShorterString); + for (Map.Entry<String, Map<Integer, String>> entry : allScratchColumnVectorTypeMaps.entrySet()) { + Map<Integer, String> sortedScratchColumnTypeMap = new TreeMap<Integer, String>(comparerInteger); + sortedScratchColumnTypeMap.putAll(entry.getValue()); + sortedAllScratchColumnVectorTypeMap.put(entry.getKey(), sortedScratchColumnTypeMap); + } + LOG.debug("sortedAllScratchColumnVectorTypeMap " + sortedAllScratchColumnVectorTypeMap); + } }
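
For reference, the reduce-shuffle column naming introduced above (KEY.<field> / VALUE.<field>, with a TypeInfo per field taken from the key and value ObjectInspectors) can be exercised on its own. The sketch below is illustrative only, not code from the patch: the field names and inspectors are made up, and literal "KEY"/"VALUE" prefixes stand in for Utilities.ReduceField.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class ReduceColumnNamingSketch {

  // Mirrors the loop added to the reduce-work validation: prefix key fields with
  // "KEY." and value fields with "VALUE.", collecting a TypeInfo per column.
  static void appendFields(String prefix, StructObjectInspector soi,
      List<String> names, List<TypeInfo> typeInfos) {
    for (StructField field : soi.getAllStructFieldRefs()) {
      names.add(prefix + "." + field.getFieldName());
      typeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(
          field.getFieldObjectInspector().getTypeName()));
    }
  }

  public static void main(String[] args) {
    // Hypothetical key/value shapes; the real ones come from the ReduceWork's
    // key and value deserializers.
    StructObjectInspector keyOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("userid"),
        Arrays.<ObjectInspector>asList(PrimitiveObjectInspectorFactory.javaLongObjectInspector));
    StructObjectInspector valueOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("name", "amount"),
        Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.javaStringObjectInspector,
            PrimitiveObjectInspectorFactory.javaDoubleObjectInspector));

    List<String> reduceColumnNames = new ArrayList<String>();
    List<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>();
    appendFields("KEY", keyOI, reduceColumnNames, reduceTypeInfos);
    appendFields("VALUE", valueOI, reduceColumnNames, reduceTypeInfos);

    // Expected output: [KEY.userid, VALUE.name, VALUE.amount] with matching type infos.
    System.out.println(reduceColumnNames + " " + reduceTypeInfos);
  }
}
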
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java Thu Oct 30 16:22:33 2014 @@ -56,7 +56,9 @@ import org.apache.hadoop.hive.ql.plan.Ex import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; /** @@ -188,12 +190,18 @@ public class PartitionPruner implements // Replace virtual columns with nulls. See javadoc for details. prunerExpr = removeNonPartCols(prunerExpr, extractPartColNames(tab), partColsUsedInFilter); // Remove all parts that are not partition columns. See javadoc for details. - ExprNodeGenericFuncDesc compactExpr = (ExprNodeGenericFuncDesc)compactExpr(prunerExpr.clone()); + ExprNodeDesc compactExpr = compactExpr(prunerExpr.clone()); String oldFilter = prunerExpr.getExprString(); - if (compactExpr == null) { - // Non-strict mode, and all the predicates are on non-partition columns - get everything. - LOG.debug("Filter " + oldFilter + " was null after compacting"); - return getAllPartsFromCacheOrServer(tab, key, true, prunedPartitionsMap); + if (isBooleanExpr(compactExpr)) { + // For null and true values, return every partition + if (!isFalseExpr(compactExpr)) { + // Non-strict mode, and all the predicates are on non-partition columns - get everything. 
+ LOG.debug("Filter " + oldFilter + " was null after compacting"); + return getAllPartsFromCacheOrServer(tab, key, true, prunedPartitionsMap); + } else { + return new PrunedPartitionList(tab, new LinkedHashSet<Partition>(new ArrayList<Partition>()), + new ArrayList<String>(), false); + } } LOG.debug("Filter w/ compacting: " + compactExpr.getExprString() + "; filter w/o compacting: " + oldFilter); @@ -204,7 +212,7 @@ public class PartitionPruner implements return ppList; } - ppList = getPartitionsFromServer(tab, compactExpr, conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString())); + ppList = getPartitionsFromServer(tab, (ExprNodeGenericFuncDesc)compactExpr, conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString())); prunedPartitionsMap.put(key, ppList); return ppList; } @@ -225,16 +233,22 @@ public class PartitionPruner implements partsCache.put(key, ppList); return ppList; } - - private static ExprNodeDesc removeTruePredciates(ExprNodeDesc e) { - if (e instanceof ExprNodeConstantDesc) { - ExprNodeConstantDesc eC = (ExprNodeConstantDesc) e; - if (e.getTypeInfo() == TypeInfoFactory.booleanTypeInfo - && eC.getValue() == Boolean.TRUE) { - return null; - } - } - return e; + + static private boolean isBooleanExpr(ExprNodeDesc expr) { + return expr != null && expr instanceof ExprNodeConstantDesc && + ((ExprNodeConstantDesc)expr).getTypeInfo() instanceof PrimitiveTypeInfo && + ((PrimitiveTypeInfo)(((ExprNodeConstantDesc)expr).getTypeInfo())). + getTypeName().equals(serdeConstants.BOOLEAN_TYPE_NAME); + } + static private boolean isTrueExpr(ExprNodeDesc expr) { + return isBooleanExpr(expr) && + ((ExprNodeConstantDesc)expr).getValue() != null && + ((ExprNodeConstantDesc)expr).getValue().equals(Boolean.TRUE); + } + static private boolean isFalseExpr(ExprNodeDesc expr) { + return isBooleanExpr(expr) && + ((ExprNodeConstantDesc)expr).getValue() != null && + ((ExprNodeConstantDesc)expr).getValue().equals(Boolean.FALSE); } /** @@ -245,10 +259,13 @@ public class PartitionPruner implements * @return partition pruning expression that only contains partition columns. */ static private ExprNodeDesc compactExpr(ExprNodeDesc expr) { - if (expr instanceof ExprNodeConstantDesc) { - expr = removeTruePredciates(expr); - if (expr == null || ((ExprNodeConstantDesc)expr).getValue() == null) { - return null; + // If this is a constant boolean expression, return the value. + if (expr == null) { + return null; + } + if (expr instanceof ExprNodeConstantDesc) { + if (isBooleanExpr(expr)) { + return expr; } else { throw new IllegalStateException("Unexpected non-null ExprNodeConstantDesc: " + expr.getExprString()); @@ -256,22 +273,29 @@ public class PartitionPruner implements } else if (expr instanceof ExprNodeGenericFuncDesc) { GenericUDF udf = ((ExprNodeGenericFuncDesc)expr).getGenericUDF(); boolean isAnd = udf instanceof GenericUDFOPAnd; - if (isAnd || udf instanceof GenericUDFOPOr) { + boolean isOr = udf instanceof GenericUDFOPOr; + + if (isAnd || isOr) { List<ExprNodeDesc> children = expr.getChildren(); - ExprNodeDesc left = removeTruePredciates(children.get(0)); - children.set(0, left == null ? null : compactExpr(left)); - ExprNodeDesc right = removeTruePredciates(children.get(1)); - children.set(1, right == null ? null : compactExpr(right)); - - // Note that one does not simply compact (not-null or null) to not-null. - // Only if we have an "and" is it valid to send one side to metastore. 
- if (children.get(0) == null && children.get(1) == null) { - return null; - } else if (children.get(0) == null) { - return isAnd ? children.get(1) : null; - } else if (children.get(1) == null) { - return isAnd ? children.get(0) : null; - } + ExprNodeDesc left = children.get(0); + children.set(0, compactExpr(left)); + ExprNodeDesc right = children.get(1); + children.set(1, compactExpr(right)); + + if (isTrueExpr(children.get(0)) && isTrueExpr(children.get(1))) { + return new ExprNodeConstantDesc(Boolean.TRUE); + } else if (isTrueExpr(children.get(0))) { + return isAnd ? children.get(1) : new ExprNodeConstantDesc(Boolean.TRUE); + } else if (isTrueExpr(children.get(1))) { + return isAnd ? children.get(0) : new ExprNodeConstantDesc(Boolean.TRUE); + } else if (isFalseExpr(children.get(0)) && isFalseExpr(children.get(1))) { + return new ExprNodeConstantDesc(Boolean.FALSE); + } else if (isFalseExpr(children.get(0))) { + return isAnd ? new ExprNodeConstantDesc(Boolean.FALSE) : children.get(1); + } else if (isFalseExpr(children.get(1))) { + return isAnd ? new ExprNodeConstantDesc(Boolean.FALSE) : children.get(0); + } + } return expr; } else { @@ -296,9 +320,9 @@ public class PartitionPruner implements if (!partCols.contains(column)) { // Column doesn't appear to be a partition column for the table. return new ExprNodeConstantDesc(expr.getTypeInfo(), null); - } + } referred.add(column); - } + } if (expr instanceof ExprNodeGenericFuncDesc) { List<ExprNodeDesc> children = expr.getChildren(); for (int i = 0; i < children.size(); ++i) { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Thu Oct 30 16:22:33 2014 @@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.exec.Re import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.exec.tez.DagUtils; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -151,28 +150,28 @@ public class StatsRulesProcFactory { Statistics parentStats = parent.getStatistics(); AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx; HiveConf conf = aspCtx.getConf(); + Statistics stats = null; - // SELECT (*) does not change the statistics. 
Just pass on the parent statistics - if (sop.getConf().isSelectStar()) { + if (parentStats != null) { try { - if (parentStats != null) { - sop.setStatistics(parentStats.clone()); - } + stats = parentStats.clone(); } catch (CloneNotSupportedException e) { throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg()); } - return null; } try { if (satisfyPrecondition(parentStats)) { - Statistics stats = parentStats.clone(); - List<ColStatistics> colStats = - StatsUtils.getColStatisticsFromExprMap(conf, parentStats, sop.getColumnExprMap(), - sop.getSchema()); - long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats); + // this will take care of mapping between input column names and output column names. The + // returned column stats will have the output column names. + List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(conf, parentStats, + sop.getColumnExprMap(), sop.getSchema()); stats.setColumnStats(colStats); - stats.setDataSize(setMaxIfInvalid(dataSize)); + // in case of select(*) the data size does not change + if (!sop.getConf().isSelectStar() && !sop.getConf().isSelStarNoCompute()) { + long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats); + stats.setDataSize(setMaxIfInvalid(dataSize)); + } sop.setStatistics(stats); if (isDebugEnabled) { @@ -889,16 +888,12 @@ public class StatsRulesProcFactory { GroupByDesc.Mode mode = desc.getMode(); if (mode.equals(GroupByDesc.Mode.HASH)) { - float hashAggMem = conf.getFloatVar( - HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); - float hashAggMaxThreshold = conf.getFloatVar( - HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); - - // get memory for container. May be use mapreduce.map.java.opts instead? - long totalMemory = - DagUtils.getContainerResource(conf).getMemory() * 1000L * 1000L; - long maxMemHashAgg = Math - .round(totalMemory * hashAggMem * hashAggMaxThreshold); + float hashAggMem = conf.getFloatVar(HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); + float hashAggMaxThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + + // get available map memory + long totalMemory = StatsUtils.getAvailableMemory(conf) * 1000L * 1000L; + long maxMemHashAgg = Math.round(totalMemory * hashAggMem * hashAggMaxThreshold); // estimated number of rows will be product of NDVs long numEstimatedRows = 1; @@ -1026,11 +1021,17 @@ public class StatsRulesProcFactory { */ public static class JoinStatsRule extends DefaultStatsRule implements NodeProcessor { + private boolean pkfkInferred = false; + private long newNumRows = 0; + private List<Operator<? extends OperatorDesc>> parents; + private CommonJoinOperator<? extends JoinDesc> jop; + private int numAttr = 1; + @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { - CommonJoinOperator<? extends JoinDesc> jop = (CommonJoinOperator<? extends JoinDesc>) nd; - List<Operator<? extends OperatorDesc>> parents = jop.getParentOperators(); + jop = (CommonJoinOperator<? 
extends JoinDesc>) nd; + parents = jop.getParentOperators(); AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx; HiveConf conf = aspCtx.getConf(); boolean allStatsAvail = true; @@ -1057,22 +1058,26 @@ public class StatsRulesProcFactory { Statistics stats = new Statistics(); Map<String, Long> rowCountParents = new HashMap<String, Long>(); List<Long> distinctVals = Lists.newArrayList(); - - // 2 relations, multiple attributes - boolean multiAttr = false; - int numAttr = 1; int numParent = parents.size(); - Map<String, ColStatistics> joinedColStats = Maps.newHashMap(); Map<Integer, List<String>> joinKeys = Maps.newHashMap(); List<Long> rowCounts = Lists.newArrayList(); + // detect if there are multiple attributes in join key + ReduceSinkOperator rsOp = (ReduceSinkOperator) jop.getParentOperators().get(0); + List<ExprNodeDesc> keyExprs = rsOp.getConf().getKeyCols(); + numAttr = keyExprs.size(); + + // infer PK-FK relationship in single attribute join case + pkfkInferred = false; + inferPKFKRelationship(); + // get the join keys from parent ReduceSink operators for (int pos = 0; pos < parents.size(); pos++) { ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos); Statistics parentStats = parent.getStatistics(); - List<ExprNodeDesc> keyExprs = parent.getConf().getKeyCols(); + keyExprs = parent.getConf().getKeyCols(); // Parent RS may have column statistics from multiple parents. // Populate table alias to row count map, this will be used later to @@ -1087,12 +1092,6 @@ public class StatsRulesProcFactory { } rowCounts.add(parentStats.getNumRows()); - // multi-attribute join key - if (keyExprs.size() > 1) { - multiAttr = true; - numAttr = keyExprs.size(); - } - // compute fully qualified join key column names. this name will be // used to quickly look-up for column statistics of join key. // TODO: expressions in join condition will be ignored. assign @@ -1115,7 +1114,7 @@ public class StatsRulesProcFactory { // attribute join, else max(V(R,y1), V(S,y1)) * max(V(R,y2), V(S,y2)) // in case of multi-attribute join long denom = 1; - if (multiAttr) { + if (numAttr > 1) { List<Long> perAttrDVs = Lists.newArrayList(); for (int idx = 0; idx < numAttr; idx++) { for (Integer i : joinKeys.keySet()) { @@ -1154,9 +1153,7 @@ public class StatsRulesProcFactory { } // Update NDV of joined columns to be min(V(R,y), V(S,y)) - if (multiAttr) { - updateJoinColumnsNDV(joinKeys, joinedColStats, numAttr); - } + updateJoinColumnsNDV(joinKeys, joinedColStats, numAttr); // column statistics from different sources are put together and rename // fully qualified column names based on output schema of join operator @@ -1186,10 +1183,8 @@ public class StatsRulesProcFactory { // update join statistics stats.setColumnStats(outColStats); - long newRowCount = computeNewRowCount(rowCounts, denom); - - updateStatsForJoinType(stats, newRowCount, jop, rowCountParents, - outInTabAlias); + long newRowCount = pkfkInferred ? newNumRows : computeNewRowCount(rowCounts, denom); + updateStatsForJoinType(stats, newRowCount, jop, rowCountParents,outInTabAlias); jop.setStatistics(stats); if (isDebugEnabled) { @@ -1234,6 +1229,195 @@ public class StatsRulesProcFactory { return null; } + private void inferPKFKRelationship() { + if (numAttr == 1) { + List<Integer> parentsWithPK = getPrimaryKeyCandidates(parents); + + // in case of fact to many dimensional tables join, the join key in fact table will be + // mostly foreign key which will have corresponding primary key in dimension table. 
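
The row-count estimate derived in the block that follows (fact-side rows scaled by the product of the dimension-side selectivities) is easy to illustrate with toy numbers. The sketch below is purely illustrative; the figures are made up and the real selectivities come from comparing each ReduceSink's row count against its TableScan.

public class PkFkEstimateSketch {
  public static void main(String[] args) {
    // Hypothetical numbers: a fact table joined to two filtered dimension tables.
    long factRows = 10000000L;                  // rows on the foreign-key side
    // selectivity of a PK-side branch = rows emitted by its ReduceSink / rows of its TableScan
    double dimSel1 = 50000.0 / 1000000.0;       // 5% of dimension 1 survives its filters
    double dimSel2 = 200000.0 / 2000000.0;      // 10% of dimension 2 survives its filters

    double prodSelectivity = dimSel1 * dimSel2; // conjunctive assumption
    long newNumRows = (long) Math.ceil(factRows * prodSelectivity);

    System.out.println(newNumRows);             // 50000 estimated join output rows
  }
}
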
+ // The selectivity of fact table in that case will be product of all selectivities of + // dimension tables (assumes conjunctivity) + for (Integer id : parentsWithPK) { + ColStatistics csPK = null; + Operator<? extends OperatorDesc> parent = parents.get(id); + for (ColStatistics cs : parent.getStatistics().getColumnStats()) { + if (cs.isPrimaryKey()) { + csPK = cs; + break; + } + } + + // infer foreign key candidates positions + List<Integer> parentsWithFK = getForeignKeyCandidates(parents, csPK); + if (parentsWithFK.size() == 1 && + parentsWithFK.size() + parentsWithPK.size() == parents.size()) { + Operator<? extends OperatorDesc> parentWithFK = parents.get(parentsWithFK.get(0)); + List<Float> parentsSel = getSelectivity(parents, parentsWithPK); + Float prodSelectivity = 1.0f; + for (Float selectivity : parentsSel) { + prodSelectivity *= selectivity; + } + newNumRows = (long) Math.ceil( + parentWithFK.getStatistics().getNumRows() * prodSelectivity); + pkfkInferred = true; + + // some debug information + if (isDebugEnabled) { + List<String> parentIds = Lists.newArrayList(); + + // print primary key containing parents + for (Integer i : parentsWithPK) { + parentIds.add(parents.get(i).toString()); + } + LOG.debug("STATS-" + jop.toString() + ": PK parent id(s) - " + parentIds); + parentIds.clear(); + + // print foreign key containing parents + for (Integer i : parentsWithFK) { + parentIds.add(parents.get(i).toString()); + } + LOG.debug("STATS-" + jop.toString() + ": FK parent id(s) - " + parentIds); + } + } + } + } + } + + /** + * Get selectivity of reduce sink operators. + * @param ops - reduce sink operators + * @param opsWithPK - reduce sink operators with primary keys + * @return - list of selectivity for primary key containing operators + */ + private List<Float> getSelectivity(List<Operator<? extends OperatorDesc>> ops, + List<Integer> opsWithPK) { + List<Float> result = Lists.newArrayList(); + for (Integer idx : opsWithPK) { + Operator<? extends OperatorDesc> op = ops.get(idx); + float selectivity = getSelectivitySimpleTree(op); + result.add(selectivity); + } + return result; + } + + private float getSelectivitySimpleTree(Operator<? extends OperatorDesc> op) { + TableScanOperator tsOp = OperatorUtils + .findSingleOperatorUpstream(op, TableScanOperator.class); + if (tsOp == null) { + // complex tree with multiple parents + return getSelectivityComplexTree(op); + } else { + // simple tree with single parent + long inputRow = tsOp.getStatistics().getNumRows(); + long outputRow = op.getStatistics().getNumRows(); + return (float) outputRow / (float) inputRow; + } + } + + private float getSelectivityComplexTree(Operator<? extends OperatorDesc> op) { + Operator<? extends OperatorDesc> multiParentOp = null; + Operator<? extends OperatorDesc> currentOp = op; + + // TS-1 TS-2 + // | | + // RS-1 RS-2 + // \ / + // JOIN + // | + // FIL + // | + // RS-3 + // + // For the above complex operator tree, + // selectivity(JOIN) = selectivity(RS-1) * selectivity(RS-2) and + // selectivity(RS-3) = numRows(RS-3)/numRows(JOIN) * selectivity(JOIN) + while(multiParentOp == null) { + if (op.getParentOperators().size() > 1) { + multiParentOp = op; + } else { + op = op.getParentOperators().get(0); + } + } + + float selMultiParent = 1.0f; + for(Operator<? 
extends OperatorDesc> parent : multiParentOp.getParentOperators()) { + // In the above example, TS-1 -> RS-1 and TS-2 -> RS-2 are simple trees + selMultiParent *= getSelectivitySimpleTree(parent); + } + + float selCurrOp = ((float) currentOp.getStatistics().getNumRows() / + (float) multiParentOp.getStatistics().getNumRows()) * selMultiParent; + + return selCurrOp; + } + + /** + * Returns the index of parents whose join key column statistics ranges are within the specified + * primary key range (inferred as foreign keys). + * @param ops - operators + * @param csPK - column statistics of primary key + * @return - list of foreign key containing parent ids + */ + private List<Integer> getForeignKeyCandidates(List<Operator<? extends OperatorDesc>> ops, + ColStatistics csPK) { + List<Integer> result = Lists.newArrayList(); + if (csPK == null || ops == null) { + return result; + } + + for (int i = 0; i < ops.size(); i++) { + Operator<? extends OperatorDesc> op = ops.get(i); + if (op != null && op instanceof ReduceSinkOperator) { + ReduceSinkOperator rsOp = (ReduceSinkOperator) op; + List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols(); + List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys, + rsOp.getColumnExprMap()); + if (fqCols.size() == 1) { + String joinCol = fqCols.get(0); + if (rsOp.getStatistics() != null) { + ColStatistics cs = rsOp.getStatistics().getColumnStatisticsFromFQColName(joinCol); + if (cs != null && !cs.isPrimaryKey()) { + if (StatsUtils.inferForeignKey(csPK, cs)) { + result.add(i); + } + } + } + } + } + } + return result; + } + + /** + * Returns the index of parents whose join key columns are infer as primary keys + * @param ops - operators + * @return - list of primary key containing parent ids + */ + private List<Integer> getPrimaryKeyCandidates(List<Operator<? extends OperatorDesc>> ops) { + List<Integer> result = Lists.newArrayList(); + if (ops != null && !ops.isEmpty()) { + for (int i = 0; i < ops.size(); i++) { + Operator<? extends OperatorDesc> op = ops.get(i); + if (op instanceof ReduceSinkOperator) { + ReduceSinkOperator rsOp = (ReduceSinkOperator) op; + List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols(); + List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys, + rsOp.getColumnExprMap()); + if (fqCols.size() == 1) { + String joinCol = fqCols.get(0); + if (rsOp.getStatistics() != null) { + ColStatistics cs = rsOp.getStatistics().getColumnStatisticsFromFQColName(joinCol); + if (cs != null && cs.isPrimaryKey()) { + result.add(i); + } + } + } + } + } + } + return result; + } + private Long getEasedOutDenominator(List<Long> distinctVals) { // Exponential back-off for NDVs. // 1) Descending order sort of NDVs @@ -1253,7 +1437,7 @@ public class StatsRulesProcFactory { Map<String, Long> rowCountParents, Map<String, String> outInTabAlias) { - if (newNumRows <= 0) { + if (newNumRows < 0) { LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows." + newNumRows + " rows will be set to Long.MAX_VALUE"); } @@ -1528,6 +1712,8 @@ public class StatsRulesProcFactory { Object... nodeOutputs) throws SemanticException { Operator<? extends OperatorDesc> op = (Operator<? 
extends OperatorDesc>) nd; OperatorDesc conf = op.getConf(); + AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx; + HiveConf hconf = aspCtx.getConf(); if (conf != null) { Statistics stats = conf.getStatistics(); @@ -1544,7 +1730,9 @@ public class StatsRulesProcFactory { stats.addToNumRows(parentStats.getNumRows()); stats.addToDataSize(parentStats.getDataSize()); stats.updateColumnStatsState(parentStats.getColumnStatsState()); - stats.addToColumnStats(parentStats.getColumnStats()); + List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(hconf, + parentStats, op.getColumnExprMap(), op.getSchema()); + stats.addToColumnStats(colStats); op.getConf().setStatistics(stats); if (isDebugEnabled) { @@ -1622,7 +1810,7 @@ public class StatsRulesProcFactory { boolean useColStats, Operator<? extends OperatorDesc> op, boolean updateNDV) { - if (newNumRows <= 0) { + if (newNumRows < 0) { LOG.info("STATS-" + op.toString() + ": Overflow in number of rows." + newNumRows + " rows will be set to Long.MAX_VALUE"); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Thu Oct 30 16:22:33 2014 @@ -79,7 +79,7 @@ import com.google.common.annotations.Vis * */ public abstract class BaseSemanticAnalyzer { - private static final Log STATIC_LOG = LogFactory.getLog(BaseSemanticAnalyzer.class.getName()); + protected static final Log STATIC_LOG = LogFactory.getLog(BaseSemanticAnalyzer.class.getName()); protected final Hive db; protected final HiveConf conf; protected List<Task<? extends Serializable>> rootTasks; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Thu Oct 30 16:22:33 2014 @@ -53,6 +53,7 @@ import org.apache.hadoop.hive.metastore. 
import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.ql.Driver; @@ -214,24 +215,6 @@ public class DDLSemanticAnalyzer extends return typeName; } - static class TablePartition { - String tableName; - HashMap<String, String> partSpec = null; - - public TablePartition() { - } - - public TablePartition(ASTNode tblPart) throws SemanticException { - tableName = getDotName((getQualifiedTableName((ASTNode) tblPart.getChild(0)))); - if (tblPart.getChildCount() > 1) { - ASTNode part = (ASTNode) tblPart.getChild(1); - if (part.getToken().getType() == HiveParser.TOK_PARTSPEC) { - this.partSpec = DDLSemanticAnalyzer.getPartSpec(part); - } - } - } - } - public DDLSemanticAnalyzer(HiveConf conf) throws SemanticException { this(conf, createHiveDB(conf)); } @@ -1034,7 +1017,7 @@ public class DDLSemanticAnalyzer extends rootTasks.add(truncateTask); } - private boolean isFullSpec(Table table, Map<String, String> partSpec) { + public static boolean isFullSpec(Table table, Map<String, String> partSpec) { for (FieldSchema partCol : table.getPartCols()) { if (partSpec.get(partCol.getName()) == null) { return false; @@ -1139,20 +1122,25 @@ public class DDLSemanticAnalyzer extends // configured not to ignore this boolean throwException = !ifExists && !HiveConf.getBoolVar(conf, ConfVars.DROPIGNORESNONEXISTENT); - if (throwException) { - try { - Index idx = db.getIndex(tableName, indexName); - } catch (HiveException e) { + Table tbl = getTable(tableName, false); + if (throwException && tbl == null) { + throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName)); + } + try { + Index idx = db.getIndex(tableName, indexName); + } catch (HiveException e) { + if (!(e.getCause() instanceof NoSuchObjectException)) { + throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg("dropping index"), e); + } + if (throwException) { throw new SemanticException(ErrorMsg.INVALID_INDEX.getMsg(indexName)); } } - - Table tbl = getTable(tableName, false); if (tbl != null) { - inputs.add(new ReadEntity(getTable(tableName))); + inputs.add(new ReadEntity(tbl)); } - DropIndexDesc dropIdxDesc = new DropIndexDesc(indexName, tableName); + DropIndexDesc dropIdxDesc = new DropIndexDesc(indexName, tableName, throwException); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), dropIdxDesc), conf)); } @@ -1399,11 +1387,22 @@ public class DDLSemanticAnalyzer extends // ReadEntity as no lock. re.noLockNeeded(); inputs.add(re); - if (desc == null || desc.getOp() != AlterTableDesc.AlterTableTypes.ALTERPROTECTMODE) { + + if (isFullSpec(tab, partSpec)) { + // Fully specified partition spec Partition part = getPartition(tab, partSpec, true); - outputs.add(new WriteEntity(part, writeType)); - } - else { + outputs.add(new WriteEntity(part, writeType)); + } else { + // Partial partition spec supplied. Make sure this is allowed. + if (desc == null + || !AlterTableDesc.doesAlterTableTypeSupportPartialPartitionSpec(desc.getOp())) { + String alterTabletype = (desc != null) ? 
desc.getOp().name() : ""; + throw new SemanticException( + ErrorMsg.ALTER_TABLE_TYPE_PARTIAL_PARTITION_SPEC_NO_SUPPORTED, alterTabletype); + } else if (!conf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) { + throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_DISABLED); + } + for (Partition part : getPartitions(tab, partSpec, true)) { outputs.add(new WriteEntity(part, writeType)); } @@ -2240,6 +2239,10 @@ public class DDLSemanticAnalyzer extends if (ast.getChildCount() == 1) { String funcNames = stripQuotes(ast.getChild(0).getText()); showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile(), funcNames); + } else if (ast.getChildCount() == 2) { + assert (ast.getChild(0).getType() == HiveParser.KW_LIKE); + String funcNames = stripQuotes(ast.getChild(1).getText()); + showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile(), funcNames, true); } else { showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile()); } @@ -2805,7 +2808,7 @@ public class DDLSemanticAnalyzer extends * @param ast * The parsed command tree. * @throws SemanticException - * Parsin failed + * Parsing failed */ private void analyzeAlterTableTouch(String[] qualified, CommonTree ast) throws SemanticException { @@ -2929,8 +2932,8 @@ public class DDLSemanticAnalyzer extends * * @param ast Tree to extract partitions from. * @param tab Table. - * @param result Map of partitions by prefix length. Most of the time prefix length will - * be the same for all partition specs, so we can just OR the expressions. + * @return Map of partitions by prefix length. Most of the time prefix length will + * be the same for all partition specs, so we can just OR the expressions. */ private Map<Integer, List<ExprNodeGenericFuncDesc>> getFullPartitionSpecs( CommonTree ast, Table tab, boolean canGroupExprs) throws SemanticException { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Thu Oct 30 16:22:33 2014 @@ -57,6 +57,8 @@ import org.apache.hadoop.hive.ql.plan.Un import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; +import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL; + /** * GenTezUtils is a collection of shared helper methods to produce * TezWork @@ -117,7 +119,7 @@ public class GenTezUtils { reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers()); - if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) { + if (isAutoReduceParallelism && reduceSink.getConf().getReducerTraits().contains(AUTOPARALLEL)) { reduceWork.setAutoReduceParallelism(true); // configured limit for reducers Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Thu Oct 30 16:22:33 2014 @@ 
-123,6 +123,11 @@ public class GenTezWork implements NodeP context.rootToWorkMap.put(root, work); } + // this is where we set the sort columns that we will be using for KeyValueInputMerge + if (operator instanceof DummyStoreOperator) { + work.addSortCols(root.getOpTraits().getSortCols().get(0)); + } + if (!context.childToWorkMap.containsKey(operator)) { List<BaseWork> workItems = new LinkedList<BaseWork>(); workItems.add(work); @@ -137,17 +142,18 @@ public class GenTezWork implements NodeP // we are currently walking the big table side of the merge join. we need to create or hook up // merge join work. MergeJoinWork mergeJoinWork = null; - if (context.opMergeJoinWorkMap.containsKey(operator)) { + if (context.opMergeJoinWorkMap.containsKey(context.currentMergeJoinOperator)) { // we have found a merge work corresponding to this closing operator. Hook up this work. - mergeJoinWork = context.opMergeJoinWorkMap.get(operator); + mergeJoinWork = context.opMergeJoinWorkMap.get(context.currentMergeJoinOperator); } else { // we need to create the merge join work mergeJoinWork = new MergeJoinWork(); mergeJoinWork.setMergeJoinOperator(context.currentMergeJoinOperator); tezWork.add(mergeJoinWork); - context.opMergeJoinWorkMap.put(operator, mergeJoinWork); + context.opMergeJoinWorkMap.put(context.currentMergeJoinOperator, mergeJoinWork); } // connect the work correctly. + work.addSortCols(root.getOpTraits().getSortCols().get(0)); mergeJoinWork.addMergedWork(work, null); Operator<? extends OperatorDesc> parentOp = getParentFromStack(context.currentMergeJoinOperator, stack); @@ -334,10 +340,16 @@ public class GenTezWork implements NodeP UnionWork unionWork = (UnionWork) followingWork; int index = getMergeIndex(tezWork, unionWork, rs); // guaranteed to be instance of MergeJoinWork if index is valid - MergeJoinWork mergeJoinWork = (MergeJoinWork) tezWork.getChildren(unionWork).get(index); - // disconnect the connection to union work and connect to merge work - followingWork = mergeJoinWork; - rWork = (ReduceWork) mergeJoinWork.getMainWork(); + BaseWork baseWork = tezWork.getChildren(unionWork).get(index); + if (baseWork instanceof MergeJoinWork) { + MergeJoinWork mergeJoinWork = (MergeJoinWork) baseWork; + // disconnect the connection to union work and connect to merge work + followingWork = mergeJoinWork; + rWork = (ReduceWork) mergeJoinWork.getMainWork(); + } else { + throw new SemanticException("Unknown work type found: " + + baseWork.getClass().getCanonicalName()); + } } else { rWork = (ReduceWork) followingWork; } @@ -391,6 +403,8 @@ public class GenTezWork implements NodeP } else { index++; } + } else { + index++; } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Thu Oct 30 16:22:33 2014 @@ -1334,7 +1334,7 @@ showStatement | KW_SHOW KW_TABLES ((KW_FROM|KW_IN) db_name=identifier)? (KW_LIKE showStmtIdentifier|showStmtIdentifier)? -> ^(TOK_SHOWTABLES (TOK_FROM $db_name)? showStmtIdentifier?) | KW_SHOW KW_COLUMNS (KW_FROM|KW_IN) tableName ((KW_FROM|KW_IN) db_name=identifier)? -> ^(TOK_SHOWCOLUMNS tableName $db_name?) - | KW_SHOW KW_FUNCTIONS showFunctionIdentifier? 
-> ^(TOK_SHOWFUNCTIONS showFunctionIdentifier?) + | KW_SHOW KW_FUNCTIONS (KW_LIKE showFunctionIdentifier|showFunctionIdentifier)? -> ^(TOK_SHOWFUNCTIONS KW_LIKE? showFunctionIdentifier?) | KW_SHOW KW_PARTITIONS tabName=tableName partitionSpec? -> ^(TOK_SHOWPARTITIONS $tabName partitionSpec?) | KW_SHOW KW_CREATE KW_TABLE tabName=tableName -> ^(TOK_SHOW_CREATETABLE $tabName) | KW_SHOW KW_TABLE KW_EXTENDED ((KW_FROM|KW_IN) db_name=identifier)? KW_LIKE showStmtIdentifier partitionSpec? Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java Thu Oct 30 16:22:33 2014 @@ -57,4 +57,12 @@ public interface HiveSemanticAnalyzerHoo public String getUserName(); public void setUserName(String userName); + + public String getIpAddress(); + + public void setIpAddress(String ipAddress); + + public String getCommand(); + + public void setCommand(String command); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java Thu Oct 30 16:22:33 2014 @@ -33,6 +33,8 @@ public class HiveSemanticAnalyzerHookCon Set<ReadEntity> inputs = null; Set<WriteEntity> outputs = null; private String userName; + private String ipAddress; + private String command; @Override public Hive getHive() throws HiveException { @@ -73,4 +75,24 @@ public class HiveSemanticAnalyzerHookCon public void setUserName(String userName) { this.userName = userName; } + + @Override + public String getIpAddress() { + return ipAddress; + } + + @Override + public void setIpAddress(String ipAddress) { + this.ipAddress = ipAddress; + } + + @Override + public String getCommand() { + return command; + } + + @Override + public void setCommand(String command) { + this.command = command; + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java Thu Oct 30 16:22:33 2014 @@ -130,6 +130,10 @@ public class QB { return (outer_id == null ? 
alias : outer_id + ":" + alias); } + public String getAlias() { + return qbp.getAlias(); + } + public QBParseInfo getParseInfo() { return qbp; } @@ -248,6 +252,12 @@ public class QB { return isQuery; } + // to decide whether to rewrite RR of subquery + public boolean isTopLevelSelectStarQuery() { + return !isCTAS() && qbp.isTopLevelSimpleSelectStarQuery(); + } + + // to find target for fetch task conversion optimizer (not allows subqueries) public boolean isSimpleSelectQuery() { return qbp.isSimpleSelectQuery() && aliasToSubq.isEmpty() && !isCTAS() && !qbp.isAnalyzeCommand(); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Thu Oct 30 16:22:33 2014 @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.antlr.runtime.tree.Tree; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec; @@ -449,39 +450,49 @@ public class QBParseInfo { this.outerQueryLimit = outerQueryLimit; } + public boolean isTopLevelSimpleSelectStarQuery() { + if (alias != null || destToSelExpr.size() != 1 || !isSimpleSelectQuery()) { + return false; + } + for (ASTNode selExprs : destToSelExpr.values()) { + if (selExprs.getChildCount() != 1) { + return false; + } + Tree sel = selExprs.getChild(0).getChild(0); + if (sel == null || sel.getType() != HiveParser.TOK_ALLCOLREF) { + return false; + } + } + return true; + } + public boolean isSimpleSelectQuery() { - if (isSubQ || (joinExpr != null) - || (!destToGroupby.isEmpty()) || (!destToClusterby.isEmpty()) - || (!aliasToLateralViews.isEmpty())) { + if (isSubQ || joinExpr != null || !destToOrderby.isEmpty() || !destToSortby.isEmpty() + || !destToGroupby.isEmpty() || !destToClusterby.isEmpty() || !destToDistributeby.isEmpty() + || !aliasToLateralViews.isEmpty() || !destToLateralView.isEmpty()) { return false; } - Iterator<Map.Entry<String, LinkedHashMap<String, ASTNode>>> aggrIter = destToAggregationExprs - .entrySet().iterator(); - while (aggrIter.hasNext()) { - HashMap<String, ASTNode> h = aggrIter.next().getValue(); - if ((h != null) && (!h.isEmpty())) { + for (Map<String, ASTNode> entry : destToAggregationExprs.values()) { + if (entry != null && !entry.isEmpty()) { return false; } } - if (!destToDistinctFuncExprs.isEmpty()) { - Iterator<Map.Entry<String, List<ASTNode>>> distn = destToDistinctFuncExprs - .entrySet().iterator(); - while (distn.hasNext()) { - List<ASTNode> ct = distn.next().getValue(); - if (!ct.isEmpty()) { - return false; - } + for (Map<String, ASTNode> entry : destToWindowingExprs.values()) { + if (entry != null && !entry.isEmpty()) { + return false; + } + } + + for (List<ASTNode> ct : destToDistinctFuncExprs.values()) { + if (!ct.isEmpty()) { + return false; } } - Iterator<Map.Entry<String, ASTNode>> iter = nameToDest.entrySet() - .iterator(); - while (iter.hasNext()) { - Map.Entry<String, ASTNode> entry = iter.next(); - ASTNode v = entry.getValue(); - if (!(((ASTNode)v.getChild(0)).getToken().getType() == HiveParser.TOK_TMP_FILE)) { + for (ASTNode v : 
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Thu Oct 30 16:22:33 2014
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 
@@ -120,7 +119,11 @@ public class RowResolver implements Seri
       f_map = new LinkedHashMap<String, ColumnInfo>();
       rslvMap.put(tab_alias, f_map);
     }
-    f_map.put(col_alias, colInfo);
+    ColumnInfo oldColInfo = f_map.put(col_alias, colInfo);
+    if (oldColInfo != null) {
+      LOG.warn("Duplicate column info for " + tab_alias + "." + col_alias
+          + " was overwritten in RowResolver map: " + oldColInfo + " by " + colInfo);
+    }
 
     String[] qualifiedAlias = new String[2];
     qualifiedAlias[0] = tab_alias;
@@ -195,17 +198,6 @@ public class RowResolver implements Seri
     return ret;
   }
 
-  /**
-   * check if column name is already exist in RR
-   */
-  public void checkColumn(String tableAlias, String columnAlias) throws SemanticException {
-    ColumnInfo prev = get(null, columnAlias);
-    if (prev != null &&
-        (tableAlias == null || !tableAlias.equalsIgnoreCase(prev.getTabAlias()))) {
-      throw new SemanticException(ErrorMsg.AMBIGUOUS_COLUMN.getMsg(columnAlias));
-    }
-  }
-
   public ArrayList<ColumnInfo> getColumnInfos() {
     return rowSchema.getSignature();
   }
@@ -351,40 +343,44 @@ public class RowResolver implements Seri
     this.expressionMap = expressionMap;
   }
 
+  private static class IntRef {
+    public int val = 0;
+  }
-  // TODO: 1) How to handle collisions? 2) Should we be cloning ColumnInfo or
-  // not?
-  public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
-      int outputColPos, int numColumns) throws SemanticException {
+  public static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom, int numColumns)
+      throws SemanticException {
+    return add(rrToAddTo, rrToAddFrom, null, numColumns);
+  }
+
+  // TODO: 1) How to handle collisions? 2) Should we be cloning ColumnInfo or not?
+  private static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
+      IntRef outputColPosRef, int numColumns) throws SemanticException {
+    boolean hasDuplicates = false;
     String tabAlias;
     String colAlias;
     String[] qualifiedColName;
     int i = 0;
+    int outputColPos = outputColPosRef == null ? 0 : outputColPosRef.val;
 
     for (ColumnInfo cInfoFrmInput : rrToAddFrom.getRowSchema().getSignature()) {
       if ( numColumns >= 0 && i == numColumns ) {
         break;
       }
       ColumnInfo newCI = null;
-      qualifiedColName = rrToAddFrom.getInvRslvMap().get(
-          cInfoFrmInput.getInternalName());
+      String internalName = cInfoFrmInput.getInternalName();
+      qualifiedColName = rrToAddFrom.reverseLookup(internalName);
       tabAlias = qualifiedColName[0];
       colAlias = qualifiedColName[1];
       newCI = new ColumnInfo(cInfoFrmInput);
-      newCI.setInternalName(SemanticAnalyzer
-          .getColumnInternalName(outputColPos));
+      newCI.setInternalName(SemanticAnalyzer.getColumnInternalName(outputColPos));
       outputColPos++;
-      if (rrToAddTo.get(tabAlias, colAlias) != null) {
-        LOG.debug("Found duplicate column alias in RR: " + rrToAddTo.get(tabAlias, colAlias));
-      } else {
-        rrToAddTo.put(tabAlias, colAlias, newCI);
-      }
+      boolean isUnique = rrToAddTo.putWithCheck(tabAlias, colAlias, internalName, newCI);
+      hasDuplicates |= (!isUnique);
 
-      qualifiedColName = rrToAddFrom.getAlternateMappings(cInfoFrmInput
-          .getInternalName());
+      qualifiedColName = rrToAddFrom.getAlternateMappings(internalName);
       if (qualifiedColName != null) {
         tabAlias = qualifiedColName[0];
         colAlias = qualifiedColName[1];
@@ -393,31 +389,73 @@ public class RowResolver implements Seri
       i++;
     }
 
-    return outputColPos;
-  }
+    if (outputColPosRef != null) {
+      outputColPosRef.val = outputColPos;
+    }
+    return !hasDuplicates;
+  }
 
-  public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
-      int outputColPos) throws SemanticException {
-    return add(rrToAddTo, rrToAddFrom, outputColPos, -1);
-  }
-
-  /**
-   * Return a new row resolver that is combination of left RR and right RR.
-   * The schema will be schema of left, schema of right
-   *
-   * @param leftRR
-   * @param rightRR
-   * @return
-   * @throws SemanticException
-   */
-  public static RowResolver getCombinedRR(RowResolver leftRR,
-      RowResolver rightRR) throws SemanticException {
-    int outputColPos = 0;
-
-    RowResolver combinedRR = new RowResolver();
-    outputColPos = add(combinedRR, leftRR, outputColPos);
-    outputColPos = add(combinedRR, rightRR, outputColPos);
+  /**
+   * Adds column to RR, checking for duplicate columns. Needed because CBO cannot handle the Hive
+   * behavior of blindly overwriting old mapping in RR and still somehow working after that.
+   * @return True if mapping was added without duplicates.
+   */
+  public boolean putWithCheck(String tabAlias, String colAlias,
+      String internalName, ColumnInfo newCI) throws SemanticException {
+    ColumnInfo existing = get(tabAlias, colAlias);
+    // Hive adds the same mapping twice... I wish we could fix stuff like that.
+    if (existing == null) {
+      put(tabAlias, colAlias, newCI);
+      return true;
+    } else if (existing.isSameColumnForRR(newCI)) {
+      return true;
+    }
+    LOG.warn("Found duplicate column alias in RR: "
+        + existing.toMappingString(tabAlias, colAlias) + " adding "
+        + newCI.toMappingString(tabAlias, colAlias));
+    if (internalName != null) {
+      existing = get(tabAlias, internalName);
+      if (existing == null) {
+        put(tabAlias, internalName, newCI);
+        return true;
+      } else if (existing.isSameColumnForRR(newCI)) {
+        return true;
+      }
+      LOG.warn("Failed to use internal name after finding a duplicate: "
+          + existing.toMappingString(tabAlias, internalName));
+    }
+    return false;
+  }
+
+  private static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
+      IntRef outputColPosRef) throws SemanticException {
+    return add(rrToAddTo, rrToAddFrom, outputColPosRef, -1);
+  }
+
+  public static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom)
+      throws SemanticException {
+    return add(rrToAddTo, rrToAddFrom, null, -1);
+  }
 
-    return combinedRR;
-  }
+  /**
+   * Return a new row resolver that is combination of left RR and right RR.
+   * The schema will be schema of left, schema of right
+   *
+   * @param leftRR
+   * @param rightRR
+   * @return
+   * @throws SemanticException
+   */
+  public static RowResolver getCombinedRR(RowResolver leftRR,
+      RowResolver rightRR) throws SemanticException {
+    RowResolver combinedRR = new RowResolver();
+    IntRef outputColPos = new IntRef();
+    if (!add(combinedRR, leftRR, outputColPos)) {
+      LOG.warn("Duplicates detected when adding columns to RR: see previous message");
+    }
+    if (!add(combinedRR, rightRR, outputColPos)) {
+      LOG.warn("Duplicates detected when adding columns to RR: see previous message");
    }
+    return combinedRR;
+  }
 }
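Note: the add()/getCombinedRR() contract changes from returning a column position to returning a success flag, with putWithCheck() doing the duplicate detection. The sketch below is a rough usage illustration only; the RowResolverDuplicateSketch class, the column names, and the expected boolean results are assumptions for the example, not part of the patch.

    import org.apache.hadoop.hive.ql.exec.ColumnInfo;
    import org.apache.hadoop.hive.ql.parse.RowResolver;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class RowResolverDuplicateSketch {
      public static void main(String[] args) throws Exception {
        RowResolver rr = new RowResolver();
        ColumnInfo key = new ColumnInfo("_col0", TypeInfoFactory.stringTypeInfo, "t1", false);
        rr.put("t1", "key", key);

        // Re-registering the same column should be accepted (isSameColumnForRR matches).
        boolean sameAgain = rr.putWithCheck("t1", "key", null, new ColumnInfo(key));

        // A different column under the same alias is a genuine clash; with no internal name
        // to fall back on, putWithCheck is expected to log a warning and return false
        // instead of silently overwriting the old mapping the way put() used to.
        ColumnInfo other = new ColumnInfo("_col1", TypeInfoFactory.intTypeInfo, "t1", false);
        boolean clash = rr.putWithCheck("t1", "key", null, other);

        System.out.println("same column re-added: " + sameAgain + ", clash detected: " + !clash);

        // getCombinedRR now routes both inputs through the duplicate-aware add();
        // clashes are reported in the log and the combined resolver is still returned.
        RowResolver left = new RowResolver();
        left.put("a", "x", new ColumnInfo("_col0", TypeInfoFactory.stringTypeInfo, "a", false));
        RowResolver right = new RowResolver();
        right.put("b", "y", new ColumnInfo("_col0", TypeInfoFactory.stringTypeInfo, "b", false));
        RowResolver combined = RowResolver.getCombinedRR(left, right);
        System.out.println("combined columns: " + combined.getColumnInfos().size());
      }
    }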