Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java?rev=1617652&r1=1617651&r2=1617652&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateString.java Wed Aug 13 02:28:54 2014
@@ -23,11 +23,15 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
 import org.apache.hadoop.io.Text;

-import java.sql.Date;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.text.ParseException;

 public class VectorUDFDateString extends StringUnaryUDF {
   private static final long serialVersionUID = 1L;

+  private transient static SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
+
   private static final Log LOG = LogFactory.getLog(
       VectorUDFDateString.class.getName());

@@ -41,13 +45,10 @@ public class VectorUDFDateString extends
         return null;
       }
       try {
-        Date date = Date.valueOf(s.toString());
-        t.set(date.toString());
+        Date date = formatter.parse(s.toString());
+        t.set(formatter.format(date));
         return t;
-      } catch (IllegalArgumentException e) {
-        if (LOG.isDebugEnabled()) {
-          LOG.info("VectorUDFDateString passed bad string for Date.valueOf '" + s.toString() + "'");
-        }
+      } catch (ParseException e) {
         return null;
       }
     }
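[Editor's note] The hunk above swaps java.sql.Date.valueOf for a SimpleDateFormat round-trip and maps parse failures to a null result instead of logging. The stand-alone sketch below illustrates that behaviour in isolation; the class and method names are invented for the example and are not part of the patch.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateStringRoundTripSketch {
  // Same pattern the patched UDF uses. SimpleDateFormat is not thread-safe,
  // so a shared instance needs external synchronization or confinement.
  private static final SimpleDateFormat FORMATTER = new SimpleDateFormat("yyyy-MM-dd");

  // Mirrors the patched logic: parse, re-format, or return null on bad input.
  static String normalizeDate(String s) {
    try {
      Date date = FORMATTER.parse(s);
      return FORMATTER.format(date);
    } catch (ParseException e) {
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(normalizeDate("2014-08-13"));  // 2014-08-13
    System.out.println(normalizeDate("not-a-date"));  // null
    // Lenient by default: "2014-02-30" rolls over to 2014-03-02 instead of failing.
    System.out.println(normalizeDate("2014-02-30"));
  }
}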
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java?rev=1617652&r1=1617651&r2=1617652&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java Wed Aug 13 02:28:54 2014
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;

-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Index;
@@ -46,12 +45,9 @@ import org.apache.hadoop.hive.ql.plan.Pa
  */
 public class AggregateIndexHandler extends CompactIndexHandler {

-  private static Index index = null;
-
   @Override
-  public void analyzeIndexDefinition(Table baseTable, Index idx,
+  public void analyzeIndexDefinition(Table baseTable, Index index,
       Table indexTable) throws HiveException {
-    index = idx;
     StorageDescriptor storageDesc = index.getSd();
     if (this.usesIndexTable() && indexTable != null) {
       StorageDescriptor indexTableSd = storageDesc.deepCopy();
@@ -92,10 +88,11 @@ public class AggregateIndexHandler exten

   @Override
   protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
-      List<FieldSchema> indexField, boolean partitioned,
+      Index index, boolean partitioned,
       PartitionDesc indexTblPartDesc, String indexTableName,
       PartitionDesc baseTablePartDesc, String baseTableName, String dbName) {

+    List<FieldSchema> indexField = index.getSd().getCols();
     String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);

     //form a new insert overwrite query.
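[Editor's note] The signature change above, passing the metastore Index through to getIndexBuilderMapRedTask instead of a pre-extracted List<FieldSchema>, lets the handler pull whatever it needs from the index definition itself (the companion TableBasedIndexHandler change below keeps the old column-list overload as a delegating default). A minimal sketch of that column lookup follows, using a hypothetical in-memory Index rather than one fetched from the metastore; the surrounding class and the sample column names are illustrative only.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;

public class IndexColumnLookupSketch {
  public static void main(String[] args) {
    // Hypothetical index definition, standing in for what the metastore would return.
    StorageDescriptor sd = new StorageDescriptor();
    sd.setCols(Arrays.asList(
        new FieldSchema("key", "string", null),
        new FieldSchema("_count_of_key", "bigint", null)));
    Index index = new Index();
    index.setSd(sd);

    // What the refactored handler now derives internally instead of
    // receiving a List<FieldSchema> parameter from its caller.
    List<FieldSchema> indexField = index.getSd().getCols();
    String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);
    System.out.println(indexCols);  // comma-separated, quoted column names
  }
}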
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java?rev=1617652&r1=1617651&r2=1617652&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java Wed Aug 13 02:28:54 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -64,7 +65,7 @@ public abstract class TableBasedIndexHan
       if (!baseTbl.isPartitioned()) {
         // the table does not have any partition, then create index for the
         // whole table
-        Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), false,
+        Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, false,
             new PartitionDesc(desc, null), indexTbl.getTableName(),
             new PartitionDesc(Utilities.getTableDesc(baseTbl), null),
             baseTbl.getTableName(), indexTbl.getDbName());
@@ -88,7 +89,7 @@ public abstract class TableBasedIndexHan
               "Partitions of base table and index table are inconsistent.");
         }
         // for each partition, spawn a map reduce task.
-        Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), true,
+        Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, true,
             new PartitionDesc(indexPart), indexTbl.getTableName(),
             new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName());
         indexBuilderTasks.add(indexBuilder);
@@ -100,10 +101,20 @@ public abstract class TableBasedIndexHan
     }
   }

-  abstract protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+  protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+      Index index, boolean partitioned,
+      PartitionDesc indexTblPartDesc, String indexTableName,
+      PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+    return getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(),
+        partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName);
+  }
+
+  protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
       List<FieldSchema> indexField, boolean partitioned,
       PartitionDesc indexTblPartDesc, String indexTableName,
-      PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException;
+      PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+    return null;
+  }

   protected void setStatsDir(HiveConf builderConf) {
     String statsDir;

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1617652&r1=1617651&r2=1617652&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
(original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Aug 13 02:28:54 2014 @@ -910,8 +910,11 @@ class RecordReaderImpl implements Record private InStream stream; private IntegerReader lengths = null; + private final LongColumnVector scratchlcv; + BinaryTreeReader(Path path, int columnId, Configuration conf) { super(path, columnId, conf); + scratchlcv = new LongColumnVector(); } @Override @@ -969,8 +972,18 @@ class RecordReaderImpl implements Record @Override Object nextVector(Object previousVector, long batchSize) throws IOException { - throw new UnsupportedOperationException( - "NextBatch is not supported operation for Binary type"); + BytesColumnVector result = null; + if (previousVector == null) { + result = new BytesColumnVector(); + } else { + result = (BytesColumnVector) previousVector; + } + + // Read present/isNull stream + super.nextVector(result, batchSize); + + BytesColumnVectorUtil.setRefToOrcByteArrays(stream, lengths, scratchlcv, result, batchSize); + return result; } @Override @@ -1361,6 +1374,66 @@ class RecordReaderImpl implements Record } } + private static class BytesColumnVectorUtil { + // This method has the common code for reading in bytes into a BytesColumnVector. + // It is used by the BINARY, STRING, CHAR, VARCHAR types. + public static void setRefToOrcByteArrays(InStream stream, IntegerReader lengths, LongColumnVector scratchlcv, + BytesColumnVector result, long batchSize) throws IOException { + + // Read lengths + scratchlcv.isNull = result.isNull; // Notice we are replacing the isNull vector here... + lengths.nextVector(scratchlcv, batchSize); + int totalLength = 0; + if (!scratchlcv.isRepeating) { + for (int i = 0; i < batchSize; i++) { + if (!scratchlcv.isNull[i]) { + totalLength += (int) scratchlcv.vector[i]; + } + } + } else { + if (!scratchlcv.isNull[0]) { + totalLength = (int) (batchSize * scratchlcv.vector[0]); + } + } + + // Read all the strings for this batch + byte[] allBytes = new byte[totalLength]; + int offset = 0; + int len = totalLength; + while (len > 0) { + int bytesRead = stream.read(allBytes, offset, len); + if (bytesRead < 0) { + throw new EOFException("Can't finish byte read from " + stream); + } + len -= bytesRead; + offset += bytesRead; + } + + // Too expensive to figure out 'repeating' by comparisons. + result.isRepeating = false; + offset = 0; + if (!scratchlcv.isRepeating) { + for (int i = 0; i < batchSize; i++) { + if (!scratchlcv.isNull[i]) { + result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]); + offset += scratchlcv.vector[i]; + } else { + result.setRef(i, allBytes, 0, 0); + } + } + } else { + for (int i = 0; i < batchSize; i++) { + if (!scratchlcv.isNull[i]) { + result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]); + offset += scratchlcv.vector[0]; + } else { + result.setRef(i, allBytes, 0, 0); + } + } + } + } + } + /** * A reader for string columns that are direct encoded in the current * stripe. 
@@ -1443,57 +1516,7 @@ class RecordReaderImpl implements Record // Read present/isNull stream super.nextVector(result, batchSize); - // Read lengths - scratchlcv.isNull = result.isNull; - lengths.nextVector(scratchlcv, batchSize); - int totalLength = 0; - if (!scratchlcv.isRepeating) { - for (int i = 0; i < batchSize; i++) { - if (!scratchlcv.isNull[i]) { - totalLength += (int) scratchlcv.vector[i]; - } - } - } else { - if (!scratchlcv.isNull[0]) { - totalLength = (int) (batchSize * scratchlcv.vector[0]); - } - } - - //Read all the strings for this batch - byte[] allBytes = new byte[totalLength]; - int offset = 0; - int len = totalLength; - while (len > 0) { - int bytesRead = stream.read(allBytes, offset, len); - if (bytesRead < 0) { - throw new EOFException("Can't finish byte read from " + stream); - } - len -= bytesRead; - offset += bytesRead; - } - - // Too expensive to figure out 'repeating' by comparisons. - result.isRepeating = false; - offset = 0; - if (!scratchlcv.isRepeating) { - for (int i = 0; i < batchSize; i++) { - if (!scratchlcv.isNull[i]) { - result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]); - offset += scratchlcv.vector[i]; - } else { - result.setRef(i, allBytes, 0, 0); - } - } - } else { - for (int i = 0; i < batchSize; i++) { - if (!scratchlcv.isNull[i]) { - result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]); - offset += scratchlcv.vector[0]; - } else { - result.setRef(i, allBytes, 0, 0); - } - } - } + BytesColumnVectorUtil.setRefToOrcByteArrays(stream, lengths, scratchlcv, result, batchSize); return result; } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Aug 13 02:28:54 2014 @@ -409,6 +409,12 @@ public class Hive { } } + public void alterIndex(String baseTableName, String indexName, Index newIdx) + throws InvalidOperationException, HiveException { + String[] names = Utilities.getDbTableName(baseTableName); + alterIndex(names[0], names[1], indexName, newIdx); + } + /** * Updates the existing index metadata with the new metadata. * @@ -667,17 +673,16 @@ public class Hive { throws HiveException { try { - String dbName = SessionState.get().getCurrentDatabase(); Index old_index = null; try { - old_index = getIndex(dbName, tableName, indexName); + old_index = getIndex(tableName, indexName); } catch (Exception e) { } if (old_index != null) { - throw new HiveException("Index " + indexName + " already exists on table " + tableName + ", db=" + dbName); + throw new HiveException("Index " + indexName + " already exists on table " + tableName); } - org.apache.hadoop.hive.metastore.api.Table baseTbl = getMSC().getTable(dbName, tableName); + org.apache.hadoop.hive.metastore.api.Table baseTbl = getTable(tableName).getTTable(); if (baseTbl.getTableType() == TableType.VIRTUAL_VIEW.toString()) { throw new HiveException("tableName="+ tableName +" is a VIRTUAL VIEW. Index on VIRTUAL VIEW is not supported."); } @@ -686,17 +691,13 @@ public class Hive { + " is a TEMPORARY TABLE. 
Index on TEMPORARY TABLE is not supported."); } - if (indexTblName == null) { - indexTblName = MetaStoreUtils.getIndexTableName(dbName, tableName, indexName); - } else { - org.apache.hadoop.hive.metastore.api.Table temp = null; - try { - temp = getMSC().getTable(dbName, indexTblName); - } catch (Exception e) { - } - if (temp != null) { - throw new HiveException("Table name " + indexTblName + " already exists. Choose another name."); - } + org.apache.hadoop.hive.metastore.api.Table temp = null; + try { + temp = getTable(indexTblName).getTTable(); + } catch (Exception e) { + } + if (temp != null) { + throw new HiveException("Table name " + indexTblName + " already exists. Choose another name."); } org.apache.hadoop.hive.metastore.api.StorageDescriptor storageDescriptor = baseTbl.getSd().deepCopy(); @@ -774,7 +775,9 @@ public class Hive { HiveIndexHandler indexHandler = HiveUtils.getIndexHandler(this.getConf(), indexHandlerClass); if (indexHandler.usesIndexTable()) { - tt = new org.apache.hadoop.hive.ql.metadata.Table(dbName, indexTblName).getTTable(); + String idname = Utilities.getDatabaseName(indexTblName); + String itname = Utilities.getTableName(indexTblName); + tt = new org.apache.hadoop.hive.ql.metadata.Table(idname, itname).getTTable(); List<FieldSchema> partKeys = baseTbl.getPartitionKeys(); tt.setPartitionKeys(partKeys); tt.setTableType(TableType.INDEX_TABLE.toString()); @@ -798,7 +801,9 @@ public class Hive { throw new RuntimeException("Please specify deferred rebuild using \" WITH DEFERRED REBUILD \"."); } - Index indexDesc = new Index(indexName, indexHandlerClass, dbName, tableName, time, time, indexTblName, + String tdname = Utilities.getDatabaseName(tableName); + String ttname = Utilities.getTableName(tableName); + Index indexDesc = new Index(indexName, indexHandlerClass, tdname, ttname, time, time, indexTblName, storageDescriptor, params, deferredRebuild); if (indexComment != null) { indexDesc.getParameters().put("comment", indexComment); @@ -818,19 +823,6 @@ public class Hive { } } - public Index getIndex(String qualifiedIndexName) throws HiveException { - String[] names = getQualifiedNames(qualifiedIndexName); - switch (names.length) { - case 3: - return getIndex(names[0], names[1], names[2]); - case 2: - return getIndex(SessionState.get().getCurrentDatabase(), - names[0], names[1]); - default: - throw new HiveException("Invalid index name:" + qualifiedIndexName); - } - } - public Index getIndex(String baseTableName, String indexName) throws HiveException { String[] names = Utilities.getDbTableName(baseTableName); return this.getIndex(names[0], names[1], indexName); @@ -845,6 +837,11 @@ public class Hive { } } + public boolean dropIndex(String baseTableName, String index_name, boolean deleteData) throws HiveException { + String[] names = Utilities.getDbTableName(baseTableName); + return dropIndex(names[0], names[1], index_name, deleteData); + } + public boolean dropIndex(String db_name, String tbl_name, String index_name, boolean deleteData) throws HiveException { try { return getMSC().dropIndex(db_name, tbl_name, index_name, deleteData); @@ -1242,7 +1239,7 @@ public class Hive { */ FileSystem oldPartPathFS = oldPartPath.getFileSystem(getConf()); FileSystem loadPathFS = loadPath.getFileSystem(getConf()); - if (oldPartPathFS.equals(loadPathFS)) { + if (FileUtils.equalsFileSystem(oldPartPathFS,loadPathFS)) { newPartPath = oldPartPath; } } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java URL: 
http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java Wed Aug 13 02:28:54 2014 @@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.exec.Ta import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ParseContext; @@ -267,15 +266,7 @@ abstract public class AbstractBucketJoin Table tbl = topToTable.get(tso); if (tbl.isPartitioned()) { - PrunedPartitionList prunedParts; - try { - prunedParts = pGraphContext.getPrunedPartitions(alias, tso); - } catch (HiveException e) { - // Has to use full name to make sure it does not conflict with - // org.apache.commons.lang.StringUtils - LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); - throw new SemanticException(e.getMessage(), e); - } + PrunedPartitionList prunedParts = pGraphContext.getPrunedPartitions(alias, tso); List<Partition> partitions = prunedParts.getNotDeniedPartns(); // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number) if (partitions.isEmpty()) { Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Wed Aug 13 02:28:54 2014 @@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.exec.Ta import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ParseContext; @@ -316,13 +315,7 @@ abstract public class AbstractSMBJoinPro Table tbl = topToTable.get(tso); if (tbl.isPartitioned()) { - PrunedPartitionList prunedParts; - try { - prunedParts = pGraphContext.getPrunedPartitions(alias, tso); - } catch (HiveException e) { - LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); - throw new SemanticException(e.getMessage(), e); - } + PrunedPartitionList prunedParts = pGraphContext.getPrunedPartitions(alias, tso); List<Partition> partitions = prunedParts.getNotDeniedPartns(); // Populate the names and order of columns for the first partition of the // first table Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java URL: 
http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Wed Aug 13 02:28:54 2014 @@ -497,7 +497,8 @@ public class BucketingSortingReduceSinkO } if (srcTable.isPartitioned()) { - PrunedPartitionList prunedParts = pGraphContext.getOpToPartList().get(ts); + PrunedPartitionList prunedParts = + pGraphContext.getPrunedPartitions(srcTable.getTableName(), ts); List<Partition> partitions = prunedParts.getNotDeniedPartns(); // Support for dynamic partitions can be added later Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Wed Aug 13 02:28:54 2014 @@ -372,52 +372,57 @@ public final class ColumnPrunerProcFacto cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd, cols); - List<Integer> neededColumnIds = new ArrayList<Integer>(); - List<String> neededColumnNames = new ArrayList<String>(); - List<String> referencedColumnNames = new ArrayList<String>(); RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver(); - TableScanDesc desc = scanOp.getConf(); - List<VirtualColumn> virtualCols = desc.getVirtualCols(); - List<VirtualColumn> newVirtualCols = new ArrayList<VirtualColumn>(); - - // add virtual columns for ANALYZE TABLE - if(scanOp.getConf().isGatherStats()) { - cols.add(VirtualColumn.RAWDATASIZE.getName()); - } + setupNeededColumns(scanOp, inputRR, cols); + return null; + } + } - for (String column : cols) { - String[] tabCol = inputRR.reverseLookup(column); - if (tabCol == null) { - continue; - } - referencedColumnNames.add(column); - ColumnInfo colInfo = inputRR.get(tabCol[0], tabCol[1]); - if (colInfo.getIsVirtualCol()) { - // part is also a virtual column, but part col should not in this - // list. 
- for (int j = 0; j < virtualCols.size(); j++) { - VirtualColumn vc = virtualCols.get(j); - if (vc.getName().equals(colInfo.getInternalName())) { - newVirtualCols.add(vc); - } + public static void setupNeededColumns(TableScanOperator scanOp, RowResolver inputRR, + List<String> cols) throws SemanticException { + List<Integer> neededColumnIds = new ArrayList<Integer>(); + List<String> neededColumnNames = new ArrayList<String>(); + List<String> referencedColumnNames = new ArrayList<String>(); + TableScanDesc desc = scanOp.getConf(); + List<VirtualColumn> virtualCols = desc.getVirtualCols(); + List<VirtualColumn> newVirtualCols = new ArrayList<VirtualColumn>(); + + // add virtual columns for ANALYZE TABLE + if(scanOp.getConf().isGatherStats()) { + cols.add(VirtualColumn.RAWDATASIZE.getName()); + } + + for (String column : cols) { + String[] tabCol = inputRR.reverseLookup(column); + if (tabCol == null) { + continue; + } + referencedColumnNames.add(column); + ColumnInfo colInfo = inputRR.get(tabCol[0], tabCol[1]); + if (colInfo.getIsVirtualCol()) { + // part is also a virtual column, but part col should not in this + // list. + for (int j = 0; j < virtualCols.size(); j++) { + VirtualColumn vc = virtualCols.get(j); + if (vc.getName().equals(colInfo.getInternalName())) { + newVirtualCols.add(vc); } - //no need to pass virtual columns to reader. - continue; - } - int position = inputRR.getPosition(column); - if (position >= 0) { - // get the needed columns by id and name - neededColumnIds.add(position); - neededColumnNames.add(column); } + //no need to pass virtual columns to reader. + continue; + } + int position = inputRR.getPosition(column); + if (position >= 0) { + // get the needed columns by id and name + neededColumnIds.add(position); + neededColumnNames.add(column); } - - desc.setVirtualCols(newVirtualCols); - scanOp.setNeededColumnIDs(neededColumnIds); - scanOp.setNeededColumns(neededColumnNames); - scanOp.setReferencedColumns(referencedColumnNames); - return null; } + + desc.setVirtualCols(newVirtualCols); + scanOp.setNeededColumnIDs(neededColumnIds); + scanOp.setNeededColumns(neededColumnNames); + scanOp.setReferencedColumns(referencedColumnNames); } /** Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java Wed Aug 13 02:28:54 2014 @@ -77,12 +77,6 @@ public class ConstantPropagate implement */ @Override public ParseContext transform(ParseContext pactx) throws SemanticException { - if (pactx.getConf().getBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED)) { - // Constant propagate is currently conflict with vectorizer, disabling constant propagate - // if the later is enabled. 
- return pactx; - } - pGraphContext = pactx; opToParseCtxMap = pGraphContext.getOpParseCtx(); Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Wed Aug 13 02:28:54 2014 @@ -348,9 +348,9 @@ public final class ConstantPropagateProc ExprNodeDesc childExpr = newExprs.get(i); if (childExpr instanceof ExprNodeConstantDesc) { ExprNodeConstantDesc c = (ExprNodeConstantDesc) childExpr; - if (c.getValue() == Boolean.TRUE) { + if (Boolean.TRUE.equals(c.getValue())) { - // if true, prune it + // if true, prune it return newExprs.get(Math.abs(i - 1)); } else { @@ -366,7 +366,7 @@ public final class ConstantPropagateProc ExprNodeDesc childExpr = newExprs.get(i); if (childExpr instanceof ExprNodeConstantDesc) { ExprNodeConstantDesc c = (ExprNodeConstantDesc) childExpr; - if (c.getValue() == Boolean.FALSE) { + if (Boolean.FALSE.equals(c.getValue())) { // if false, prune it return newExprs.get(Math.abs(i - 1)); @@ -565,10 +565,10 @@ public final class ConstantPropagateProc ExprNodeDesc newCondn = foldExpr(condn, constants, cppCtx, op, 0, true); if (newCondn instanceof ExprNodeConstantDesc) { ExprNodeConstantDesc c = (ExprNodeConstantDesc) newCondn; - if (c.getValue() == Boolean.TRUE) { + if (Boolean.TRUE.equals(c.getValue())) { cppCtx.addOpToDelete(op); LOG.debug("Filter expression " + condn + " holds true. Will delete it."); - } else if (c.getValue() == Boolean.FALSE) { + } else if (Boolean.FALSE.equals(c.getValue())) { LOG.warn("Filter expression " + condn + " holds false!"); } } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Wed Aug 13 02:28:54 2014 @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.lib.Nod import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ParseContext; @@ -61,7 +60,6 @@ import org.apache.hadoop.hive.ql.plan.Gr import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; -import org.apache.hadoop.util.StringUtils; /** * This transformation does group by optimization. 
If the grouping key is a superset @@ -388,13 +386,8 @@ public class GroupByOptimizer implements List<String> bucketCols = table.getBucketCols(); return matchBucketSortCols(groupByCols, bucketCols, sortCols); } else { - PrunedPartitionList partsList; - try { - partsList = pGraphContext.getPrunedPartitions(table.getTableName(), tableScanOp); - } catch (HiveException e) { - LOG.error(StringUtils.stringifyException(e)); - throw new SemanticException(e.getMessage(), e); - } + PrunedPartitionList partsList = + pGraphContext.getPrunedPartitions(table.getTableName(), tableScanOp); List<Partition> notDeniedPartns = partsList.getNotDeniedPartns(); Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java Wed Aug 13 02:28:54 2014 @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; @@ -36,6 +35,7 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask; @@ -57,7 +57,6 @@ import org.apache.hadoop.hive.ql.parse.S public final class IndexUtils { private static final Log LOG = LogFactory.getLog(IndexWhereProcessor.class.getName()); - private static final Map<Index, Table> indexToIndexTable = new HashMap<Index, Table>(); private IndexUtils(){ } @@ -71,9 +70,7 @@ public final class IndexUtils { * @throws HiveException */ public static Set<Partition> checkPartitionsCoveredByIndex(TableScanOperator tableScan, - ParseContext pctx, - Map<Table, List<Index>> indexes) - throws HiveException { + ParseContext pctx, List<Index> indexes) throws HiveException { Hive hive = Hive.get(pctx.getConf()); // make sure each partition exists on the index table PrunedPartitionList queryPartitionList = pctx.getOpToPartList().get(tableScan); @@ -83,7 +80,6 @@ public final class IndexUtils { } for (Partition part : queryPartitions) { - List<Table> sourceIndexTables = getIndexTables(hive, part, indexes); if (!containsPartition(hive, part, indexes)) { return null; // problem if it doesn't contain the partition } @@ -93,63 +89,24 @@ public final class IndexUtils { } /** - * return index tables associated with a given base table - */ - private List<Table> getIndexTables(Hive hive, Table table, - Map<Table, List<Index>> indexes) throws - HiveException { - List<Table> indexTables = new ArrayList<Table>(); - if (indexes == null || indexes.get(table) == null) { - return indexTables; - } - for (Index index : indexes.get(table)) { - Table indexTable = hive.getTable(index.getIndexTableName()); - indexToIndexTable.put(index, indexTable); - indexTables.add(indexTable); - } - return indexTables; - } - - /** - * return index tables associated with the base table of the partition - */ - private 
static List<Table> getIndexTables(Hive hive, Partition part, - Map<Table, List<Index>> indexes) throws HiveException { - List<Table> indexTables = new ArrayList<Table>(); - Table partitionedTable = part.getTable(); - if (indexes == null || indexes.get(partitionedTable) == null) { - return indexTables; - } - for (Index index : indexes.get(partitionedTable)) { - Table indexTable = hive.getTable(index.getIndexTableName()); - indexToIndexTable.put(index, indexTable); - indexTables.add(indexTable); - } - return indexTables; - } - - /** * check that every index table contains the given partition and is fresh */ - private static boolean containsPartition(Hive hive, Partition part, - Map<Table, List<Index>> indexes) - throws HiveException { + private static boolean containsPartition(Hive hive, Partition part, List<Index> indexes) + throws HiveException { HashMap<String, String> partSpec = part.getSpec(); - - if (indexes == null || indexes.get(part.getTable()) == null) { - return false; - } - if (partSpec.isEmpty()) { // empty specs come from non-partitioned tables - return isIndexTableFresh(hive, indexes.get(part.getTable()), part.getTable()); + return isIndexTableFresh(hive, indexes, part.getTable()); } - for (Index index : indexes.get(part.getTable())) { - Table indexTable = indexToIndexTable.get(index); + for (Index index : indexes) { + // index.getDbName() is used as a default database, which is database of target table, + // if index.getIndexTableName() does not contain database name + String[] qualified = Utilities.getDbTableName(index.getDbName(), index.getIndexTableName()); + Table indexTable = hive.getTable(qualified[0], qualified[1]); // get partitions that match the spec - List<Partition> matchingPartitions = hive.getPartitions(indexTable, partSpec); - if (matchingPartitions == null || matchingPartitions.size() == 0) { + Partition matchingPartition = hive.getPartition(indexTable, partSpec, false); + if (matchingPartition == null) { LOG.info("Index table " + indexTable + "did not contain built partition that matched " + partSpec); return false; } else if (!isIndexPartitionFresh(hive, index, part)) { @@ -160,7 +117,7 @@ public final class IndexUtils { } /** - * Check the index partitions on a parttioned table exist and are fresh + * Check the index partitions on a partitioned table exist and are fresh */ private static boolean isIndexPartitionFresh(Hive hive, Index index, Partition part) throws HiveException { @@ -187,7 +144,7 @@ public final class IndexUtils { } /** - * Check that the indexes on the unpartioned table exist and are fresh + * Check that the indexes on the un-partitioned table exist and are fresh */ private static boolean isIndexTableFresh(Hive hive, List<Index> indexes, Table src) throws HiveException { @@ -227,8 +184,8 @@ public final class IndexUtils { public static List<Index> getIndexes(Table baseTableMetaData, List<String> matchIndexTypes) throws SemanticException { List<Index> matchingIndexes = new ArrayList<Index>(); - List<Index> indexesOnTable = null; + List<Index> indexesOnTable; try { indexesOnTable = baseTableMetaData.getAllIndexes((short) -1); // get all indexes } catch (HiveException e) { Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- 
hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Wed Aug 13 02:28:54 2014 @@ -26,6 +26,7 @@ import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; @@ -33,21 +34,28 @@ import org.apache.hadoop.hive.ql.exec.Op import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.GenTezProcContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.ColStatistics; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.OpTraits; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.Statistics; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.stats.StatsUtils; public class ReduceSinkMapJoinProc implements NodeProcessor { @@ -111,18 +119,59 @@ public class ReduceSinkMapJoinProc imple if (pos == -1) { throw new SemanticException("Cannot find position of parent in mapjoin"); } - LOG.debug("Mapjoin "+mapJoinOp+", pos: "+pos+" --> "+parentWork.getName()); - mapJoinOp.getConf().getParentToInput().put(pos, parentWork.getName()); + MapJoinDesc joinConf = mapJoinOp.getConf(); + long keyCount = Long.MAX_VALUE, rowCount = Long.MAX_VALUE, bucketCount = 1; + Statistics stats = parentRS.getStatistics(); + if (stats != null) { + keyCount = rowCount = stats.getNumRows(); + if (keyCount <= 0) { + keyCount = rowCount = Long.MAX_VALUE; + } + ArrayList<String> keyCols = parentRS.getConf().getOutputKeyColumnNames(); + if (keyCols != null && !keyCols.isEmpty()) { + // See if we can arrive at a smaller number using distinct stats from key columns. + long maxKeyCount = 1; + String prefix = Utilities.ReduceField.KEY.toString(); + for (String keyCol : keyCols) { + ExprNodeDesc realCol = parentRS.getColumnExprMap().get(prefix + "." + keyCol); + ColStatistics cs = StatsUtils.getColStatisticsFromExpression(null, stats, realCol); + if (cs == null || cs.getCountDistint() <= 0) { + maxKeyCount = Long.MAX_VALUE; + break; + } + maxKeyCount *= cs.getCountDistint(); + if (maxKeyCount >= keyCount) { + break; + } + } + keyCount = Math.min(maxKeyCount, keyCount); + } + if (joinConf.isBucketMapJoin()) { + OpTraits opTraits = mapJoinOp.getOpTraits(); + bucketCount = (opTraits == null) ? 
-1 : opTraits.getNumBuckets(); + if (bucketCount > 0) { + // We cannot obtain a better estimate without CustomPartitionVertex providing it + // to us somehow; in which case using statistics would be completely unnecessary. + keyCount /= bucketCount; + } + } + } + LOG.info("Mapjoin " + mapJoinOp + ", pos: " + pos + " --> " + parentWork.getName() + " (" + + keyCount + " keys estimated from " + rowCount + " rows, " + bucketCount + " buckets)"); + joinConf.getParentToInput().put(pos, parentWork.getName()); + if (keyCount != Long.MAX_VALUE) { + joinConf.getParentKeyCounts().put(pos, keyCount); + } int numBuckets = -1; EdgeType edgeType = EdgeType.BROADCAST_EDGE; - if (mapJoinOp.getConf().isBucketMapJoin()) { + if (joinConf.isBucketMapJoin()) { // disable auto parallelism for bucket map joins parentRS.getConf().setAutoParallel(false); - numBuckets = (Integer) mapJoinOp.getConf().getBigTableBucketNumMapping().values().toArray()[0]; - if (mapJoinOp.getConf().getCustomBucketMapJoin()) { + numBuckets = (Integer) joinConf.getBigTableBucketNumMapping().values().toArray()[0]; + if (joinConf.getCustomBucketMapJoin()) { edgeType = EdgeType.CUSTOM_EDGE; } else { edgeType = EdgeType.CUSTOM_SIMPLE_EDGE; Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java Wed Aug 13 02:28:54 2014 @@ -30,10 +30,9 @@ import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.metastore.api.Index; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; -import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.lib.GraphWalker; @@ -45,7 +44,6 @@ import org.apache.hadoop.hive.ql.lib.Rul import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; /** * RewriteCanApplyCtx class stores the context for the {@link RewriteCanApplyProcFactory} @@ -84,7 +82,9 @@ public final class RewriteCanApplyCtx im private Set<String> aggFuncColList = new LinkedHashSet<String>(); private final ParseContext parseContext; + private String alias; private String baseTableName; + private String indexTableName; private String aggFunction; void resetCanApplyCtx(){ @@ -230,6 +230,14 @@ public final class RewriteCanApplyCtx im this.aggFuncCnt = aggFuncCnt; } + public String getAlias() { + return alias; + } + + public void setAlias(String alias) { + this.alias = alias; + } + public String getBaseTableName() { return baseTableName; } @@ -238,10 +246,26 @@ public final class RewriteCanApplyCtx im this.baseTableName = baseTableName; } + public String getIndexTableName() { + return indexTableName; + } + + public void 
setIndexTableName(String indexTableName) { + this.indexTableName = indexTableName; + } + public ParseContext getParseContext() { return parseContext; } + public Set<String> getAllColumns() { + Set<String> allColumns = new LinkedHashSet<String>(selectColumnsList); + allColumns.addAll(predicateColumnsList); + allColumns.addAll(gbKeyNameList); + allColumns.addAll(aggFuncColList); + return allColumns; + } + /** * This method walks all the nodes starting from topOp TableScanOperator node @@ -255,15 +279,13 @@ public final class RewriteCanApplyCtx im * @param topOp * @throws SemanticException */ - void populateRewriteVars(Operator<? extends OperatorDesc> topOp) + void populateRewriteVars(TableScanOperator topOp) throws SemanticException{ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); opRules.put(new RuleRegExp("R1", FilterOperator.getOperatorName() + "%"), - RewriteCanApplyProcFactory.canApplyOnFilterOperator()); + RewriteCanApplyProcFactory.canApplyOnFilterOperator(topOp)); opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + "%"), - RewriteCanApplyProcFactory.canApplyOnGroupByOperator()); - opRules.put(new RuleRegExp("R3", SelectOperator.getOperatorName() + "%"), - RewriteCanApplyProcFactory.canApplyOnSelectOperator()); + RewriteCanApplyProcFactory.canApplyOnGroupByOperator(topOp)); // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along @@ -364,5 +386,4 @@ public final class RewriteCanApplyCtx im } return true; } - } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java Wed Aug 13 02:28:54 2014 @@ -18,19 +18,9 @@ package org.apache.hadoop.hive.ql.optimizer.index; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Stack; - -import org.apache.hadoop.hive.ql.exec.ColumnInfo; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.RowSchema; -import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -39,10 +29,13 @@ import org.apache.hadoop.hive.ql.plan.Ag import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; + +import java.util.List; +import java.util.Stack; /** * Factory of methods used by {@link 
RewriteGBUsingIndex} @@ -50,43 +43,46 @@ import org.apache.hadoop.hive.ql.plan.Op * */ public final class RewriteCanApplyProcFactory { - private static RewriteCanApplyCtx canApplyCtx = null; - - private RewriteCanApplyProcFactory(){ - //this prevents the class from getting instantiated - } /** * Check for conditions in FilterOperator that do not meet rewrite criteria. */ private static class CheckFilterProc implements NodeProcessor { + + private TableScanOperator topOp; + + public CheckFilterProc(TableScanOperator topOp) { + this.topOp = topOp; + } + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { FilterOperator operator = (FilterOperator)nd; - canApplyCtx = (RewriteCanApplyCtx)ctx; - FilterDesc conf = (FilterDesc)operator.getConf(); + RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx)ctx; + FilterDesc conf = operator.getConf(); //The filter operator should have a predicate of ExprNodeGenericFuncDesc type. //This represents the comparison operator - ExprNodeGenericFuncDesc oldengfd = (ExprNodeGenericFuncDesc) conf.getPredicate(); + ExprNodeDesc oldengfd = conf.getPredicate(); if(oldengfd == null){ canApplyCtx.setWhrClauseColsFetchException(true); + return null; } - //The predicate should have valid left and right columns - List<String> colList = oldengfd.getCols(); - if(colList == null || colList.size() == 0){ + ExprNodeDesc backtrack = ExprNodeDescUtils.backtrack(oldengfd, operator, topOp); + if (backtrack == null) { canApplyCtx.setWhrClauseColsFetchException(true); + return null; } //Add the predicate columns to RewriteCanApplyCtx's predColRefs list to check later //if index keys contain all filter predicate columns and vice-a-versa - for (String col : colList) { + for (String col : backtrack.getCols()) { canApplyCtx.getPredicateColumnsList().add(col); } return null; } } - public static CheckFilterProc canApplyOnFilterOperator() { - return new CheckFilterProc(); + public static CheckFilterProc canApplyOnFilterOperator(TableScanOperator topOp) { + return new CheckFilterProc(topOp); } /** @@ -95,10 +91,16 @@ public final class RewriteCanApplyProcFa */ private static class CheckGroupByProc implements NodeProcessor { + private TableScanOperator topOp; + + public CheckGroupByProc(TableScanOperator topOp) { + this.topOp = topOp; + } + public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... 
nodeOutputs) throws SemanticException { GroupByOperator operator = (GroupByOperator)nd; - canApplyCtx = (RewriteCanApplyCtx)ctx; + RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx)ctx; //for each group-by clause in query, only one GroupByOperator of the //GBY-RS-GBY sequence is stored in getGroupOpToInputTables //we need to process only this operator @@ -107,7 +109,7 @@ public final class RewriteCanApplyProcFa !canApplyCtx.isQueryHasGroupBy()){ canApplyCtx.setQueryHasGroupBy(true); - GroupByDesc conf = (GroupByDesc) operator.getConf(); + GroupByDesc conf = operator.getConf(); List<AggregationDesc> aggrList = conf.getAggregators(); if(aggrList != null && aggrList.size() > 0){ for (AggregationDesc aggregationDesc : aggrList) { @@ -119,40 +121,39 @@ public final class RewriteCanApplyProcFa String aggFunc = aggregationDesc.getGenericUDAFName(); if(!("count".equals(aggFunc))){ canApplyCtx.setAggFuncIsNotCount(true); - }else{ - List<ExprNodeDesc> para = aggregationDesc.getParameters(); - //for a valid aggregation, it needs to have non-null parameter list - if(para == null){ - canApplyCtx.setAggFuncColsFetchException(true); - }else if(para.size() == 0){ - //count(*) case - canApplyCtx.setCountOnAllCols(true); - canApplyCtx.setAggFunction("_count_of_all"); - }else{ - assert para.size()==1; - for(int i=0; i< para.size(); i++){ - ExprNodeDesc expr = para.get(i); - if(expr instanceof ExprNodeColumnDesc){ - //Add the columns to RewriteCanApplyCtx's selectColumnsList list - //to check later if index keys contain all select clause columns - //and vice-a-versa. We get the select column 'actual' names only here - //if we have a agg func along with group-by - //SelectOperator has internal names in its colList data structure - canApplyCtx.getSelectColumnsList().add( - ((ExprNodeColumnDesc) expr).getColumn()); - //Add the columns to RewriteCanApplyCtx's aggFuncColList list to check later - //if columns contained in agg func are index key columns - canApplyCtx.getAggFuncColList().add( - ((ExprNodeColumnDesc) expr).getColumn()); - canApplyCtx.setAggFunction("_count_of_" + - ((ExprNodeColumnDesc) expr).getColumn() + ""); - }else if(expr instanceof ExprNodeConstantDesc){ - //count(1) case - canApplyCtx.setCountOfOne(true); - canApplyCtx.setAggFunction("_count_of_1"); - } - } + return false; + } + List<ExprNodeDesc> para = aggregationDesc.getParameters(); + //for a valid aggregation, it needs to have non-null parameter list + if (para == null) { + canApplyCtx.setAggFuncColsFetchException(true); + } else if (para.size() == 0) { + //count(*) case + canApplyCtx.setCountOnAllCols(true); + canApplyCtx.setAggFunction("_count_of_all"); + } else if (para.size() == 1) { + ExprNodeDesc expr = ExprNodeDescUtils.backtrack(para.get(0), operator, topOp); + if (expr instanceof ExprNodeColumnDesc){ + //Add the columns to RewriteCanApplyCtx's selectColumnsList list + //to check later if index keys contain all select clause columns + //and vice-a-versa. 
We get the select column 'actual' names only here + //if we have a agg func along with group-by + //SelectOperator has internal names in its colList data structure + canApplyCtx.getSelectColumnsList().add( + ((ExprNodeColumnDesc) expr).getColumn()); + //Add the columns to RewriteCanApplyCtx's aggFuncColList list to check later + //if columns contained in agg func are index key columns + canApplyCtx.getAggFuncColList().add( + ((ExprNodeColumnDesc) expr).getColumn()); + canApplyCtx.setAggFunction("_count_of_" + + ((ExprNodeColumnDesc) expr).getColumn() + ""); + } else if(expr instanceof ExprNodeConstantDesc) { + //count(1) case + canApplyCtx.setCountOfOne(true); + canApplyCtx.setAggFunction("_count_of_1"); } + } else { + throw new SemanticException("Invalid number of arguments for count"); } } } @@ -163,13 +164,13 @@ public final class RewriteCanApplyProcFa canApplyCtx.setGbyKeysFetchException(true); } for (ExprNodeDesc expr : keyList) { - checkExpression(expr); + checkExpression(canApplyCtx, expr); } } return null; } - private void checkExpression(ExprNodeDesc expr){ + private void checkExpression(RewriteCanApplyCtx canApplyCtx, ExprNodeDesc expr){ if(expr instanceof ExprNodeColumnDesc){ //Add the group-by keys to RewriteCanApplyCtx's gbKeyNameList list to check later //if all keys are from index columns @@ -182,59 +183,14 @@ public final class RewriteCanApplyProcFa canApplyCtx.getGbKeyNameList().addAll(expr.getCols()); canApplyCtx.getSelectColumnsList().add(((ExprNodeColumnDesc) childExpr).getColumn()); }else if(childExpr instanceof ExprNodeGenericFuncDesc){ - checkExpression(childExpr); + checkExpression(canApplyCtx, childExpr); } } } } } - - public static CheckGroupByProc canApplyOnGroupByOperator() { - return new CheckGroupByProc(); + public static CheckGroupByProc canApplyOnGroupByOperator(TableScanOperator topOp) { + return new CheckGroupByProc(topOp); } - - - /** - * Check for conditions in SelectOperator that do not meet rewrite criteria. - */ - private static class CheckSelectProc implements NodeProcessor { - public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, - Object... nodeOutputs) throws SemanticException { - SelectOperator operator = (SelectOperator)nd; - canApplyCtx = (RewriteCanApplyCtx)ctx; - - List<Operator<? extends OperatorDesc>> childrenList = operator.getChildOperators(); - Operator<? 
extends OperatorDesc> child = childrenList.get(0); - if(child instanceof FileSinkOperator){ - Map<String, String> internalToAlias = new LinkedHashMap<String, String>(); - RowSchema rs = operator.getSchema(); - //to get the internal to alias mapping - List<ColumnInfo> sign = rs.getSignature(); - for (ColumnInfo columnInfo : sign) { - internalToAlias.put(columnInfo.getInternalName(), columnInfo.getAlias()); - } - - //if FilterOperator predicate has internal column names, - //we need to retrieve the 'actual' column names to - //check if index keys contain all filter predicate columns and vice-a-versa - Iterator<String> predItr = canApplyCtx.getPredicateColumnsList().iterator(); - while(predItr.hasNext()){ - String predCol = predItr.next(); - String newPredCol = ""; - if(internalToAlias.get(predCol) != null){ - newPredCol = internalToAlias.get(predCol); - canApplyCtx.getPredicateColumnsList().remove(predCol); - canApplyCtx.getPredicateColumnsList().add(newPredCol); - } - } - } - return null; - } - } - - public static CheckSelectProc canApplyOnSelectOperator() { - return new CheckSelectProc(); - } - } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java Wed Aug 13 02:28:54 2014 @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.index.AggregateIndexHandler; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -48,6 +49,7 @@ import org.apache.hadoop.hive.ql.parse.O import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.util.StringUtils; /** @@ -106,11 +108,6 @@ public class RewriteGBUsingIndex impleme private final Map<String, RewriteCanApplyCtx> tsOpToProcess = new LinkedHashMap<String, RewriteCanApplyCtx>(); - //Name of the current table on which rewrite is being performed - private String baseTableName = null; - //Name of the current index which is used for rewrite - private String indexTableName = null; - //Index Validation Variables private static final String IDX_BUCKET_COL = "_bucketname"; private static final String IDX_OFFSETS_ARRAY_COL = "_offsets"; @@ -133,7 +130,7 @@ public class RewriteGBUsingIndex impleme /* Check if the input query passes all the tests to be eligible for a rewrite * If yes, rewrite original query; else, return the current parseContext */ - if(shouldApplyOptimization()){ + if (shouldApplyOptimization()) { LOG.info("Rewriting Original Query using " + getName() + " optimization."); rewriteOriginalQuery(); } @@ -155,59 +152,52 @@ public class RewriteGBUsingIndex impleme * @return * @throws SemanticException */ - boolean shouldApplyOptimization() throws SemanticException{ - boolean canApply = false; - 
if(ifQueryHasMultipleTables()){ + boolean shouldApplyOptimization() throws SemanticException { + if (ifQueryHasMultipleTables()) { //We do not apply this optimization for this case as of now. return false; - }else{ + } + Map<Table, List<Index>> tableToIndex = getIndexesForRewrite(); + if (tableToIndex.isEmpty()) { + LOG.debug("No Valid Index Found to apply Rewrite, " + + "skipping " + getName() + " optimization"); + return false; + } /* * This code iterates over each TableScanOperator from the topOps map from ParseContext. * For each operator tree originating from this top TableScanOperator, we determine * if the optimization can be applied. If yes, we add the name of the top table to * the tsOpToProcess to apply rewrite later on. * */ - Map<TableScanOperator, Table> topToTable = parseContext.getTopToTable(); - Iterator<TableScanOperator> topOpItr = topToTable.keySet().iterator(); - while(topOpItr.hasNext()){ - - TableScanOperator topOp = topOpItr.next(); - Table table = topToTable.get(topOp); - baseTableName = table.getTableName(); - Map<Table, List<Index>> indexes = getIndexesForRewrite(); - if(indexes == null){ - LOG.debug("Error getting valid indexes for rewrite, " + - "skipping " + getName() + " optimization"); - return false; - } + Map<TableScanOperator, Table> topToTable = parseContext.getTopToTable(); + Map<String, Operator<?>> topOps = parseContext.getTopOps(); + + for (Map.Entry<String, Operator<?>> entry : parseContext.getTopOps().entrySet()) { - if(indexes.size() == 0){ - LOG.debug("No Valid Index Found to apply Rewrite, " + + String alias = entry.getKey(); + TableScanOperator topOp = (TableScanOperator) entry.getValue(); + + Table table = topToTable.get(topOp); + List<Index> indexes = tableToIndex.get(table); + if (indexes.isEmpty()) { + continue; + } + + if (table.isPartitioned()) { + //if base table has partitions, we need to check if index is built for + //all partitions. If not, then we do not apply the optimization + if (!checkIfIndexBuiltOnAllTablePartitions(topOp, indexes)) { + LOG.debug("Index is not built for all table partitions, " + "skipping " + getName() + " optimization"); - return false; - }else{ - //we need to check if the base table has confirmed or unknown partitions - if(parseContext.getOpToPartList() != null && parseContext.getOpToPartList().size() > 0){ - //if base table has partitions, we need to check if index is built for - //all partitions. 
If not, then we do not apply the optimization - if(checkIfIndexBuiltOnAllTablePartitions(topOp, indexes)){ - //check if rewrite can be applied for operator tree - //if partitions condition returns true - canApply = checkIfRewriteCanBeApplied(topOp, table, indexes); - }else{ - LOG.debug("Index is not built for all table partitions, " + - "skipping " + getName() + " optimization"); - return false; - } - }else{ - //check if rewrite can be applied for operator tree - //if there are no partitions on base table - canApply = checkIfRewriteCanBeApplied(topOp, table, indexes); - } + continue; } } + //check if rewrite can be applied for operator tree + //if there are no partitions on base table + checkIfRewriteCanBeApplied(alias, topOp, table, indexes); } - return canApply; + + return !tsOpToProcess.isEmpty(); } /** @@ -219,61 +209,36 @@ public class RewriteGBUsingIndex impleme * @return - true if rewrite can be applied on the current branch; false otherwise * @throws SemanticException */ - private boolean checkIfRewriteCanBeApplied(TableScanOperator topOp, Table baseTable, - Map<Table, List<Index>> indexes) throws SemanticException{ - boolean canApply = false; + private boolean checkIfRewriteCanBeApplied(String alias, TableScanOperator topOp, + Table baseTable, List<Index> indexes) throws SemanticException{ //Context for checking if this optimization can be applied to the input query RewriteCanApplyCtx canApplyCtx = RewriteCanApplyCtx.getInstance(parseContext); - Map<String, Operator<? extends OperatorDesc>> topOps = parseContext.getTopOps(); - canApplyCtx.setBaseTableName(baseTableName); + canApplyCtx.setAlias(alias); + canApplyCtx.setBaseTableName(baseTable.getTableName()); canApplyCtx.populateRewriteVars(topOp); - Map<Index, Set<String>> indexTableMap = getIndexToKeysMap(indexes.get(baseTable)); - Iterator<Index> indexMapItr = indexTableMap.keySet().iterator(); - Index index = null; - while(indexMapItr.hasNext()){ + Map<Index, Set<String>> indexTableMap = getIndexToKeysMap(indexes); + for (Map.Entry<Index, Set<String>> entry : indexTableMap.entrySet()) { //we rewrite the original query using the first valid index encountered //this can be changed if we have a better mechanism to //decide which index will produce a better rewrite - index = indexMapItr.next(); - canApply = canApplyCtx.isIndexUsableForQueryBranchRewrite(index, - indexTableMap.get(index)); - if(canApply){ - canApply = checkIfAllRewriteCriteriaIsMet(canApplyCtx); - //break here if any valid index is found to apply rewrite - if(canApply){ - //check if aggregation function is set. - //If not, set it using the only indexed column - if(canApplyCtx.getAggFunction() == null){ - //strip of the start and end braces [...] - String aggregationFunction = indexTableMap.get(index).toString(); - aggregationFunction = aggregationFunction.substring(1, - aggregationFunction.length() - 1); - canApplyCtx.setAggFunction("_count_of_" + aggregationFunction + ""); - } + Index index = entry.getKey(); + Set<String> indexKeyNames = entry.getValue(); + //break here if any valid index is found to apply rewrite + if (canApplyCtx.isIndexUsableForQueryBranchRewrite(index, indexKeyNames) && + checkIfAllRewriteCriteriaIsMet(canApplyCtx)) { + //check if aggregation function is set. 
+ //If not, set it using the only indexed column + if (canApplyCtx.getAggFunction() == null) { + canApplyCtx.setAggFunction("_count_of_" + StringUtils.join(",", indexKeyNames) + ""); } - break; + canApplyCtx.setIndexTableName(index.getIndexTableName()); + tsOpToProcess.put(alias, canApplyCtx); + return true; } } - indexTableName = index.getIndexTableName(); - - if(canApply && topOps.containsValue(topOp)) { - Iterator<String> topOpNamesItr = topOps.keySet().iterator(); - while(topOpNamesItr.hasNext()){ - String topOpName = topOpNamesItr.next(); - if(topOps.get(topOpName).equals(topOp)){ - tsOpToProcess.put(topOpName, canApplyCtx); - } - } - } - - if(tsOpToProcess.size() == 0){ - canApply = false; - }else{ - canApply = true; - } - return canApply; + return false; } /** @@ -329,7 +294,7 @@ public class RewriteGBUsingIndex impleme * @throws SemanticException */ private boolean checkIfIndexBuiltOnAllTablePartitions(TableScanOperator tableScan, - Map<Table, List<Index>> indexes) throws SemanticException{ + List<Index> indexes) throws SemanticException { // check if we have indexes on all partitions in this table scan Set<Partition> queryPartitions; try { @@ -341,7 +306,7 @@ public class RewriteGBUsingIndex impleme LOG.error("Fatal Error: problem accessing metastore", e); throw new SemanticException(e); } - if(queryPartitions.size() != 0){ + if (queryPartitions.size() != 0) { return true; } return false; @@ -355,12 +320,11 @@ public class RewriteGBUsingIndex impleme * @throws SemanticException */ Map<Index, Set<String>> getIndexToKeysMap(List<Index> indexTables) throws SemanticException{ - Index index = null; Hive hiveInstance = hiveDb; Map<Index, Set<String>> indexToKeysMap = new LinkedHashMap<Index, Set<String>>(); for (int idxCtr = 0; idxCtr < indexTables.size(); idxCtr++) { final Set<String> indexKeyNames = new LinkedHashSet<String>(); - index = indexTables.get(idxCtr); + Index index = indexTables.get(idxCtr); //Getting index key columns StorageDescriptor sd = index.getSd(); List<FieldSchema> idxColList = sd.getCols(); @@ -373,8 +337,9 @@ public class RewriteGBUsingIndex impleme // index is changed. List<String> idxTblColNames = new ArrayList<String>(); try { - Table idxTbl = hiveInstance.getTable(index.getDbName(), + String[] qualified = Utilities.getDbTableName(index.getDbName(), index.getIndexTableName()); + Table idxTbl = hiveInstance.getTable(qualified[0], qualified[1]); for (FieldSchema idxTblCol : idxTbl.getCols()) { idxTblColNames.add(idxTblCol.getName()); } @@ -403,17 +368,17 @@ public class RewriteGBUsingIndex impleme */ @SuppressWarnings("unchecked") private void rewriteOriginalQuery() throws SemanticException { - Map<String, Operator<? extends OperatorDesc>> topOpMap = - (HashMap<String, Operator<? 
extends OperatorDesc>>) parseContext.getTopOps().clone(); + Map<String, Operator<?>> topOpMap = parseContext.getTopOps(); Iterator<String> tsOpItr = tsOpToProcess.keySet().iterator(); - while(tsOpItr.hasNext()){ - baseTableName = tsOpItr.next(); - RewriteCanApplyCtx canApplyCtx = tsOpToProcess.get(baseTableName); - TableScanOperator topOp = (TableScanOperator) topOpMap.get(baseTableName); + for (Map.Entry<String, RewriteCanApplyCtx> entry : tsOpToProcess.entrySet()) { + String alias = entry.getKey(); + RewriteCanApplyCtx canApplyCtx = entry.getValue(); + TableScanOperator topOp = (TableScanOperator) topOpMap.get(alias); RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb, - indexTableName, baseTableName, canApplyCtx.getAggFunction()); + canApplyCtx.getIndexTableName(), canApplyCtx.getAlias(), + canApplyCtx.getAllColumns(), canApplyCtx.getAggFunction()); rewriteQueryCtx.invokeRewriteQueryProc(topOp); parseContext = rewriteQueryCtx.getParseContext(); parseContext.setOpParseCtx((LinkedHashMap<Operator<? extends OperatorDesc>, Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java Wed Aug 13 02:28:54 2014 @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.lib.Nod import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcFactory; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.RowResolver; @@ -68,7 +69,6 @@ import org.apache.hadoop.hive.serde2.typ */ public final class RewriteQueryUsingAggregateIndex { private static final Log LOG = LogFactory.getLog(RewriteQueryUsingAggregateIndex.class.getName()); - private static RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = null; private RewriteQueryUsingAggregateIndex() { //this prevents the class from getting instantiated @@ -78,7 +78,7 @@ public final class RewriteQueryUsingAggr public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { SelectOperator operator = (SelectOperator)nd; - rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; + RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; List<Operator<? extends OperatorDesc>> childOps = operator.getChildOperators(); Operator<? 
extends OperatorDesc> childOp = childOps.iterator().next(); @@ -98,7 +98,7 @@ public final class RewriteQueryUsingAggr List<ColumnInfo> selRSSignature = selRS.getSignature(); //Need to create a new type for Column[_count_of_indexed_key_column] node - PrimitiveTypeInfo pti = (PrimitiveTypeInfo) TypeInfoFactory.getPrimitiveTypeInfo("bigint"); + PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo("bigint"); pti.setTypeName("bigint"); ColumnInfo newCI = new ColumnInfo(rewriteQueryCtx.getAggregateFunction(), pti, "", false); selRSSignature.add(newCI); @@ -117,19 +117,15 @@ public final class RewriteQueryUsingAggr /** * This processor replaces the original TableScanOperator with * the new TableScanOperator and metadata that scans over the - * index table rather than scanning over the orginal table. + * index table rather than scanning over the original table. * */ private static class ReplaceTableScanOpProc implements NodeProcessor { public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { TableScanOperator scanOperator = (TableScanOperator)nd; - rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; - String baseTableName = rewriteQueryCtx.getBaseTableName(); - String alias = null; - if(baseTableName.contains(":")){ - alias = (baseTableName.split(":"))[0]; - } + RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; + String alias = rewriteQueryCtx.getAlias(); //Need to remove the original TableScanOperators from these data structures // and add new ones @@ -144,8 +140,8 @@ public final class RewriteQueryUsingAggr OpParseContext operatorContext = opParseContext.get(scanOperator); //remove original TableScanOperator + topOps.remove(alias); topToTable.remove(scanOperator); - topOps.remove(baseTableName); opParseContext.remove(scanOperator); //construct a new descriptor for the index table scan @@ -171,13 +167,11 @@ public final class RewriteQueryUsingAggr try { StructObjectInspector rowObjectInspector = (StructObjectInspector) indexTableHandle.getDeserializer().getObjectInspector(); - List<? 
extends StructField> fields = rowObjectInspector - .getAllStructFieldRefs(); - for (int i = 0; i < fields.size(); i++) { - rr.put(indexTableName, fields.get(i).getFieldName(), new ColumnInfo(fields - .get(i).getFieldName(), TypeInfoUtils - .getTypeInfoFromObjectInspector(fields.get(i) - .getFieldObjectInspector()), indexTableName, false)); + for (String column : rewriteQueryCtx.getColumns()) { + StructField field = rowObjectInspector.getStructFieldRef(column); + rr.put(indexTableName, field.getFieldName(), new ColumnInfo(field.getFieldName(), + TypeInfoUtils.getTypeInfoFromObjectInspector(field.getFieldObjectInspector()), + indexTableName, false)); } } catch (SerDeException e) { LOG.error("Error while creating the RowResolver for new TableScanOperator."); @@ -187,18 +181,18 @@ public final class RewriteQueryUsingAggr //Set row resolver for new table operatorContext.setRowResolver(rr); - String tabNameWithAlias = null; - if(alias != null){ - tabNameWithAlias = alias + ":" + indexTableName; - }else{ - tabNameWithAlias = indexTableName; - } + + String newAlias = indexTableName; + int index = alias.lastIndexOf(":"); + if (index >= 0) { + newAlias = alias.substring(0, index) + ":" + indexTableName; + } //Scan operator now points to other table topToTable.put(scanOperator, indexTableHandle); - scanOperator.getConf().setAlias(tabNameWithAlias); + scanOperator.getConf().setAlias(newAlias); scanOperator.setAlias(indexTableName); - topOps.put(tabNameWithAlias, scanOperator); + topOps.put(newAlias, scanOperator); opParseContext.put(scanOperator, operatorContext); rewriteQueryCtx.getParseContext().setTopToTable( (HashMap<TableScanOperator, Table>) topToTable); @@ -207,6 +201,9 @@ public final class RewriteQueryUsingAggr rewriteQueryCtx.getParseContext().setOpParseCtx( (LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>) opParseContext); + ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rr, + new ArrayList<String>(rewriteQueryCtx.getColumns())); + return null; } } @@ -228,7 +225,7 @@ public final class RewriteQueryUsingAggr public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... 
nodeOutputs) throws SemanticException { GroupByOperator operator = (GroupByOperator)nd; - rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; + RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx; //We need to replace the GroupByOperator which is in //groupOpToInputTables map with the new GroupByOperator Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java Wed Aug 13 02:28:54 2014 @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Stack; import org.apache.hadoop.hive.ql.exec.Operator; @@ -54,19 +55,21 @@ import org.apache.hadoop.hive.ql.udf.gen public final class RewriteQueryUsingAggregateIndexCtx implements NodeProcessorCtx { private RewriteQueryUsingAggregateIndexCtx(ParseContext parseContext, Hive hiveDb, - String indexTableName, String baseTableName, String aggregateFunction){ + String indexTableName, String alias, Set<String> columns, String aggregateFunction) { this.parseContext = parseContext; this.hiveDb = hiveDb; this.indexTableName = indexTableName; - this.baseTableName = baseTableName; + this.alias = alias; this.aggregateFunction = aggregateFunction; + this.columns = columns; this.opc = parseContext.getOpParseCtx(); } public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseContext, - Hive hiveDb, String indexTableName, String baseTableName, String aggregateFunction){ + Hive hiveDb, String indexTableName, String alias, + Set<String> columns, String aggregateFunction) { return new RewriteQueryUsingAggregateIndexCtx( - parseContext, hiveDb, indexTableName, baseTableName, aggregateFunction); + parseContext, hiveDb, indexTableName, alias, columns, aggregateFunction); } @@ -77,8 +80,9 @@ public final class RewriteQueryUsingAggr //We need the GenericUDAFEvaluator for GenericUDAF function "sum" private GenericUDAFEvaluator eval = null; private final String indexTableName; - private final String baseTableName; + private final String alias; private final String aggregateFunction; + private final Set<String> columns; private ExprNodeColumnDesc aggrExprNode = null; public Map<Operator<? 
extends OperatorDesc>, OpParseContext> getOpc() { @@ -161,11 +165,15 @@ public final class RewriteQueryUsingAggr }; } - public String getBaseTableName() { - return baseTableName; + public String getAlias() { + return alias; } public String getAggregateFunction() { return aggregateFunction; } + + public Set<String> getColumns() { + return columns; + } } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java Wed Aug 13 02:28:54 2014 @@ -86,7 +86,7 @@ public class NullScanOptimizer implement return null; } ExprNodeConstantDesc c = (ExprNodeConstantDesc) condition; - if (c.getValue() != Boolean.FALSE) { + if (!Boolean.FALSE.equals(c.getValue())) { return null; } Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1617652&r1=1617651&r2=1617652&view=diff ============================================================================== --- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original) +++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Wed Aug 13 02:28:54 2014 @@ -143,11 +143,13 @@ public class Vectorizer implements Physi patternBuilder.append("|short"); patternBuilder.append("|timestamp"); patternBuilder.append("|boolean"); + patternBuilder.append("|binary"); patternBuilder.append("|string"); patternBuilder.append("|byte"); patternBuilder.append("|float"); patternBuilder.append("|double"); patternBuilder.append("|date"); + patternBuilder.append("|void"); // Decimal types can be specified with different precision and scales e.g. decimal(10,5), // as opposed to other data types which can be represented by constant strings.
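
Side note on the NullScanOptimizer hunk above: the patch replaces "c.getValue() != Boolean.FALSE" with "!Boolean.FALSE.equals(c.getValue())". The original check compares a boxed Boolean by reference identity, so a false value held in a distinct Boolean instance (or a null value) is not recognized as the constant false. The following standalone sketch is illustrative only (class and variable names are not from the patch); it shows why the value-based, null-safe test is the safer one:

// Minimal sketch, not part of the commit: identity vs. value comparison
// on a boxed Boolean, mirroring the NullScanOptimizer fix.
public class BooleanFalseCheckDemo {
  public static void main(String[] args) {
    Object cached = Boolean.FALSE;          // the canonical interned instance
    Object distinct = new Boolean(false);   // deprecated but legal: a different instance
    Object absent = null;

    // Identity check (old code): misses the distinct instance and lets null through.
    System.out.println(cached != Boolean.FALSE);    // false - recognized as constant false
    System.out.println(distinct != Boolean.FALSE);  // true  - wrongly treated as non-false
    System.out.println(absent != Boolean.FALSE);    // true  - null slips through

    // Value check (new code): null-safe and independent of which Boolean instance is used.
    System.out.println(!Boolean.FALSE.equals(cached));    // false
    System.out.println(!Boolean.FALSE.equals(distinct));  // false
    System.out.println(!Boolean.FALSE.equals(absent));    // true - null is still not a constant false
  }
}

With the value-based check, any predicate whose constant folds to false is detected regardless of how the Boolean was produced, which is what the null-scan optimization needs before it can prune the scan.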