Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Tue Sep 2 19:56:56 2014 @@ -45,61 +45,60 @@ import org.apache.hadoop.hive.serde2.obj public class PTFOperator extends Operator<PTFDesc> implements Serializable { - private static final long serialVersionUID = 1L; - boolean isMapOperator; + private static final long serialVersionUID = 1L; + boolean isMapOperator; - transient KeyWrapperFactory keyWrapperFactory; - protected transient KeyWrapper currentKeys; - protected transient KeyWrapper newKeys; - /* - * for map-side invocation of PTFs, we cannot utilize the currentkeys null check - * to decide on invoking startPartition in streaming mode. Hence this extra flag. - */ - transient boolean firstMapRow; - transient Configuration hiveConf; - transient PTFInvocation ptfInvocation; - - /* - * 1. Find out if the operator is invoked at Map-Side or Reduce-side - * 2. Get the deserialized QueryDef - * 3. Reconstruct the transient variables in QueryDef - * 4. Create input partition to store rows coming from previous operator - */ - @Override - protected void initializeOp(Configuration jobConf) throws HiveException { - hiveConf = jobConf; - // if the parent is ExtractOperator, this invocation is from reduce-side - isMapOperator = conf.isMapSide(); - - reconstructQueryDef(hiveConf); - - if (isMapOperator) { - PartitionedTableFunctionDef tDef = conf.getStartOfChain(); - outputObjInspector = tDef.getRawInputShape().getOI(); - } else { - outputObjInspector = conf.getFuncDef().getOutputShape().getOI(); - } - - setupKeysWrapper(inputObjInspectors[0]); - - ptfInvocation = setupChain(); - ptfInvocation.initializeStreaming(jobConf, isMapOperator); - firstMapRow = true; - - super.initializeOp(jobConf); - } - - @Override - protected void closeOp(boolean abort) throws HiveException { - super.closeOp(abort); + transient KeyWrapperFactory keyWrapperFactory; + protected transient KeyWrapper currentKeys; + protected transient KeyWrapper newKeys; + /* + * for map-side invocation of PTFs, we cannot utilize the currentkeys null check + * to decide on invoking startPartition in streaming mode. Hence this extra flag. + */ + transient boolean firstMapRow; + transient Configuration hiveConf; + transient PTFInvocation ptfInvocation; + + /* + * 1. Find out if the operator is invoked at Map-Side or Reduce-side + * 2. Get the deserialized QueryDef + * 3. Reconstruct the transient variables in QueryDef + * 4. 
Create input partition to store rows coming from previous operator + */ + @Override + protected void initializeOp(Configuration jobConf) throws HiveException { + hiveConf = jobConf; + // if the parent is ExtractOperator, this invocation is from reduce-side + isMapOperator = conf.isMapSide(); + + reconstructQueryDef(hiveConf); + + if (isMapOperator) { + PartitionedTableFunctionDef tDef = conf.getStartOfChain(); + outputObjInspector = tDef.getRawInputShape().getOI(); + } else { + outputObjInspector = conf.getFuncDef().getOutputShape().getOI(); + } + + setupKeysWrapper(inputObjInspectors[0]); + + ptfInvocation = setupChain(); + ptfInvocation.initializeStreaming(jobConf, isMapOperator); + firstMapRow = true; + + super.initializeOp(jobConf); + } + + @Override + protected void closeOp(boolean abort) throws HiveException { + super.closeOp(abort); ptfInvocation.finishPartition(); ptfInvocation.close(); } - @Override - public void processOp(Object row, int tag) throws HiveException - { - if (!isMapOperator ) { + @Override + public void processOp(Object row, int tag) throws HiveException { + if (!isMapOperator ) { /* * checkif current row belongs to the current accumulated Partition: * - If not: @@ -129,51 +128,51 @@ public class PTFOperator extends Operato } ptfInvocation.processRow(row); - } + } + + /** + * Initialize the visitor to use the QueryDefDeserializer Use the order + * defined in QueryDefWalker to visit the QueryDef + * + * @param hiveConf + * @throws HiveException + */ + protected void reconstructQueryDef(Configuration hiveConf) throws HiveException { + + PTFDeserializer dS = + new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf); + dS.initializePTFChain(conf.getFuncDef()); + } + + protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException { + PartitionDef pDef = conf.getStartOfChain().getPartition(); + List<PTFExpressionDef> exprs = pDef.getExpressions(); + int numExprs = exprs.size(); + ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numExprs]; + ObjectInspector[] keyOIs = new ObjectInspector[numExprs]; + ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs]; + + for(int i=0; i<numExprs; i++) { + PTFExpressionDef exprDef = exprs.get(i); + /* + * Why cannot we just use the ExprNodeEvaluator on the column? 
+ * - because on the reduce-side it is initialized based on the rowOI of the HiveTable + * and not the OI of the ExtractOp ( the parent of this Operator on the reduce-side) + */ + keyFields[i] = ExprNodeEvaluatorFactory.get(exprDef.getExprNode()); + keyOIs[i] = keyFields[i].initialize(inputOI); + currentKeyOIs[i] = + ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i], + ObjectInspectorCopyOption.WRITABLE); + } + + keyWrapperFactory = new KeyWrapperFactory(keyFields, keyOIs, currentKeyOIs); + newKeys = keyWrapperFactory.getKeyWrapper(); + } - /** - * Initialize the visitor to use the QueryDefDeserializer Use the order - * defined in QueryDefWalker to visit the QueryDef - * - * @param hiveConf - * @throws HiveException - */ - protected void reconstructQueryDef(Configuration hiveConf) throws HiveException { - - PTFDeserializer dS = - new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf); - dS.initializePTFChain(conf.getFuncDef()); - } - - protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException { - PartitionDef pDef = conf.getStartOfChain().getPartition(); - List<PTFExpressionDef> exprs = pDef.getExpressions(); - int numExprs = exprs.size(); - ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numExprs]; - ObjectInspector[] keyOIs = new ObjectInspector[numExprs]; - ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs]; - - for(int i=0; i<numExprs; i++) { - PTFExpressionDef exprDef = exprs.get(i); - /* - * Why cannot we just use the ExprNodeEvaluator on the column? - * - because on the reduce-side it is initialized based on the rowOI of the HiveTable - * and not the OI of the ExtractOp ( the parent of this Operator on the reduce-side) - */ - keyFields[i] = ExprNodeEvaluatorFactory.get(exprDef.getExprNode()); - keyOIs[i] = keyFields[i].initialize(inputOI); - currentKeyOIs[i] = - ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i], - ObjectInspectorCopyOption.WRITABLE); - } - - keyWrapperFactory = new KeyWrapperFactory(keyFields, keyOIs, currentKeyOIs); - newKeys = keyWrapperFactory.getKeyWrapper(); - } - - /** - * @return the name of the operator - */ + /** + * @return the name of the operator + */ @Override public String getName() { return getOperatorName(); @@ -184,11 +183,11 @@ public class PTFOperator extends Operato } - @Override - public OperatorType getType() { - return OperatorType.PTF; - } - + @Override + public OperatorType getType() { + return OperatorType.PTF; + } + private PTFInvocation setupChain() { Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>(); PTFInputDef iDef = conf.getFuncDef(); @@ -197,9 +196,9 @@ public class PTFOperator extends Operato fnDefs.push((PartitionedTableFunctionDef) iDef); iDef = ((PartitionedTableFunctionDef) iDef).getInput(); } - + PTFInvocation curr = null, first = null; - + while(!fnDefs.isEmpty()) { PartitionedTableFunctionDef currFn = fnDefs.pop(); curr = new PTFInvocation(curr, currFn.getTFunction()); @@ -222,26 +221,26 @@ public class PTFOperator extends Operato llFn.setpItr(pItr); } } - + /* * Responsible for the flow of rows through the PTF Chain. - * An Invocation wraps a TableFunction. - * The PTFOp hands the chain each row through the processRow call. + * An Invocation wraps a TableFunction. + * The PTFOp hands the chain each row through the processRow call. * It also notifies the chain of when a Partition starts/finishes. 
- * + * * There are several combinations depending * whether the TableFunction and its successor support Streaming or Batch mode. - * + * * Combination 1: Streaming + Streaming * - Start Partition: invoke startPartition on tabFn. - * - Process Row: invoke process Row on tabFn. + * - Process Row: invoke process Row on tabFn. * Any output rows hand to next tabFn in chain or forward to next Operator. * - Finish Partition: invoke finishPartition on tabFn. * Any output rows hand to next tabFn in chain or forward to next Operator. - * + * * Combination 2: Streaming + Batch * same as Combination 1 - * + * * Combination 3: Batch + Batch * - Start Partition: create or reset the Input Partition for the tabFn * caveat is: if prev is also batch and it is not providing an Output Iterator @@ -251,22 +250,22 @@ public class PTFOperator extends Operato * If function gives an Output Partition: set it on next Invocation's Input Partition * If function gives an Output Iterator: iterate and call processRow on next Invocation. * For last Invocation in chain: forward rows to next Operator. - * + * * Combination 3: Batch + Stream * Similar to Combination 3, except Finish Partition behavior slightly different * - Finish Partition : invoke evaluate on tabFn on Input Partition * iterate output rows: hand to next tabFn in chain or forward to next Operator. - * + * */ class PTFInvocation { - + PTFInvocation prev; PTFInvocation next; TableFunctionEvaluator tabFn; PTFPartition inputPart; PTFPartition outputPart; Iterator<Object> outputPartRowsItr; - + public PTFInvocation(PTFInvocation prev, TableFunctionEvaluator tabFn) { this.prev = prev; this.tabFn = tabFn; @@ -274,19 +273,19 @@ public class PTFOperator extends Operato prev.next = this; } } - + boolean isOutputIterator() { return tabFn.canAcceptInputAsStream() || tabFn.canIterateOutput(); } - + boolean isStreaming() { return tabFn.canAcceptInputAsStream(); } - + void initializeStreaming(Configuration cfg, boolean isMapSide) throws HiveException { PartitionedTableFunctionDef tabDef = tabFn.getTableDef(); PTFInputDef inputDef = tabDef.getInput(); - ObjectInspector inputOI = conf.getStartOfChain() == tabDef ? + ObjectInspector inputOI = conf.getStartOfChain() == tabDef ? inputObjInspectors[0] : inputDef.getOutputShape().getOI(); tabFn.initializeStreaming(cfg, (StructObjectInspector) inputOI, isMapSide); @@ -295,7 +294,7 @@ public class PTFOperator extends Operato next.initializeStreaming(cfg, isMapSide); } } - + void startPartition() throws HiveException { if ( isStreaming() ) { tabFn.startPartition(); @@ -312,7 +311,7 @@ public class PTFOperator extends Operato next.startPartition(); } } - + void processRow(Object row) throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.processRow(row)); @@ -320,7 +319,7 @@ public class PTFOperator extends Operato inputPart.append(row); } } - + void handleOutputRows(List<Object> outRows) throws HiveException { if ( outRows != null ) { for (Object orow : outRows ) { @@ -332,7 +331,7 @@ public class PTFOperator extends Operato } } } - + void finishPartition() throws HiveException { if ( isStreaming() ) { handleOutputRows(tabFn.finishPartition()); @@ -353,7 +352,7 @@ public class PTFOperator extends Operato } } } - + if ( next != null ) { next.finishPartition(); } else { @@ -364,7 +363,7 @@ public class PTFOperator extends Operato } } } - + /** * Create a new Partition. 
* A partition has 2 OIs: the OI for the rows being put in and the OI for the rows @@ -388,7 +387,7 @@ public class PTFOperator extends Operato private void createInputPartition() throws HiveException { PartitionedTableFunctionDef tabDef = tabFn.getTableDef(); PTFInputDef inputDef = tabDef.getInput(); - ObjectInspector inputOI = conf.getStartOfChain() == tabDef ? + ObjectInspector inputOI = conf.getStartOfChain() == tabDef ? inputObjInspectors[0] : inputDef.getOutputShape().getOI(); SerDe serde = conf.isMapSide() ? tabDef.getInput().getOutputShape().getSerde() : @@ -400,7 +399,7 @@ public class PTFOperator extends Operato (StructObjectInspector) inputOI, outputOI); } - + void close() { if ( inputPart != null ) { inputPart.close(); @@ -411,5 +410,5 @@ public class PTFOperator extends Operato } } } - + }
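The reduce-side control flow in PTFOperator.processOp above hinges on one pattern: rows arrive sorted by partition key, so a change in the evaluated key marks a partition boundary, triggering finishPartition on the rows accumulated so far and startPartition for the new key (the firstMapRow flag covers the map side, where the currentKeys null check cannot be used). What follows is a minimal, self-contained sketch of that boundary-detection pattern; it uses plain String keys and simple buffering in place of Hive's KeyWrapper and PTFPartition, so it illustrates the control flow only, not the actual Hive API.

    import java.util.ArrayList;
    import java.util.List;

    /** Minimal sketch of reduce-side partition boundary detection (stand-in types, not Hive's). */
    public class PartitionBoundaryDemo {
      private String currentKey;                                // plays the role of currentKeys
      private final List<String> buffered = new ArrayList<>(); // plays the role of the input PTFPartition

      /** Called once per row; key stands in for the evaluated partition expressions. */
      void processRow(String key, String row) {
        if (currentKey == null || !currentKey.equals(key)) {
          if (currentKey != null) {
            finishPartition();   // flush the previous partition's rows
          }
          currentKey = key;      // "startPartition" for the new key
          buffered.clear();
        }
        buffered.add(row);       // batch mode accumulates; streaming mode would forward here
      }

      void finishPartition() {
        System.out.println("partition " + currentKey + " -> " + buffered);
      }

      public static void main(String[] args) {
        PartitionBoundaryDemo demo = new PartitionBoundaryDemo();
        // rows arrive grouped by partition key, as the shuffle guarantees on the reduce side
        demo.processRow("a", "r1");
        demo.processRow("a", "r2");
        demo.processRow("b", "r3"); // key change: partition "a" is finished here
        demo.finishPartition();     // closeOp flushes the last open partition
      }
    }

In streaming mode the buffering step disappears: each row is handed straight to the table function. That is also why the map side needs the extra firstMapRow flag instead of relying on a null currentKey, as the comment at the top of PTFOperator notes.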
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java Tue Sep 2 19:56:56 2014 @@ -27,6 +27,8 @@ import java.util.Comparator; import java.util.List; import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -45,6 +47,8 @@ import org.apache.hadoop.mapred.OutputCo public class PartitionKeySampler implements OutputCollector<HiveKey, Object> { + private static final Log LOG = LogFactory.getLog(PartitionKeySampler.class); + public static final Comparator<byte[]> C = new Comparator<byte[]>() { public final int compare(byte[] o1, byte[] o2) { return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length); @@ -74,32 +78,46 @@ public class PartitionKeySampler impleme } // sort and pick partition keys - // copied from org.apache.hadoop.mapred.lib.InputSampler + // originally copied from org.apache.hadoop.mapred.lib.InputSampler but seemed to have a bug private byte[][] getPartitionKeys(int numReduce) { if (sampled.size() < numReduce - 1) { throw new IllegalStateException("not enough number of sample"); } byte[][] sorted = sampled.toArray(new byte[sampled.size()][]); Arrays.sort(sorted, C); - byte[][] partitionKeys = new byte[numReduce - 1][]; - float stepSize = sorted.length / (float) numReduce; - int last = -1; - for(int i = 1; i < numReduce; ++i) { - int k = Math.round(stepSize * i); - while (last >= k && C.compare(sorted[last], sorted[k]) == 0) { - k++; + + return toPartitionKeys(sorted, numReduce); + } + + static final byte[][] toPartitionKeys(byte[][] sorted, int numPartition) { + byte[][] partitionKeys = new byte[numPartition - 1][]; + + int last = 0; + int current = 0; + for(int i = 0; i < numPartition - 1; i++) { + current += Math.round((float)(sorted.length - current) / (numPartition - i)); + while (i > 0 && current < sorted.length && C.compare(sorted[last], sorted[current]) == 0) { + current++; + } + if (current >= sorted.length) { + return Arrays.copyOfRange(partitionKeys, 0, i); } - if (k >= sorted.length) { - throw new IllegalStateException("not enough number of sample"); + if (LOG.isDebugEnabled()) { + // print out nth partition key for debugging + LOG.debug("Partition key " + current + "th :" + new BytesWritable(sorted[current])); } - partitionKeys[i - 1] = sorted[k]; - last = k; + partitionKeys[i] = sorted[current]; + last = current; } return partitionKeys; } - public void writePartitionKeys(Path path, JobConf job) throws IOException { + public void writePartitionKeys(Path path, HiveConf conf, JobConf job) throws IOException { byte[][] partitionKeys = getPartitionKeys(job.getNumReduceTasks()); + int numPartition = partitionKeys.length + 1; + if (numPartition != job.getNumReduceTasks()) { + job.setNumReduceTasks(numPartition); + } FileSystem fs = path.getFileSystem(job); SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, path, Modified: 
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java Tue Sep 2 19:56:56 2014 @@ -27,14 +27,13 @@ import java.lang.annotation.Target; import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction; @Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.TYPE}) +@Target(ElementType.TYPE) @Documented -public @interface PartitionTableFunctionDescription -{ - Description description (); +public @interface PartitionTableFunctionDescription { + Description description (); - /** - * if true it is not usable in the language. {@link WindowingTableFunction} is the only internal function. - */ - boolean isInternal() default false; + /** + * if true it is not usable in the language. {@link WindowingTableFunction} is the only internal function. + */ + boolean isInternal() default false; } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Tue Sep 2 19:56:56 2014 @@ -31,6 +31,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -157,12 +158,12 @@ public class ScriptOperator extends Oper } /** - * Maps a relative pathname to an absolute pathname using the PATH enviroment. + * Maps a relative pathname to an absolute pathname using the PATH environment. 
*/ public class PathFinder { String pathenv; // a string of pathnames - String pathSep; // the path seperator - String fileSep; // the file seperator in a directory + String pathSep; // the path separator + String fileSep; // the file separator in a directory /** * Construct a PathFinder object using the path from the specified system @@ -284,7 +285,7 @@ public class ScriptOperator extends Oper @Override public void processOp(Object row, int tag) throws HiveException { - // initialize the user's process only when you recieve the first row + // initialize the user's process only when you receive the first row if (firstRow) { firstRow = false; try { @@ -358,7 +359,8 @@ public class ScriptOperator extends Oper .getBoolVar(hconf, HiveConf.ConfVars.HIVESCRIPTAUTOPROGRESS)) { autoProgressor = new AutoProgressor(this.getClass().getName(), reporter, Utilities.getDefaultNotificationInterval(hconf), - HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT) * 1000); + HiveConf.getTimeVar( + hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS)); autoProgressor.go(); } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Tue Sep 2 19:56:56 2014 @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec; import java.io.Serializable; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,7 +32,6 @@ import org.apache.hadoop.hive.ql.plan.UD import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.ql.udf.generic.UDTFCollector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; @@ -86,7 +86,8 @@ public class UDTFOperator extends Operat if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUDTFAUTOPROGRESS)) { autoProgressor = new AutoProgressor(this.getClass().getName(), reporter, Utilities.getDefaultNotificationInterval(hconf), - HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT) * 1000); + HiveConf.getTimeVar( + hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS)); autoProgressor.go(); } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Sep 2 19:56:56 2014 @@ -111,6 +111,7 @@ import org.apache.hadoop.hive.ql.exec.mr import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import 
org.apache.hadoop.hive.ql.exec.tez.DagUtils; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; @@ -1362,8 +1363,8 @@ public final class Utilities { codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc); } - return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec, - progressable)); + return SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec, + progressable); } @@ -1538,6 +1539,24 @@ public final class Utilities { Pattern.compile("^.*?([0-9]+)(_[0-9]{1,6})?(\\..*)?$"); /** + * Some jobs like "INSERT INTO" jobs create copies of files like 0000001_0_copy_2. + * For such files, + * Group 1: 00000001 [taskId] + * Group 3: 0 [task attempId] + * Group 4: _copy_2 [copy suffix] + * Group 6: copy [copy keyword] + * Group 8: 2 [copy file index] + */ + private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX = + Pattern.compile("^.*?"+ // any prefix + "([0-9]+)"+ // taskId + "(_)"+ // separator + "([0-9]{1,6})?"+ // attemptId (limited to 6 digits) + "((_)(\\Bcopy\\B)(_)"+ // copy keyword + "([0-9]{1,6})$)?"+ // copy file index + "(\\..*)?$"); // any suffix/file extension + + /** * This retruns prefix part + taskID for bucket join for partitioned table */ private static final Pattern FILE_NAME_PREFIXED_TASK_ID_REGEX = @@ -1862,21 +1881,42 @@ public final class Utilities { // speculative runs), but the largest should be the correct one since the result // of a successful run should never be smaller than a failed/speculative run. FileStatus toDelete = null; - if (otherFile.getLen() >= one.getLen()) { - toDelete = one; - } else { - toDelete = otherFile; - taskIdToFile.put(taskId, one); - } - long len1 = toDelete.getLen(); - long len2 = taskIdToFile.get(taskId).getLen(); - if (!fs.delete(toDelete.getPath(), true)) { - throw new IOException("Unable to delete duplicate file: " + toDelete.getPath() - + ". Existing file: " + taskIdToFile.get(taskId).getPath()); + + // "LOAD .. INTO" and "INSERT INTO" commands will generate files with + // "_copy_x" suffix. These files are usually read by map tasks and the + // task output gets written to some tmp path. The output file names will + // be of format taskId_attemptId. The usual path for all these tasks is + // srcPath -> taskTmpPath -> tmpPath -> finalPath. + // But, MergeFileTask can move files directly from src path to final path + // without copying it to tmp path. In such cases, different files with + // "_copy_x" suffix will be identified as duplicates (change in value + // of x is wrongly identified as attempt id) and will be deleted. + // To avoid that we will ignore files with "_copy_x" suffix from duplicate + // elimination. + if (!isCopyFile(one.getPath().getName())) { + if (otherFile.getLen() >= one.getLen()) { + toDelete = one; + } else { + toDelete = otherFile; + taskIdToFile.put(taskId, one); + } + long len1 = toDelete.getLen(); + long len2 = taskIdToFile.get(taskId).getLen(); + if (!fs.delete(toDelete.getPath(), true)) { + throw new IOException( + "Unable to delete duplicate file: " + toDelete.getPath() + + ". Existing file: " + + taskIdToFile.get(taskId).getPath()); + } else { + LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + + " with length " + + len1 + ". 
Existing file: " + + taskIdToFile.get(taskId).getPath() + " with length " + + len2); + } } else { - LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length " - + len1 + ". Existing file: " + taskIdToFile.get(taskId).getPath() + " with length " - + len2); + LOG.info(one.getPath() + " file identified as duplicate. This file is" + + " not deleted as it has copySuffix."); } } } @@ -1884,6 +1924,29 @@ public final class Utilities { return taskIdToFile; } + public static boolean isCopyFile(String filename) { + String taskId = filename; + String copyFileSuffix = null; + int dirEnd = filename.lastIndexOf(Path.SEPARATOR); + if (dirEnd != -1) { + taskId = filename.substring(dirEnd + 1); + } + Matcher m = COPY_FILE_NAME_TO_TASK_ID_REGEX.matcher(taskId); + if (!m.matches()) { + LOG.warn("Unable to verify if file name " + filename + " has _copy_ suffix."); + } else { + taskId = m.group(1); + copyFileSuffix = m.group(4); + } + + LOG.debug("Filename: " + filename + " TaskId: " + taskId + " CopySuffix: " + copyFileSuffix); + if (taskId != null && copyFileSuffix != null) { + return true; + } + + return false; + } + public static String getNameMessage(Exception e) { return e.getClass().getName() + "(" + e.getMessage() + ")"; } @@ -2680,7 +2743,7 @@ public final class Utilities { * first time it is caught, or SQLTransientException when the maxRetries has reached. */ public static <T> T executeWithRetry(SQLCommand<T> cmd, PreparedStatement stmt, - int baseWindow, int maxRetries) throws SQLException { + long baseWindow, int maxRetries) throws SQLException { Random r = new Random(); T result = null; @@ -2722,7 +2785,7 @@ public final class Utilities { * first time it is caught, or SQLTransientException when the maxRetries has reached. */ public static Connection connectWithRetry(String connectionString, - int waitWindow, int maxRetries) throws SQLException { + long waitWindow, int maxRetries) throws SQLException { Random r = new Random(); @@ -2764,7 +2827,7 @@ public final class Utilities { * first time it is caught, or SQLTransientException when the maxRetries has reached. */ public static PreparedStatement prepareWithRetry(Connection conn, String stmt, - int waitWindow, int maxRetries) throws SQLException { + long waitWindow, int maxRetries) throws SQLException { Random r = new Random(); @@ -2804,7 +2867,7 @@ public final class Utilities { * @param r a random generator. * @return number of milliseconds for the next wait time. */ - public static long getRandomWaitTime(int baseWindow, int failures, Random r) { + public static long getRandomWaitTime(long baseWindow, int failures, Random r) { return (long) ( baseWindow * failures + // grace period for the last round of attempt baseWindow * (failures + 1) * r.nextDouble()); // expanding time window for each failure @@ -3012,7 +3075,7 @@ public final class Utilities { * so we don't want to depend on scratch dir and context. */ public static List<Path> getInputPathsTez(JobConf job, MapWork work) throws Exception { - String scratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR); + String scratchDir = job.get(DagUtils.TEZ_TMP_DIR_KEY); // we usually don't want to create dummy files for tez, however the metadata only // optimization relies on it. @@ -3313,7 +3376,7 @@ public final class Utilities { /** * Returns true if a plan is both configured for vectorized execution * and vectorization is allowed. The plan may be configured for vectorization - * but vectorization dissalowed eg. for FetchOperator execution. 
+ * but vectorization disallowed eg. for FetchOperator execution. */ public static boolean isVectorMode(Configuration conf) { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && @@ -3462,13 +3525,13 @@ public final class Utilities { return createDirsWithPermission(conf, mkdir, fsPermission, recursive); } - private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, + private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, String origUmask, FileSystem fs) throws IOException { if (unsetUmask) { if (origUmask != null) { - conf.set("fs.permissions.umask-mode", origUmask); + conf.set(FsPermission.UMASK_LABEL, origUmask); } else { - conf.unset("fs.permissions.umask-mode"); + conf.unset(FsPermission.UMASK_LABEL); } } @@ -3482,10 +3545,10 @@ public final class Utilities { recursive); if (recursive) { - origUmask = conf.get("fs.permissions.umask-mode"); + origUmask = conf.get(FsPermission.UMASK_LABEL); // this umask is required because by default the hdfs mask is 022 resulting in // all parents getting the fsPermission & !(022) permission instead of fsPermission - conf.set("fs.permissions.umask-mode", "000"); + conf.set(FsPermission.UMASK_LABEL, "000"); } FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java Tue Sep 2 19:56:56 2014 @@ -28,39 +28,38 @@ import org.apache.hadoop.hive.ql.udf.gen import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction; @Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.TYPE}) +@Target(ElementType.TYPE) @Documented -public @interface WindowFunctionDescription -{ - Description description (); - /** - * controls whether this function can be applied to a Window. - * <p> - * Ranking function: Rank, Dense_Rank, Percent_Rank and Cume_Dist don't operate on Windows. - * Why? a window specification implies a row specific range i.e. every row gets its own set of rows to process the UDAF on. - * For ranking defining a set of rows for every row makes no sense. - * <p> - * All other UDAFs can be computed for a Window. - */ - boolean supportsWindow() default true; - /** - * A WindowFunc is implemented as {@link GenericUDAFResolver2}. It returns only one value. - * If this is true then the function must return a List which is taken to be the column for this function in the Output table returned by the - * {@link WindowingTableFunction}. Otherwise the output is assumed to be a single value, the column of the Output will contain the same value - * for all the rows. - */ - boolean pivotResult() default false; +public @interface WindowFunctionDescription { + Description description (); + /** + * controls whether this function can be applied to a Window. + * <p> + * Ranking function: Rank, Dense_Rank, Percent_Rank and Cume_Dist don't operate on Windows. + * Why? a window specification implies a row specific range i.e. every row gets its own set of rows to process the UDAF on. 
+ * For ranking defining a set of rows for every row makes no sense. + * <p> + * All other UDAFs can be computed for a Window. + */ + boolean supportsWindow() default true; + /** + * A WindowFunc is implemented as {@link GenericUDAFResolver2}. It returns only one value. + * If this is true then the function must return a List which is taken to be the column for this function in the Output table returned by the + * {@link WindowingTableFunction}. Otherwise the output is assumed to be a single value, the column of the Output will contain the same value + * for all the rows. + */ + boolean pivotResult() default false; - /** - * Used in translations process to validate arguments - * @return true if ranking function - */ - boolean rankingFunction() default false; + /** + * Used in translations process to validate arguments + * @return true if ranking function + */ + boolean rankingFunction() default false; - /** - * Using in analytical functions to specify that UDF implies an ordering - * @return true if the function implies order - */ - boolean impliesOrder() default false; + /** + * Using in analytical functions to specify that UDF implies an ordering + * @return true if the function implies order + */ + boolean impliesOrder() default false; } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java Tue Sep 2 19:56:56 2014 @@ -22,45 +22,39 @@ import org.apache.hadoop.hive.ql.udf.gen import org.apache.hive.common.util.AnnotationUtils; @SuppressWarnings("deprecation") -public class WindowFunctionInfo implements CommonFunctionInfo -{ - boolean supportsWindow = true; - boolean pivotResult = false; - boolean impliesOrder = false; - FunctionInfo fInfo; - - WindowFunctionInfo(FunctionInfo fInfo) - { - assert fInfo.isGenericUDAF(); - this.fInfo = fInfo; - Class<? extends GenericUDAFResolver> wfnCls = fInfo.getGenericUDAFResolver().getClass(); - WindowFunctionDescription def = +public class WindowFunctionInfo implements CommonFunctionInfo { + boolean supportsWindow = true; + boolean pivotResult = false; + boolean impliesOrder = false; + FunctionInfo fInfo; + + WindowFunctionInfo(FunctionInfo fInfo) { + assert fInfo.isGenericUDAF(); + this.fInfo = fInfo; + Class<? 
extends GenericUDAFResolver> wfnCls = fInfo.getGenericUDAFResolver().getClass(); + WindowFunctionDescription def = AnnotationUtils.getAnnotation(wfnCls, WindowFunctionDescription.class); - if ( def != null) - { - supportsWindow = def.supportsWindow(); - pivotResult = def.pivotResult(); - impliesOrder = def.impliesOrder(); - } - } - - public boolean isSupportsWindow() - { - return supportsWindow; - } - - public boolean isPivotResult() - { - return pivotResult; - } - - public boolean isImpliesOrder(){ - return impliesOrder; - } - public FunctionInfo getfInfo() - { - return fInfo; - } + if ( def != null) { + supportsWindow = def.supportsWindow(); + pivotResult = def.pivotResult(); + impliesOrder = def.impliesOrder(); + } + } + + public boolean isSupportsWindow() { + return supportsWindow; + } + + public boolean isPivotResult() { + return pivotResult; + } + + public boolean isImpliesOrder() { + return impliesOrder; + } + public FunctionInfo getfInfo() { + return fInfo; + } @Override public Class<?> getFunctionClass() { Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java Tue Sep 2 19:56:56 2014 @@ -63,7 +63,7 @@ public class MapJoinMemoryExhaustionHand if(maxHeapSize == -1) { this.maxHeapSize = 200L * 1024L * 1024L; LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " + - "defaulting maxHeapSize to 200MB"); + "defaulting maxHeapSize to 200MB"); } else { this.maxHeapSize = maxHeapSize; } @@ -91,4 +91,4 @@ public class MapJoinMemoryExhaustionHand throw new MapJoinMemoryExhaustionException(msg); } } -} \ No newline at end of file +} Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Sep 2 19:56:56 2014 @@ -371,7 +371,7 @@ public class ExecDriver extends Task<Map Utilities.setMapRedWork(job, work, ctx.getMRTmpPath()); - if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) { + if (mWork.getSamplingType() > 0 && rWork != null && job.getNumReduceTasks() > 1) { try { handleSampling(driverContext, mWork, job, conf); job.setPartitionerClass(HiveTotalOrderPartitioner.class); @@ -539,7 +539,7 @@ public class ExecDriver extends Task<Map } else { throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType()); } - sampler.writePartitionKeys(partitionFile, job); + sampler.writePartitionKeys(partitionFile, conf, job); } /** Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java URL: 
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Tue Sep 2 19:56:56 2014 @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -219,8 +220,8 @@ public class HadoopJobExecHelper { SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); //DecimalFormat longFormatter = new DecimalFormat("###,###"); long reportTime = System.currentTimeMillis(); - long maxReportInterval = - HiveConf.getLongVar(job, HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL); + long maxReportInterval = HiveConf.getTimeVar( + job, HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS); boolean fatal = false; StringBuilder errMsg = new StringBuilder(); long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java Tue Sep 2 19:56:56 2014 @@ -32,7 +32,7 @@ import org.apache.hadoop.io.Writable; @SuppressWarnings("deprecation") public class MapJoinTableContainerSerDe { - + private final MapJoinObjectSerDeContext keyContext; private final MapJoinObjectSerDeContext valueContext; public MapJoinTableContainerSerDe(MapJoinObjectSerDeContext keyContext, @@ -70,7 +70,7 @@ public class MapJoinTableContainerSerDe } try { Writable keyContainer = keySerDe.getSerializedClass().newInstance(); - Writable valueContainer = valueSerDe.getSerializedClass().newInstance(); + Writable valueContainer = valueSerDe.getSerializedClass().newInstance(); int numKeys = in.readInt(); for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) { MapJoinKeyObject key = new MapJoinKeyObject(); @@ -89,7 +89,7 @@ public class MapJoinTableContainerSerDe public void persist(ObjectOutputStream out, MapJoinPersistableTableContainer tableContainer) throws HiveException { int numKeys = tableContainer.size(); - try { + try { out.writeUTF(tableContainer.getClass().getName()); out.writeObject(tableContainer.getMetaData()); out.writeInt(numKeys); @@ -108,7 +108,7 @@ public class MapJoinTableContainerSerDe throw new ConcurrentModificationException("TableContainer was modified while persisting: " + tableContainer); } } - + public static void persistDummyTable(ObjectOutputStream out) throws IOException { MapJoinPersistableTableContainer tableContainer = new HashMapWrapper(); out.writeUTF(tableContainer.getClass().getName()); @@ -127,8 +127,8 @@ public class MapJoinTableContainerSerDe return 
constructor.newInstance(metaData); } catch (Exception e) { String msg = "Error while attemping to create table container" + - " of type: " + name + ", with metaData: " + metaData; + " of type: " + name + ", with metaData: " + metaData; throw new HiveException(msg, e); } } -} \ No newline at end of file +} Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java Tue Sep 2 19:56:56 2014 @@ -270,7 +270,7 @@ public class PTFRowContainer<Row extends FileSystem fs = finalOutPath.getFileSystem(jc); final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc, fs, finalOutPath, - BytesWritable.class, valueClass, isCompressed, progress); + BytesWritable.class, valueClass, isCompressed, progress); return new PTFRecordWriter(outStream); } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Tue Sep 2 19:56:56 2014 @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import javax.security.auth.login.LoginException; @@ -121,13 +122,14 @@ import com.google.common.collect.Lists; */ public class DagUtils { + public static final String TEZ_TMP_DIR_KEY = "_hive_tez_tmp_dir"; private static final Log LOG = LogFactory.getLog(DagUtils.class.getName()); private static final String TEZ_DIR = "_tez_scratch_dir"; private static DagUtils instance; private void addCredentials(MapWork mapWork, DAG dag) { Set<String> paths = mapWork.getPathToAliases().keySet(); - if (paths != null && !paths.isEmpty()) { + if (!paths.isEmpty()) { Iterator<URI> pathIterator = Iterators.transform(paths.iterator(), new Function<String, URI>() { @Override public URI apply(String input) { @@ -155,7 +157,7 @@ public class DagUtils { * Creates the configuration object necessary to run a specific vertex from * map work. This includes input formats, input processor, etc. */ - private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) { + private JobConf initializeVertexConf(JobConf baseConf, Context context, MapWork mapWork) { JobConf conf = new JobConf(baseConf); if (mapWork.getNumMapTasks() != null) { @@ -198,7 +200,13 @@ public class DagUtils { inpFormat = CombineHiveInputFormat.class.getName(); } - // Is this required ? + if (mapWork.getDummyTableScan()) { + // hive input format doesn't handle the special condition of no paths + 1 + // split correctly. 
+ inpFormat = CombineHiveInputFormat.class.getName(); + } + + conf.set(TEZ_TMP_DIR_KEY, context.getMRTmpPath().toUri().toString()); conf.set("mapred.mapper.class", ExecMapper.class.getName()); conf.set("mapred.input.format.class", inpFormat); @@ -516,7 +524,7 @@ public class DagUtils { /* * Helper function to create JobConf for specific ReduceWork. */ - private JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) { + private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWork reduceWork) { JobConf conf = new JobConf(baseConf); // Is this required ? @@ -686,7 +694,7 @@ public class DagUtils { /** * Localizes files, archives and jars from a provided array of names. - * @param hdfsDirPathStr Destination directoty in HDFS. + * @param hdfsDirPathStr Destination directory in HDFS. * @param conf Configuration. * @param inputOutputJars The file names to localize. * @return List<LocalResource> local resources to add to execution @@ -760,7 +768,7 @@ public class DagUtils { } /** - * @param pathStr - the string from which we try to determine the resource base name + * @param path - the path from which we try to determine the resource base name * @return the name of the resource from a given path string. */ public String getResourceBaseName(Path path) { @@ -806,9 +814,8 @@ public class DagUtils { int waitAttempts = conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal); - long sleepInterval = - conf.getLong(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.varname, - HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.defaultLongVal); + long sleepInterval = HiveConf.getTimeVar( + conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS); LOG.info("Number of wait attempts: " + waitAttempts + ". Wait interval: " + sleepInterval); boolean found = false; @@ -874,14 +881,14 @@ public class DagUtils { * @param work BaseWork will be used to populate the configuration object. * @return JobConf new configuration object */ - public JobConf initializeVertexConf(JobConf conf, BaseWork work) { + public JobConf initializeVertexConf(JobConf conf, Context context, BaseWork work) { // simply dispatch the call to the right method for the actual (sub-) type of // BaseWork. if (work instanceof MapWork) { - return initializeVertexConf(conf, (MapWork)work); + return initializeVertexConf(conf, context, (MapWork)work); } else if (work instanceof ReduceWork) { - return initializeVertexConf(conf, (ReduceWork)work); + return initializeVertexConf(conf, context, (ReduceWork)work); } else { assert false; return null; @@ -895,7 +902,6 @@ public class DagUtils { * @param work The instance of BaseWork representing the actual work to be performed * by this vertex. * @param scratchDir HDFS scratch dir for this execution unit. - * @param list * @param appJarLr Local resource for hive-exec. 
* @param additionalLr * @param fileSystem FS corresponding to scratchDir and LocalResources Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Tue Sep 2 19:56:56 2014 @@ -260,7 +260,7 @@ public class TezTask extends Task<TezWor } } else { // Regular vertices - JobConf wxConf = utils.initializeVertexConf(conf, w); + JobConf wxConf = utils.initializeVertexConf(conf, ctx, w); Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal, work); dag.addVertex(wx); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Tue Sep 2 19:56:56 2014 @@ -145,7 +145,11 @@ public class VectorFileSinkOperator exte } rowOutWriters = fpaths.getOutWriters(); - if (conf.isGatherStats()) { + // check if all record writers implement statistics. if atleast one RW + // doesn't implement stats interface we will fallback to conventional way + // of gathering stats + isCollectRWStats = areAllTrue(statsFromRecordWriter); + if (conf.isGatherStats() && !isCollectRWStats) { if (statsCollectRawDataSize) { SerDeStats stats = serializer.getSerDeStats(); if (stats != null) { Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java Tue Sep 2 19:56:56 2014 @@ -146,7 +146,7 @@ public class VectorHashKeyWrapper extend duplicateTo(clone); return clone; } - + public void duplicateTo(VectorHashKeyWrapper clone) { clone.longValues = longValues.clone(); clone.doubleValues = doubleValues.clone(); @@ -155,7 +155,7 @@ public class VectorHashKeyWrapper extend // Decimal128 requires deep clone clone.decimalValues = new Decimal128[decimalValues.length]; for(int i = 0; i < decimalValues.length; ++i) { - clone.decimalValues[i] = new Decimal128().update(decimalValues[i]); + clone.decimalValues[i] = new Decimal128().update(decimalValues[i]); } clone.byteValues = new byte[byteValues.length][]; Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1622108&r1=1622107&r2=1622108&view=diff 
============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Tue Sep 2 19:56:56 2014 @@ -325,11 +325,11 @@ public class VectorizationContext { childExpressions, mode, exprDesc.getTypeInfo()); } } else if (exprDesc instanceof ExprNodeNullDesc) { - ve = getConstantVectorExpression(null, exprDesc.getTypeInfo(), mode); + ve = getConstantVectorExpression(null, exprDesc.getTypeInfo(), mode); } else if (exprDesc instanceof ExprNodeConstantDesc) { ve = getConstantVectorExpression(((ExprNodeConstantDesc) exprDesc).getValue(), exprDesc.getTypeInfo(), mode); - } + } if (ve == null) { throw new HiveException("Could not vectorize expression: "+exprDesc.getName()); } @@ -413,8 +413,8 @@ public class VectorizationContext { } } } else { - for (ExprNodeDesc child : children) { - ExprNodeDesc castExpression = getImplicitCastExpression(genericUDF, child, commonType); + for (ExprNodeDesc child : children) { + ExprNodeDesc castExpression = getImplicitCastExpression(genericUDF, child, commonType); if (castExpression != null) { atleastOneCastNeeded = true; childrenWithCasts.add(castExpression); @@ -515,7 +515,7 @@ public class VectorizationContext { } return null; } - + private int getPrecisionForType(PrimitiveTypeInfo typeInfo) { if (isFloatFamily(typeInfo.getTypeName())) { return HiveDecimal.MAX_PRECISION; @@ -572,8 +572,8 @@ public class VectorizationContext { ((GenericUDFBridge) genericUdf).setUdfClassName(udfClass.getClass().getName()); } if (genericUdf instanceof SettableUDF) { - ((SettableUDF)genericUdf).setTypeInfo(castType); - } + ((SettableUDF) genericUdf).setTypeInfo(castType); + } return genericUdf; } @@ -662,63 +662,63 @@ public class VectorizationContext { * @throws HiveException */ ExprNodeDesc evaluateCastOnConstants(ExprNodeDesc exprDesc) throws HiveException { - if (!(exprDesc instanceof ExprNodeGenericFuncDesc)) { - return exprDesc; - } - - if (exprDesc.getChildren() == null || (exprDesc.getChildren().size() != 1) ) { - return exprDesc; - } - - ExprNodeConstantDesc foldedChild = null; - if (!( exprDesc.getChildren().get(0) instanceof ExprNodeConstantDesc)) { - - // try recursive folding - ExprNodeDesc expr = evaluateCastOnConstants(exprDesc.getChildren().get(0)); - if (expr instanceof ExprNodeConstantDesc) { - foldedChild = (ExprNodeConstantDesc) expr; - } - } else { - foldedChild = (ExprNodeConstantDesc) exprDesc.getChildren().get(0); - } - - if (foldedChild == null) { - return exprDesc; - } - - ObjectInspector childoi = foldedChild.getWritableObjectInspector(); - GenericUDF gudf = ((ExprNodeGenericFuncDesc) exprDesc).getGenericUDF(); - - // Only evaluate +ve/-ve or cast on constant or recursive casting. 
- if (gudf instanceof GenericUDFOPNegative || gudf instanceof GenericUDFOPPositive || - castExpressionUdfs.contains(gudf.getClass()) - || ((gudf instanceof GenericUDFBridge) - && castExpressionUdfs.contains(((GenericUDFBridge) gudf).getUdfClass()))) { - ExprNodeEvaluator<?> evaluator = ExprNodeEvaluatorFactory.get(exprDesc); - ObjectInspector output = evaluator.initialize(childoi); - Object constant = evaluator.evaluate(null); - Object java = ObjectInspectorUtils.copyToStandardJavaObject(constant, output); - return new ExprNodeConstantDesc(exprDesc.getTypeInfo(), java); - } - - return exprDesc; + if (!(exprDesc instanceof ExprNodeGenericFuncDesc)) { + return exprDesc; + } + + if (exprDesc.getChildren() == null || (exprDesc.getChildren().size() != 1) ) { + return exprDesc; + } + + ExprNodeConstantDesc foldedChild = null; + if (!( exprDesc.getChildren().get(0) instanceof ExprNodeConstantDesc)) { + + // try recursive folding + ExprNodeDesc expr = evaluateCastOnConstants(exprDesc.getChildren().get(0)); + if (expr instanceof ExprNodeConstantDesc) { + foldedChild = (ExprNodeConstantDesc) expr; + } + } else { + foldedChild = (ExprNodeConstantDesc) exprDesc.getChildren().get(0); + } + + if (foldedChild == null) { + return exprDesc; + } + + ObjectInspector childoi = foldedChild.getWritableObjectInspector(); + GenericUDF gudf = ((ExprNodeGenericFuncDesc) exprDesc).getGenericUDF(); + + // Only evaluate +ve/-ve or cast on constant or recursive casting. + if (gudf instanceof GenericUDFOPNegative || gudf instanceof GenericUDFOPPositive || + castExpressionUdfs.contains(gudf.getClass()) + || ((gudf instanceof GenericUDFBridge) + && castExpressionUdfs.contains(((GenericUDFBridge) gudf).getUdfClass()))) { + ExprNodeEvaluator<?> evaluator = ExprNodeEvaluatorFactory.get(exprDesc); + ObjectInspector output = evaluator.initialize(childoi); + Object constant = evaluator.evaluate(null); + Object java = ObjectInspectorUtils.copyToStandardJavaObject(constant, output); + return new ExprNodeConstantDesc(exprDesc.getTypeInfo(), java); + } + + return exprDesc; } - + /* For cast on constant operator in all members of the input list and return new list * containing results. 
*/ private List<ExprNodeDesc> evaluateCastOnConstants(List<ExprNodeDesc> childExpr) - throws HiveException { - List<ExprNodeDesc> evaluatedChildren = new ArrayList<ExprNodeDesc>(); - if (childExpr != null) { + throws HiveException { + List<ExprNodeDesc> evaluatedChildren = new ArrayList<ExprNodeDesc>(); + if (childExpr != null) { for (ExprNodeDesc expr : childExpr) { - expr = this.evaluateCastOnConstants(expr); - evaluatedChildren.add(expr); + expr = this.evaluateCastOnConstants(expr); + evaluatedChildren.add(expr); } - } - return evaluatedChildren; + } + return evaluatedChildren; } - + private VectorExpression getConstantVectorExpression(Object constantValue, TypeInfo typeInfo, Mode mode) throws HiveException { String type = typeInfo.getTypeName(); @@ -728,7 +728,7 @@ public class VectorizationContext { outCol = ocm.allocateOutputColumn(colVectorType); } if (constantValue == null) { - return new ConstantVectorExpression(outCol, type, true); + return new ConstantVectorExpression(outCol, type, true); } else if (decimalTypePattern.matcher(type).matches()) { VectorExpression ve = new ConstantVectorExpression(outCol, (Decimal128) constantValue); ve.setOutputType(typeInfo.getTypeName()); @@ -907,9 +907,9 @@ public class VectorizationContext { private VectorExpression getGenericUdfVectorExpression(GenericUDF udf, List<ExprNodeDesc> childExpr, Mode mode, TypeInfo returnType) throws HiveException { - List<ExprNodeDesc> castedChildren = evaluateCastOnConstants(childExpr); - childExpr = castedChildren; - + List<ExprNodeDesc> castedChildren = evaluateCastOnConstants(childExpr); + childExpr = castedChildren; + //First handle special cases if (udf instanceof GenericUDFBetween) { return getBetweenFilterExpression(childExpr, mode, returnType); @@ -933,8 +933,8 @@ public class VectorizationContext { } } else if (udf instanceof GenericUDFToDecimal) { return getCastToDecimal(childExpr, returnType); - } - + } + // Now do a general lookup Class<?> udfClass = udf.getClass(); if (udf instanceof GenericUDFBridge) { @@ -1003,7 +1003,7 @@ public class VectorizationContext { } } } - + /** * Create a filter or boolean-valued expression for column IN ( <list-of-constants> ) */ @@ -1014,8 +1014,8 @@ public class VectorizationContext { String colType = colExpr.getTypeString(); // prepare arguments for createVectorExpression - List<ExprNodeDesc> childrenForInList = evaluateCastOnConstants(childExpr.subList(1, childExpr.size())); - + List<ExprNodeDesc> childrenForInList = evaluateCastOnConstants(childExpr.subList(1, childExpr.size())); + /* This method assumes that the IN list has no NULL entries. That is enforced elsewhere, * in the Vectorizer class. If NULL is passed in as a list entry, behavior is not defined.
* If in the future, NULL values are allowed in the IN list, be sure to handle 3-valued @@ -1110,105 +1110,105 @@ public class VectorizationContext { return getCastToString(childExpr, returnType); } return null; - } - + } + private Decimal128 castConstantToDecimal(Object scalar, TypeInfo type) throws HiveException { - PrimitiveTypeInfo ptinfo = (PrimitiveTypeInfo) type; - String typename = type.getTypeName(); - Decimal128 d = new Decimal128(); - int scale = HiveDecimalUtils.getScaleForType(ptinfo); - switch (ptinfo.getPrimitiveCategory()) { - case FLOAT: - float floatVal = ((Float) scalar).floatValue(); - d.update(floatVal, (short) scale); - break; - case DOUBLE: - double doubleVal = ((Double) scalar).doubleValue(); - d.update(doubleVal, (short) scale); - break; - case BYTE: - byte byteVal = ((Byte) scalar).byteValue(); - d.update(byteVal, (short) scale); - break; - case SHORT: - short shortVal = ((Short) scalar).shortValue(); - d.update(shortVal, (short) scale); - break; - case INT: - int intVal = ((Integer) scalar).intValue(); - d.update(intVal, (short) scale); - break; - case LONG: - long longVal = ((Long) scalar).longValue(); - d.update(longVal, (short) scale); - break; - case DECIMAL: - HiveDecimal decimalVal = (HiveDecimal) scalar; - d.update(decimalVal.unscaledValue(), (short) scale); - break; - default: - throw new HiveException("Unsupported type "+typename+" for cast to Decimal128"); - } - return d; + PrimitiveTypeInfo ptinfo = (PrimitiveTypeInfo) type; + String typename = type.getTypeName(); + Decimal128 d = new Decimal128(); + int scale = HiveDecimalUtils.getScaleForType(ptinfo); + switch (ptinfo.getPrimitiveCategory()) { + case FLOAT: + float floatVal = ((Float) scalar).floatValue(); + d.update(floatVal, (short) scale); + break; + case DOUBLE: + double doubleVal = ((Double) scalar).doubleValue(); + d.update(doubleVal, (short) scale); + break; + case BYTE: + byte byteVal = ((Byte) scalar).byteValue(); + d.update(byteVal, (short) scale); + break; + case SHORT: + short shortVal = ((Short) scalar).shortValue(); + d.update(shortVal, (short) scale); + break; + case INT: + int intVal = ((Integer) scalar).intValue(); + d.update(intVal, (short) scale); + break; + case LONG: + long longVal = ((Long) scalar).longValue(); + d.update(longVal, (short) scale); + break; + case DECIMAL: + HiveDecimal decimalVal = (HiveDecimal) scalar; + d.update(decimalVal.unscaledValue(), (short) scale); + break; + default: - throw new HiveException("Unsupported type "+typename+" for cast to Decimal128"); + } + return d; } private String castConstantToString(Object scalar, TypeInfo type) throws HiveException { - PrimitiveTypeInfo ptinfo = (PrimitiveTypeInfo) type; - String typename = type.getTypeName(); - switch (ptinfo.getPrimitiveCategory()) { - case FLOAT: - case DOUBLE: - case BYTE: - case SHORT: - case INT: - case LONG: - return ((Number) scalar).toString(); - case DECIMAL: - HiveDecimal decimalVal = (HiveDecimal) scalar; - return decimalVal.toString(); - default: - throw new HiveException("Unsupported type "+typename+" for cast to String"); - } + PrimitiveTypeInfo ptinfo = (PrimitiveTypeInfo) type; + String typename = type.getTypeName(); + switch (ptinfo.getPrimitiveCategory()) { + case FLOAT: + case DOUBLE: + case BYTE: + case SHORT: + case INT: + case LONG: + return ((Number) scalar).toString(); + case DECIMAL: + HiveDecimal decimalVal = (HiveDecimal) scalar; + return decimalVal.toString(); + default: + throw new HiveException("Unsupported type "+typename+" for cast to String"); + } } private Double
castConstantToDouble(Object scalar, TypeInfo type) throws HiveException { - PrimitiveTypeInfo ptinfo = (PrimitiveTypeInfo) type; - String typename = type.getTypeName(); - switch (ptinfo.getPrimitiveCategory()) { - case FLOAT: - case DOUBLE: - case BYTE: - case SHORT: - case INT: - case LONG: - return ((Number) scalar).doubleValue(); - case DECIMAL: - HiveDecimal decimalVal = (HiveDecimal) scalar; - return decimalVal.doubleValue(); - default: - throw new HiveException("Unsupported type "+typename+" for cast to Double"); - } - } + PrimitiveTypeInfo ptinfo = (PrimitiveTypeInfo) type; + String typename = type.getTypeName(); + switch (ptinfo.getPrimitiveCategory()) { + case FLOAT: + case DOUBLE: + case BYTE: + case SHORT: + case INT: + case LONG: + return ((Number) scalar).doubleValue(); + case DECIMAL: + HiveDecimal decimalVal = (HiveDecimal) scalar; + return decimalVal.doubleValue(); + default: + throw new HiveException("Unsupported type "+typename+" for cast to Double"); + } + } private Long castConstantToLong(Object scalar, TypeInfo type) throws HiveException { - PrimitiveTypeInfo ptinfo = (PrimitiveTypeInfo) type; - String typename = type.getTypeName(); - switch (ptinfo.getPrimitiveCategory()) { - case FLOAT: - case DOUBLE: - case BYTE: - case SHORT: - case INT: - case LONG: - return ((Number) scalar).longValue(); - case DECIMAL: - HiveDecimal decimalVal = (HiveDecimal) scalar; - return decimalVal.longValue(); - default: - throw new HiveException("Unsupported type "+typename+" for cast to Long"); - } - } - + PrimitiveTypeInfo ptinfo = (PrimitiveTypeInfo) type; + String typename = type.getTypeName(); + switch (ptinfo.getPrimitiveCategory()) { + case FLOAT: + case DOUBLE: + case BYTE: + case SHORT: + case INT: + case LONG: + return ((Number) scalar).longValue(); + case DECIMAL: + HiveDecimal decimalVal = (HiveDecimal) scalar; + return decimalVal.longValue(); + default: + throw new HiveException("Unsupported type "+typename+" for cast to Long"); + } + } + private VectorExpression getCastToDecimal(List<ExprNodeDesc> childExpr, TypeInfo returnType) throws HiveException { ExprNodeDesc child = childExpr.get(0); @@ -1217,9 +1217,9 @@ public class VectorizationContext { // Return a constant vector expression Object constantValue = ((ExprNodeConstantDesc) child).getValue(); Decimal128 decimalValue = castConstantToDecimal(constantValue, child.getTypeInfo()); - return getConstantVectorExpression(decimalValue, returnType, Mode.PROJECTION); + return getConstantVectorExpression(decimalValue, returnType, Mode.PROJECTION); } else if (child instanceof ExprNodeNullDesc) { - return getConstantVectorExpression(null, returnType, Mode.PROJECTION); + return getConstantVectorExpression(null, returnType, Mode.PROJECTION); } if (isIntFamily(inputType)) { return createVectorExpression(CastLongToDecimal.class, childExpr, Mode.PROJECTION, returnType); @@ -1234,8 +1234,8 @@ public class VectorizationContext { return createVectorExpression(CastTimestampToDecimal.class, childExpr, Mode.PROJECTION, returnType); } throw new HiveException("Unhandled cast input type: " + inputType); - } - + } + private VectorExpression getCastToString(List<ExprNodeDesc> childExpr, TypeInfo returnType) throws HiveException { ExprNodeDesc child = childExpr.get(0); @@ -1244,9 +1244,9 @@ public class VectorizationContext { // Return a constant vector expression Object constantValue = ((ExprNodeConstantDesc) child).getValue(); String strValue = castConstantToString(constantValue, child.getTypeInfo()); - return getConstantVectorExpression(strValue,
returnType, Mode.PROJECTION); + return getConstantVectorExpression(strValue, returnType, Mode.PROJECTION); } else if (child instanceof ExprNodeNullDesc) { - return getConstantVectorExpression(null, returnType, Mode.PROJECTION); + return getConstantVectorExpression(null, returnType, Mode.PROJECTION); } if (inputType.equals("boolean")) { // Boolean must come before the integer family. It's a special case. @@ -1273,9 +1273,9 @@ public class VectorizationContext { // Return a constant vector expression Object constantValue = ((ExprNodeConstantDesc) child).getValue(); Double doubleValue = castConstantToDouble(constantValue, child.getTypeInfo()); - return getConstantVectorExpression(doubleValue, returnType, Mode.PROJECTION); + return getConstantVectorExpression(doubleValue, returnType, Mode.PROJECTION); } else if (child instanceof ExprNodeNullDesc) { - return getConstantVectorExpression(null, returnType, Mode.PROJECTION); + return getConstantVectorExpression(null, returnType, Mode.PROJECTION); } if (isIntFamily(inputType)) { return createVectorExpression(CastLongToDouble.class, childExpr, Mode.PROJECTION, returnType); @@ -1286,7 +1286,7 @@ public class VectorizationContext { // float types require no conversion, so use a no-op return getIdentityExpression(childExpr); - } + } // The string type is deliberately omitted -- it's handled elsewhere. See isLegacyPathUDF. return null; @@ -1301,7 +1301,7 @@ public class VectorizationContext { // Family of related JIRAs: HIVE-7421, HIVE-7422, and HIVE-7424. return null; } else if (child instanceof ExprNodeNullDesc) { - return getConstantVectorExpression(null, TypeInfoFactory.booleanTypeInfo, Mode.PROJECTION); + return getConstantVectorExpression(null, TypeInfoFactory.booleanTypeInfo, Mode.PROJECTION); } // Long and double are handled using descriptors, string needs to be specially handled. if (inputType.equals("string")) { @@ -1329,9 +1329,9 @@ public class VectorizationContext { // Return a constant vector expression Object constantValue = ((ExprNodeConstantDesc) child).getValue(); Long longValue = castConstantToLong(constantValue, child.getTypeInfo()); - return getConstantVectorExpression(longValue, TypeInfoFactory.longTypeInfo, Mode.PROJECTION); + return getConstantVectorExpression(longValue, TypeInfoFactory.longTypeInfo, Mode.PROJECTION); } else if (child instanceof ExprNodeNullDesc) { - return getConstantVectorExpression(null, TypeInfoFactory.longTypeInfo, Mode.PROJECTION); + return getConstantVectorExpression(null, TypeInfoFactory.longTypeInfo, Mode.PROJECTION); } // Float family, timestamp are handled via descriptor based lookup, int family needs // special handling.
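The castConstantToDecimal/String/Double/Long helpers above all share one shape: switch on the primitive category, narrow every fixed-size numeric wrapper through java.lang.Number, give DECIMAL its own branch, and throw on anything unexpected. A minimal standalone sketch of that pattern, outside Hive (ConstantCastSketch, ToyType, and foldToLong are names invented for this illustration, not identifiers from the patch; java.math.BigDecimal stands in for HiveDecimal):

    import java.math.BigDecimal;

    public class ConstantCastSketch {
      // Illustrative stand-in for the PrimitiveCategory values the patch switches on.
      enum ToyType { FLOAT, DOUBLE, BYTE, SHORT, INT, LONG, DECIMAL }

      // Same shape as castConstantToLong above: fixed-size numeric wrappers
      // all narrow through Number, decimals get a dedicated branch, and
      // anything else is rejected rather than guessed at.
      static Long foldToLong(Object scalar, ToyType type) {
        switch (type) {
          case FLOAT:
          case DOUBLE:
          case BYTE:
          case SHORT:
          case INT:
          case LONG:
            return ((Number) scalar).longValue();
          case DECIMAL:
            return ((BigDecimal) scalar).longValue();
          default:
            throw new IllegalArgumentException("Unsupported type " + type + " for cast to Long");
        }
      }

      public static void main(String[] args) {
        System.out.println(foldToLong(3.9d, ToyType.DOUBLE));                    // prints 3
        System.out.println(foldToLong(new BigDecimal("42.5"), ToyType.DECIMAL)); // prints 42
      }
    }

As in the patch, this folding runs once at planning time, so the vectorized operators only ever see the already-narrowed constant rather than re-evaluating the cast per row.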
@@ -1519,11 +1519,11 @@ public class VectorizationContext { public static boolean isTimestampFamily(String resultType) { return resultType.equalsIgnoreCase("timestamp"); } - + public static boolean isDateFamily(String resultType) { return resultType.equalsIgnoreCase("date"); } - + // return true if this is any kind of float public static boolean isFloatFamily(String resultType) { return resultType.equalsIgnoreCase("double") Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java Tue Sep 2 19:56:56 2014 @@ -75,23 +75,23 @@ public class ConstantVectorExpression ex this(outputColumn, "decimal"); setDecimalValue(value); } - + /* * Support for null constant object */ public ConstantVectorExpression(int outputColumn, String typeString, boolean isNull) { - this(outputColumn, typeString); - isNullValue = isNull; + this(outputColumn, typeString); + isNullValue = isNull; } - + private void evaluateLong(VectorizedRowBatch vrg) { LongColumnVector cv = (LongColumnVector) vrg.cols[outputColumn]; cv.isRepeating = true; cv.noNulls = !isNullValue; if (!isNullValue) { - cv.vector[0] = longValue; + cv.vector[0] = longValue; } else { - cv.isNull[0] = true; + cv.isNull[0] = true; } } @@ -100,10 +100,10 @@ public class ConstantVectorExpression ex cv.isRepeating = true; cv.noNulls = !isNullValue; if (!isNullValue) { - cv.vector[0] = doubleValue; + cv.vector[0] = doubleValue; } else { - cv.isNull[0] = true; - } + cv.isNull[0] = true; + } } private void evaluateBytes(VectorizedRowBatch vrg) { @@ -112,9 +112,9 @@ public class ConstantVectorExpression ex cv.noNulls = !isNullValue; cv.initBuffer(); if (!isNullValue) { - cv.setVal(0, bytesValue, 0, bytesValueLength); + cv.setVal(0, bytesValue, 0, bytesValueLength); } else { - cv.isNull[0] = true; + cv.isNull[0] = true; } } @@ -123,9 +123,9 @@ public class ConstantVectorExpression ex dcv.isRepeating = true; dcv.noNulls = !isNullValue; if (!isNullValue) { - dcv.vector[0].update(decimalValue); + dcv.vector[0].update(decimalValue); } else { - dcv.isNull[0] = true; + dcv.isNull[0] = true; } } @@ -194,7 +194,7 @@ public class ConstantVectorExpression ex } else if (VectorizationContext.isDecimalFamily(typeString)){ this.type = Type.DECIMAL; } else { - // everything else that does not belong to string, double, decimal is treated as long. + // everything else that does not belong to string, double, decimal is treated as long.
this.type = Type.LONG; } } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IdentityExpression.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IdentityExpression.java?rev=1622108&r1=1622107&r2=1622108&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IdentityExpression.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IdentityExpression.java Tue Sep 2 19:56:56 2014 @@ -33,17 +33,17 @@ public class IdentityExpression extends public IdentityExpression() { } - public IdentityExpression(int colNum, String type) { - this.colNum = colNum; + public IdentityExpression(int colNum, String type) { + this.colNum = colNum; this.type = type; - } + } - @Override + @Override public void evaluate(VectorizedRowBatch batch) { if (childExpressions != null) { this.evaluateChildren(batch); } - } + } @Override public int getOutputColumn() {
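For readers following the ConstantVectorExpression hunks above: evaluateLong and its siblings implement a constant (possibly null) column by filling only slot 0 and flagging the vector as repeating, with noNulls and isNull[0] carrying the null case. A simplified standalone analogue of that logic follows; NullConstantColumnSketch, ToyLongColumn, and fillConstant are invented for this sketch and are not Hive APIs:

    public class NullConstantColumnSketch {
      // Simplified analogue of a Hive LongColumnVector: only the fields that
      // the evaluateLong hunk above actually touches.
      static class ToyLongColumn {
        long[] vector = new long[1024];
        boolean[] isNull = new boolean[1024];
        boolean isRepeating; // entry 0 stands for every row in the batch
        boolean noNulls;     // readers may skip isNull[] checks when true
      }

      // Mirrors the null-aware evaluateLong path: a non-null constant fills
      // slot 0 once and lets isRepeating broadcast it; a null constant
      // instead flags isNull[0] and clears the noNulls fast path.
      static void fillConstant(ToyLongColumn cv, Long constant) {
        cv.isRepeating = true;
        cv.noNulls = (constant != null);
        if (constant != null) {
          cv.vector[0] = constant;
        } else {
          cv.isNull[0] = true;
        }
      }

      public static void main(String[] args) {
        ToyLongColumn col = new ToyLongColumn();
        fillConstant(col, null);
        // Prints "true false true": one repeated entry, nulls present, row 0 null.
        System.out.println(col.isRepeating + " " + col.noNulls + " " + col.isNull[0]);
      }
    }

Writing a single slot plus isRepeating is what makes a constant effectively free per batch: downstream operators read entry 0 for every row instead of having the value materialized once per row.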