Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Thu Oct 30 16:22:33 2014 @@ -25,8 +25,6 @@ import java.util.Arrays; import java.util.List; import java.util.Random; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -51,10 +49,13 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.io.BinaryComparable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.util.hash.MurmurHash; +import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM; + /** * Reduce Sink Operator sends output to the reduce stage. **/ @@ -65,10 +66,13 @@ public class ReduceSinkOperator extends PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex"); } - private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName()); - private static final boolean isInfoEnabled = LOG.isInfoEnabled(); - private static final boolean isDebugEnabled = LOG.isDebugEnabled(); - private static final boolean isTraceEnabled = LOG.isTraceEnabled(); + /** + * Counters. + */ + public static enum Counter { + RECORDS_OUT_INTERMEDIATE + } + private static final long serialVersionUID = 1L; private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance(); @@ -110,7 +114,7 @@ public class ReduceSinkOperator extends protected transient int numDistributionKeys; protected transient int numDistinctExprs; protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD) - protected transient boolean autoParallel = false; + protected transient boolean useUniformHash = false; // picks topN K:V pairs from input. 
protected transient TopNHash reducerHash = new TopNHash(); protected transient HiveKey keyWritable = new HiveKey(); @@ -144,12 +148,25 @@ public class ReduceSinkOperator extends private StructObjectInspector recIdInspector; // OI for the record identifier private IntObjectInspector bucketInspector; // OI for the bucket field in the record id + protected transient long numRows = 0; + protected transient long cntr = 1; + private final transient LongWritable recordCounter = new LongWritable(); + @Override protected void initializeOp(Configuration hconf) throws HiveException { try { + + numRows = 0; + + String context = hconf.get(Operator.CONTEXT_NAME_KEY, ""); + if (context != null && !context.isEmpty()) { + context = "_" + context.replace(" ","_"); + } + statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter); + List<ExprNodeDesc> keys = conf.getKeyCols(); - if (isDebugEnabled) { + if (isLogDebugEnabled) { LOG.debug("keys size is " + keys.size()); for (ExprNodeDesc k : keys) { LOG.debug("Key exprNodeDesc " + k.getExprString()); @@ -194,7 +211,7 @@ public class ReduceSinkOperator extends tag = conf.getTag(); tagByte[0] = (byte) tag; skipTag = conf.getSkipTag(); - if (isInfoEnabled) { + if (isLogInfoEnabled) { LOG.info("Using tag = " + tag); } @@ -217,7 +234,7 @@ public class ReduceSinkOperator extends reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this); } - autoParallel = conf.isAutoParallel(); + useUniformHash = conf.getReducerTraits().contains(UNIFORM); firstRow = true; initializeChildren(hconf); @@ -296,7 +313,7 @@ public class ReduceSinkOperator extends bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector(); } - if (isInfoEnabled) { + if (isLogInfoEnabled) { LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + conf.getNumDistributionKeys()); } @@ -339,10 +356,10 @@ public class ReduceSinkOperator extends final int hashCode; // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0] - if (autoParallel && partitionEval.length > 0) { + if (useUniformHash && partitionEval.length > 0) { hashCode = computeMurmurHash(firstKey); } else { - hashCode = computeHashCode(row); + hashCode = computeHashCode(row, bucketNumber); } firstKey.setHashCode(hashCode); @@ -391,7 +408,7 @@ public class ReduceSinkOperator extends // column directly. Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField); buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField)); - if (isTraceEnabled) { + if (isLogTraceEnabled) { LOG.trace("Acid choosing bucket number " + buckNum); } } else { @@ -438,7 +455,7 @@ public class ReduceSinkOperator extends return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0); } - private int computeHashCode(Object row) throws HiveException { + private int computeHashCode(Object row, int buckNum) throws HiveException { // Evaluate the HashCode int keyHashCode = 0; if (partitionEval.length == 0) { @@ -462,10 +479,11 @@ public class ReduceSinkOperator extends + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]); } } - if (isTraceEnabled) { - LOG.trace("Going to return hash code " + (keyHashCode * 31 + bucketNumber)); + int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum; + if (isLogTraceEnabled) { + LOG.trace("Going to return hash code " + hashCode); } - return bucketNumber < 0 ? 
keyHashCode : keyHashCode * 31 + bucketNumber; + return hashCode; } private boolean partitionKeysAreNull(Object row) throws HiveException { @@ -506,6 +524,13 @@ public class ReduceSinkOperator extends // Since this is a terminal operator, update counters explicitly - // forward is not called if (null != out) { + numRows++; + if (isLogInfoEnabled) { + if (numRows == cntr) { + cntr *= 10; + LOG.info(toString() + ": records written - " + numRows); + } + } out.collect(keyWritable, valueWritable); } } @@ -535,6 +560,10 @@ public class ReduceSinkOperator extends } super.closeOp(abort); out = null; + if (isLogInfoEnabled) { + LOG.info(toString() + ": records written - " + numRows); + } + recordCounter.set(numRows); } /**
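
The ReduceSinkOperator change above drops the cached log-level flags and instead tracks output rows with a RECORDS_OUT_INTERMEDIATE counter, logging the running count only when it reaches successive powers of ten (numRows == cntr, then cntr *= 10) and once more in closeOp(). A minimal standalone sketch of that progress-logging pattern follows; the class and method names are illustrative only and are not part of the Hive patch.

public class ProgressLogger {
  private long numRows = 0; // records written so far
  private long cntr = 1;    // next row count at which to emit a log line

  // Called once per output record: a cheap increment plus a single comparison;
  // a log line is emitted only at 1, 10, 100, 1000, ... records.
  public void recordWritten() {
    numRows++;
    if (numRows == cntr) {
      cntr *= 10;
      System.out.println("records written - " + numRows);
    }
  }

  // Called when the operator closes: report the exact final count.
  public long close() {
    System.out.println("records written - " + numRows);
    return numRows;
  }

  public static void main(String[] args) {
    ProgressLogger logger = new ProgressLogger();
    for (int i = 0; i < 12345; i++) {
      logger.recordWritten(); // logs at 1, 10, 100, 1000 and 10000
    }
    logger.close();           // logs the final count of 12345
  }
}
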
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java Thu Oct 30 16:22:33 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec; import java.io.Serializable; import java.util.ArrayList; +import java.util.Iterator; /** * RowSchema Implementation. @@ -49,6 +50,51 @@ public class RowSchema implements Serial } @Override + public boolean equals(Object obj) { + if (!(obj instanceof RowSchema) || (obj == null)) { + return false; + } + if(this == obj) { + return true; + } + + RowSchema dest = (RowSchema)obj; + if(this.signature == null && dest.getSignature() == null) { + return true; + } + if((this.signature == null && dest.getSignature() != null) || + (this.signature != null && dest.getSignature() == null) ) { + return false; + } + + if(this.signature.size() != dest.getSignature().size()) { + return false; + } + + Iterator<ColumnInfo> origIt = this.signature.iterator(); + Iterator<ColumnInfo> destIt = dest.getSignature().iterator(); + while(origIt.hasNext()) { + ColumnInfo origColumn = origIt.next(); + ColumnInfo destColumn = destIt.next(); + + if(origColumn == null && destColumn == null) { + continue; + } + + if((origColumn == null && destColumn != null) || + (origColumn != null && destColumn == null) ) { + return false; + } + + if(!origColumn.equals(destColumn)) { + return false; + } + } + + return true; + } + + @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append('('); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Thu Oct 30 16:22:33 2014 @@ -27,8 +27,10 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.TimeUnit; @@ -83,6 +85,8 @@ public class ScriptOperator extends Oper transient Deserializer scriptOutputDeserializer; transient volatile Throwable scriptError = null; transient RecordWriter scriptOutWriter = null; + // List of conf entries not to turn into env vars + transient Set<String> blackListedConfEntries = null; static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe"; static final String IO_EXCEPTION_STREAM_CLOSED = "Stream closed"; @@ -120,7 +124,8 @@ public class ScriptOperator extends Oper /** * Most UNIX implementations impose some limit on the total size of environment variables and - * size of strings. To fit in this limit we need sometimes to truncate strings. + * size of strings. To fit in this limit we need sometimes to truncate strings. 
Also, + * some values tend be long and are meaningless to scripts, so strain them out. * @param value environment variable value to check * @param name name of variable (used only for logging purposes) * @param truncate truncate value or not @@ -139,6 +144,23 @@ public class ScriptOperator extends Oper return value; } + boolean blackListed(String name) { + if (blackListedConfEntries == null) { + blackListedConfEntries = new HashSet<String>(); + if (hconf != null) { + String bl = hconf.get(HiveConf.ConfVars.HIVESCRIPT_ENV_BLACKLIST.toString()); + if (bl != null && bl.length() > 0) { + String[] bls = bl.split(","); + for (String b : bls) { + b.replaceAll(".", "_"); + blackListedConfEntries.add(b); + } + } + } + } + return blackListedConfEntries.contains(name); + } + /** * addJobConfToEnvironment is mostly shamelessly copied from hadoop streaming. Added additional * check on environment variable length @@ -148,13 +170,16 @@ public class ScriptOperator extends Oper while (it.hasNext()) { Map.Entry<String, String> en = it.next(); String name = en.getKey(); - // String value = (String)en.getValue(); // does not apply variable - // expansion - String value = conf.get(name); // does variable expansion - name = safeEnvVarName(name); - boolean truncate = conf.getBoolean(HiveConf.ConfVars.HIVESCRIPTTRUNCATEENV.toString(), false); - value = safeEnvVarValue(value, name, truncate); - env.put(name, value); + if (!blackListed(name)) { + // String value = (String)en.getValue(); // does not apply variable + // expansion + String value = conf.get(name); // does variable expansion + name = safeEnvVarName(name); + boolean truncate = conf + .getBoolean(HiveConf.ConfVars.HIVESCRIPTTRUNCATEENV.toString(), false); + value = safeEnvVarValue(value, name, truncate); + env.put(name, value); + } } } @@ -238,8 +263,8 @@ public class ScriptOperator extends Oper protected void initializeOp(Configuration hconf) throws HiveException { firstRow = true; - statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count); - statsMap.put(Counter.SERIALIZE_ERRORS, serialize_error_count); + statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count); + statsMap.put(Counter.SERIALIZE_ERRORS.toString(), serialize_error_count); try { this.hconf = hconf; @@ -285,6 +310,16 @@ public class ScriptOperator extends Oper return; } + private transient String tableName; + private transient String partitionName ; + + @Override + public void setInputContext(String inputPath, String tableName, String partitionName) { + this.tableName = tableName; + this.partitionName = partitionName; + super.setInputContext(inputPath, tableName, partitionName); + } + @Override public void processOp(Object row, int tag) throws HiveException { // initialize the user's process only when you receive the first row @@ -313,10 +348,8 @@ public class ScriptOperator extends Oper String[] wrappedCmdArgs = addWrapper(cmdArgs); LOG.info("Executing " + Arrays.asList(wrappedCmdArgs)); - LOG.info("tablename=" - + hconf.get(HiveConf.ConfVars.HIVETABLENAME.varname)); - LOG.info("partname=" - + hconf.get(HiveConf.ConfVars.HIVEPARTITIONNAME.varname)); + LOG.info("tablename=" + tableName); + LOG.info("partname=" + partitionName); LOG.info("alias=" + alias); ProcessBuilder pb = new ProcessBuilder(wrappedCmdArgs); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff 
============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Thu Oct 30 16:22:33 2014 @@ -125,4 +125,31 @@ public class SelectOperator extends Oper public boolean acceptLimitPushdown() { return true; } + + /** + * Checks whether this select operator does something to the + * input tuples. + * + * @return if it is an identity select operator or not + */ + public boolean isIdentitySelect() { + //Safety check + if(this.getNumParent() != 1) { + return false; + } + + //Select * + if(this.getConf().isSelStarNoCompute() || + this.getConf().isSelectStar()) { + return true; + } + + //Check whether the have the same schema + if(!OperatorUtils.sameRowSchema(this, this.getParentOperators().get(0))) { + return false; + } + + return true; + } + } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java Thu Oct 30 16:22:33 2014 @@ -153,7 +153,9 @@ public class StatsNoJobTask extends Task partn.getInputFormatClass(), jc); InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0, new String[] { partn.getLocation() }); - Object recordReader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL); + org.apache.hadoop.mapred.RecordReader<?, ?> recordReader = + (org.apache.hadoop.mapred.RecordReader<?, ?>) + inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL); StatsProvidingRecordReader statsRR; if (recordReader instanceof StatsProvidingRecordReader) { statsRR = (StatsProvidingRecordReader) recordReader; @@ -163,6 +165,7 @@ public class StatsNoJobTask extends Task numFiles += 1; statsAvailable = true; } + recordReader.close(); } } @@ -254,6 +257,7 @@ public class StatsNoJobTask extends Task numFiles += 1; statsAvailable = true; } + recordReader.close(); } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Thu Oct 30 16:22:33 2014 @@ -80,7 +80,7 @@ public class UnionOperator extends Opera for (int p = 0; p < parents; p++) { assert (parentFields[p].size() == columns); for (int c = 0; c < columns; c++) { - if (!columnTypeResolvers[c].update(parentFields[p].get(c) + if (!columnTypeResolvers[c].updateForUnionAll(parentFields[p].get(c) .getFieldObjectInspector())) { // checked in SemanticAnalyzer. 
Should not happen throw new HiveException("Incompatible types for union operator"); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Oct 30 16:22:33 2014 @@ -416,7 +416,7 @@ public final class Utilities { } gWorkMap.put(path, gWork); } else { - LOG.debug("Found plan in cache."); + LOG.debug("Found plan in cache for name: " + name); gWork = gWorkMap.get(path); } return gWork; @@ -437,20 +437,20 @@ public final class Utilities { } } - public static Map<String, Map<Integer, String>> getScratchColumnVectorTypes(Configuration hiveConf) { + public static Map<String, Map<Integer, String>> getAllScratchColumnVectorTypeMaps(Configuration hiveConf) { BaseWork baseWork = getMapWork(hiveConf); if (baseWork == null) { baseWork = getReduceWork(hiveConf); } - return baseWork.getScratchColumnVectorTypes(); + return baseWork.getAllScratchColumnVectorTypeMaps(); } - public static Map<String, Map<String, Integer>> getScratchColumnMap(Configuration hiveConf) { + public static Map<String, Map<String, Integer>> getAllColumnVectorMaps(Configuration hiveConf) { BaseWork baseWork = getMapWork(hiveConf); if (baseWork == null) { baseWork = getReduceWork(hiveConf); } - return baseWork.getScratchColumnMap(); + return baseWork.getAllColumnVectorMaps(); } public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) { @@ -1635,12 +1635,13 @@ public final class Utilities { * Group 6: copy [copy keyword] * Group 8: 2 [copy file index] */ + private static final String COPY_KEYWORD = "_copy_"; // copy keyword private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX = Pattern.compile("^.*?"+ // any prefix "([0-9]+)"+ // taskId "(_)"+ // separator "([0-9]{1,6})?"+ // attemptId (limited to 6 digits) - "((_)(\\Bcopy\\B)(_)"+ // copy keyword + "((_)(\\Bcopy\\B)(_)" + "([0-9]{1,6})$)?"+ // copy file index "(\\..*)?$"); // any suffix/file extension @@ -2035,6 +2036,15 @@ public final class Utilities { return false; } + public static String getBucketFileNameFromPathSubString(String bucketName) { + try { + return bucketName.split(COPY_KEYWORD)[0]; + } catch (Exception e) { + e.printStackTrace(); + return bucketName; + } + } + public static String getNameMessage(Exception e) { return e.getClass().getName() + "(" + e.getMessage() + ")"; } @@ -2067,15 +2077,21 @@ public final class Utilities { public static ClassLoader getSessionSpecifiedClassLoader() { SessionState state = SessionState.get(); if (state == null || state.getConf() == null) { - LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead"); + if (LOG.isDebugEnabled()) { + LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead"); + } return JavaUtils.getClassLoader(); } ClassLoader sessionCL = state.getConf().getClassLoader(); - if (sessionCL != null){ - LOG.debug("Use session specified class loader"); + if (sessionCL != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Use session specified class loader"); + } return sessionCL; } - LOG.debug("Session specified class loader not found, use thread based class loader"); + if 
(LOG.isDebugEnabled()) { + LOG.debug("Session specified class loader not found, use thread based class loader"); + } return JavaUtils.getClassLoader(); } @@ -2363,6 +2379,32 @@ public final class Utilities { } } + /** + * Copies the storage handler proeprites configured for a table descriptor to a runtime job + * configuration. This differs from {@link #copyTablePropertiesToConf(org.apache.hadoop.hive.ql.plan.TableDesc, org.apache.hadoop.mapred.JobConf)} + * in that it does not allow parameters already set in the job to override the values from the + * table. This is important for setting the config up for reading, + * as the job may already have values in it from another table. + * @param tbl + * @param job + */ + public static void copyTablePropertiesToConf(TableDesc tbl, JobConf job) { + Properties tblProperties = tbl.getProperties(); + for(String name: tblProperties.stringPropertyNames()) { + String val = (String) tblProperties.get(name); + if (val != null) { + job.set(name, StringEscapeUtils.escapeJava(val)); + } + } + Map<String, String> jobProperties = tbl.getJobProperties(); + if (jobProperties == null) { + return; + } + for (Map.Entry<String, String> entry : jobProperties.entrySet()) { + job.set(entry.getKey(), entry.getValue()); + } + } + private static final Object INPUT_SUMMARY_LOCK = new Object(); /** Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Thu Oct 30 16:22:33 2014 @@ -56,6 +56,8 @@ import org.apache.hadoop.hive.ql.exec.Pa import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; @@ -416,6 +418,13 @@ public class ExecDriver extends Task<Map Utilities.createTmpDirs(job, mWork); Utilities.createTmpDirs(job, rWork); + SessionState ss = SessionState.get(); + if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") + && ss != null) { + TezSessionState session = ss.getTezSession(); + TezSessionPoolManager.getInstance().close(session, true); + } + // Finally SUBMIT the JOB! 
rj = jc.submitJob(job); // replace it back Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Thu Oct 30 16:22:33 2014 @@ -26,8 +26,11 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.FetchOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; @@ -72,9 +75,6 @@ public class ExecMapper extends MapReduc private static boolean done; // used to log memory usage periodically - public static MemoryMXBean memoryMXBean; - private long numRows = 0; - private long nextCntr = 1; private MapredLocalWork localWork = null; private boolean isLogInfoEnabled = false; @@ -84,8 +84,6 @@ public class ExecMapper extends MapReduc public void configure(JobConf job) { execContext = new ExecMapperContext(job); // Allocate the bean at the beginning - - memoryMXBean = ManagementFactory.getMemoryMXBean(); - l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); isLogInfoEnabled = l4j.isInfoEnabled(); @@ -176,15 +174,6 @@ public class ExecMapper extends MapReduc // Since there is no concept of a group, we don't invoke // startGroup/endGroup for a mapper mo.process((Writable)value); - if (isLogInfoEnabled) { - numRows++; - if (numRows == nextCntr) { - long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecMapper: processing " + numRows - + " rows: used memory = " + used_memory); - nextCntr = getNextCntr(numRows); - } - } } } catch (Throwable e) { abort = true; @@ -198,18 +187,6 @@ public class ExecMapper extends MapReduc } } - - private long getNextCntr(long cntr) { - // A very simple counter to keep track of number of rows processed by the - // reducer. 
It dumps - // every 1 million times, and quickly before that - if (cntr >= 1000000) { - return cntr + 1000000; - } - - return 10 * cntr; - } - @Override public void close() { // No row was processed @@ -245,13 +222,7 @@ public class ExecMapper extends MapReduc } } - if (isLogInfoEnabled) { - long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecMapper: processed " + numRows + " rows: used memory = " - + used_memory); - } - - ReportStats rps = new ReportStats(rp); + ReportStats rps = new ReportStats(rp, jc); mo.preorderMap(rps); return; } catch (Exception e) { @@ -288,17 +259,21 @@ public class ExecMapper extends MapReduc */ public static class ReportStats implements Operator.OperatorFunc { private final Reporter rp; + private final Configuration conf; + private final String groupName; - public ReportStats(Reporter rp) { + public ReportStats(Reporter rp, Configuration conf) { this.rp = rp; + this.conf = conf; + this.groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP); } @Override public void func(Operator op) { - Map<Enum<?>, Long> opStats = op.getStats(); - for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) { + Map<String, Long> opStats = op.getStats(); + for (Map.Entry<String, Long> e : opStats.entrySet()) { if (rp != null) { - rp.incrCounter(e.getKey(), e.getValue()); + rp.incrCounter(groupName, e.getKey(), e.getValue()); } } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Thu Oct 30 16:22:33 2014 @@ -70,8 +70,6 @@ public class ExecReducer extends MapRedu private static final boolean isTraceEnabled = LOG.isTraceEnabled(); private static final String PLAN_KEY = "__REDUCE_PLAN__"; - // used to log memory usage periodically - private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); // Input value serde needs to be an array to support different SerDe // for different tags private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE]; @@ -86,8 +84,6 @@ public class ExecReducer extends MapRedu private Reporter rp; private boolean abort = false; private boolean isTagged = false; - private long cntr = 0; - private long nextCntr = 1; private TableDesc keyTableDesc; private TableDesc[] valueTableDesc; private ObjectInspector[] rowObjectInspector; @@ -103,8 +99,6 @@ public class ExecReducer extends MapRedu ObjectInspector keyObjectInspector; if (isInfoEnabled) { - LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); - try { LOG.info("conf classpath = " + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); @@ -245,17 +239,7 @@ public class ExecReducer extends MapRedu row.clear(); row.add(keyObject); row.add(valueObject[tag]); - if (isInfoEnabled) { - cntr++; - if (cntr == nextCntr) { - long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - if (isInfoEnabled) { - LOG.info("ExecReducer: processing " + cntr - + " rows: used memory = " + used_memory); - } - nextCntr = getNextCntr(cntr); - } - } + try { reducer.processOp(row, tag); } catch (Exception e) { @@ -283,17 +267,6 @@ public class ExecReducer 
extends MapRedu } } - private long getNextCntr(long cntr) { - // A very simple counter to keep track of number of rows processed by the - // reducer. It dumps - // every 1 million times, and quickly before that - if (cntr >= 1000000) { - return cntr + 1000000; - } - - return 10 * cntr; - } - @Override public void close() { @@ -310,13 +283,9 @@ public class ExecReducer extends MapRedu } reducer.endGroup(); } - if (isInfoEnabled) { - LOG.info("ExecReducer: processed " + cntr + " rows: used memory = " - + memoryMXBean.getHeapMemoryUsage().getUsed()); - } reducer.close(abort); - ReportStats rps = new ReportStats(rp); + ReportStats rps = new ReportStats(rp, jc); reducer.preorderMap(rps); } catch (Exception e) { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Thu Oct 30 16:22:33 2014 @@ -238,6 +238,18 @@ public class MapredLocalTask extends Tas variables.put(HADOOP_OPTS_KEY, hadoopOpts); } + //For Windows OS, we need to pass HIVE_HADOOP_CLASSPATH Java parameter while starting + //Hiveserver2 using "-hiveconf hive.hadoop.classpath=%HIVE_LIB%". This is to combine path(s). + if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH)!= null) + { + if (variables.containsKey("HADOOP_CLASSPATH")) + { + variables.put("HADOOP_CLASSPATH", variables.get("HADOOP_CLASSPATH") + ";" + HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH)); + } else { + variables.put("HADOOP_CLASSPATH", HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH)); + } + } + if(variables.containsKey(MapRedTask.HIVE_DEBUG_RECURSIVE)) { MapRedTask.configureDebugVariablesForChildJVM(variables); } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Thu Oct 30 16:22:33 2014 @@ -21,16 +21,23 @@ package org.apache.hadoop.hive.ql.exec.t import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; +import com.google.common.collect.LinkedListMultimap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.DataOutputBuffer; @@ -38,6 +45,7 @@ import 
org.apache.hadoop.io.serializer.S import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.split.TezGroupedSplit; import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.EdgeProperty; @@ -67,17 +75,41 @@ import com.google.common.collect.Multima import com.google.protobuf.ByteString; /* - * Only works with old mapred API - * Will only work with a single MRInput for now. + * This is the central piece for Bucket Map Join and SMB join. It has the following + * responsibilities: + * 1. Group incoming splits based on bucketing. + * 2. Generate new serialized events for the grouped splits. + * 3. Create a routing table for the bucket map join and send a serialized version as payload + * for the EdgeManager. + * 4. For SMB join, generate a grouping according to bucketing for the "small" table side. */ public class CustomPartitionVertex extends VertexManagerPlugin { + public class PathComparatorForSplit implements Comparator<InputSplit> { + + @Override + public int compare(InputSplit inp1, InputSplit inp2) { + FileSplit fs1 = (FileSplit) inp1; + FileSplit fs2 = (FileSplit) inp2; + + int retval = fs1.getPath().compareTo(fs2.getPath()); + if (retval != 0) { + return retval; + } + + if (fs1.getStart() != fs2.getStart()) { + return (int) (fs1.getStart() - fs2.getStart()); + } + + return 0; + } + } + private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName()); VertexManagerPluginContext context; private InputConfigureVertexTasksEvent configureVertexTaskEvent; - private List<InputDataInformationEvent> dataInformationEvents; private int numBuckets = -1; private Configuration conf = null; private final SplitGrouper grouper = new SplitGrouper(); @@ -89,6 +121,13 @@ public class CustomPartitionVertex exten private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap = new HashMap<String, Multimap<Integer, InputSplit>>(); + private int numInputsAffectingRootInputSpecUpdate = 1; + private int numInputsSeenSoFar = 0; + private final Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap(); + private final List<InputSplit> finalSplits = Lists.newLinkedList(); + private final Map<String, InputSpecUpdate> inputNameInputSpecMap = + new HashMap<String, InputSpecUpdate>(); + public CustomPartitionVertex(VertexManagerPluginContext context) { super(context); } @@ -108,12 +147,13 @@ public class CustomPartitionVertex exten this.numBuckets = vertexConf.getNumBuckets(); this.mainWorkName = vertexConf.getInputName(); this.vertexType = vertexConf.getVertexType(); + this.numInputsAffectingRootInputSpecUpdate = vertexConf.getNumInputs(); } @Override public void onVertexStarted(Map<String, List<Integer>> completions) { int numTasks = context.getVertexNumTasks(context.getVertexName()); - List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks = + List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks = new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks); for (int i = 0; i < numTasks; ++i) { scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null)); @@ -133,8 +173,8 @@ public class CustomPartitionVertex exten @Override public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) { + numInputsSeenSoFar++; LOG.info("On root vertex initialized " + inputName); - try { 
// This is using the payload from the RootVertexInitializer corresponding // to InputName. Ideally it should be using it's own configuration class - @@ -168,20 +208,21 @@ public class CustomPartitionVertex exten } boolean dataInformationEventSeen = false; - Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>(); + Map<String, Set<FileSplit>> pathFileSplitsMap = new TreeMap<String, Set<FileSplit>>(); for (Event event : events) { if (event instanceof InputConfigureVertexTasksEvent) { // No tasks should have been started yet. Checked by initial state // check. + LOG.info("Got a input configure vertex event for input: " + inputName); Preconditions.checkState(dataInformationEventSeen == false); InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event; // The vertex cannot be configured until all DataEvents are seen - to // build the routing table. configureVertexTaskEvent = cEvent; - dataInformationEvents = - Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks()); + LOG.info("Configure task for input name: " + inputName + " num tasks: " + + configureVertexTaskEvent.getNumTasks()); } if (event instanceof InputUpdatePayloadEvent) { // this event can never occur. If it does, fail. @@ -189,22 +230,26 @@ public class CustomPartitionVertex exten } else if (event instanceof InputDataInformationEvent) { dataInformationEventSeen = true; InputDataInformationEvent diEvent = (InputDataInformationEvent) event; - dataInformationEvents.add(diEvent); FileSplit fileSplit; try { fileSplit = getFileSplitFromEvent(diEvent); } catch (IOException e) { throw new RuntimeException("Failed to get file split for event: " + diEvent); } - List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath().getName()); + Set<FileSplit> fsList = + pathFileSplitsMap.get(Utilities.getBucketFileNameFromPathSubString(fileSplit.getPath() + .getName())); if (fsList == null) { - fsList = new ArrayList<FileSplit>(); - pathFileSplitsMap.put(fileSplit.getPath().getName(), fsList); + fsList = new TreeSet<FileSplit>(new PathComparatorForSplit()); + pathFileSplitsMap.put( + Utilities.getBucketFileNameFromPathSubString(fileSplit.getPath().getName()), fsList); } fsList.add(fileSplit); } } + LOG.info("Path file splits map for input name: " + inputName + " is " + pathFileSplitsMap); + Multimap<Integer, InputSplit> bucketToInitialSplitMap = getBucketSplitMapForPath(pathFileSplitsMap); @@ -217,77 +262,144 @@ public class CustomPartitionVertex exten int availableSlots = totalResource / taskResource; - LOG.info("Grouping splits. " + availableSlots + " available slots, " + waves + " waves."); + LOG.info("Grouping splits. " + availableSlots + " available slots, " + waves + + " waves. 
Bucket initial splits map: " + bucketToInitialSplitMap); JobConf jobConf = new JobConf(conf); ShimLoader.getHadoopShims().getMergedCredentials(jobConf); Multimap<Integer, InputSplit> bucketToGroupedSplitMap = HashMultimap.<Integer, InputSplit> create(); - for (Integer key : bucketToInitialSplitMap.keySet()) { - InputSplit[] inputSplitArray = - (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0])); - Multimap<Integer, InputSplit> groupedSplit = - HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves, - availableSlots, inputName); - bucketToGroupedSplitMap.putAll(key, groupedSplit.values()); - } - - LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap); - if ((mainWorkName.isEmpty() == false) && (mainWorkName.compareTo(inputName) != 0)) { + boolean secondLevelGroupingDone = false; + if ((mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0)) { + for (Integer key : bucketToInitialSplitMap.keySet()) { + InputSplit[] inputSplitArray = + (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0])); + HiveSplitGenerator hiveSplitGenerator = new HiveSplitGenerator(); + Multimap<Integer, InputSplit> groupedSplit = + hiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves, + availableSlots, inputName, mainWorkName.isEmpty()); + if (mainWorkName.isEmpty() == false) { + Multimap<Integer, InputSplit> singleBucketToGroupedSplit = + HashMultimap.<Integer, InputSplit> create(); + singleBucketToGroupedSplit.putAll(key, groupedSplit.values()); + groupedSplit = + grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots, + HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES)); + secondLevelGroupingDone = true; + } + bucketToGroupedSplitMap.putAll(key, groupedSplit.values()); + } + processAllEvents(inputName, bucketToGroupedSplitMap, secondLevelGroupingDone); + } else { + // do not group across files in case of side work because there is only 1 KV reader per + // grouped split. This would affect SMB joins where we want to find the smallest key in + // all the bucket files. + for (Integer key : bucketToInitialSplitMap.keySet()) { + HiveSplitGenerator hiveSplitGenerator = new HiveSplitGenerator(); + InputSplit[] inputSplitArray = + (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0])); + Multimap<Integer, InputSplit> groupedSplit = + hiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves, + availableSlots, inputName, false); + bucketToGroupedSplitMap.putAll(key, groupedSplit.values()); + } /* - * this is the small table side. In case of SMB join, we may need to send each split to the + * this is the small table side. In case of SMB join, we need to send each split to the * corresponding bucket-based task on the other side. In case a split needs to go to * multiple downstream tasks, we need to clone the event and send it to the right * destination. */ - processAllSideEvents(inputName, bucketToGroupedSplitMap); - } else { - processAllEvents(inputName, bucketToGroupedSplitMap); + LOG.info("This is the side work - multi-mr work."); + processAllSideEventsSetParallelism(inputName, bucketToGroupedSplitMap); } } catch (Exception e) { throw new RuntimeException(e); } } - private void processAllSideEvents(String inputName, + private void processAllSideEventsSetParallelism(String inputName, Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException { // the bucket to task map should have been setup by the big table. 
+ LOG.info("Processing events for input " + inputName); if (bucketToTaskMap.isEmpty()) { + LOG.info("We don't have a routing table yet. Will need to wait for the main input" + + " initialization"); inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap); return; } + processAllSideEvents(inputName, bucketToGroupedSplitMap); + setVertexParallelismAndRootInputSpec(inputNameInputSpecMap); + } + + private void processAllSideEvents(String inputName, + Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException { List<InputDataInformationEvent> taskEvents = new ArrayList<InputDataInformationEvent>(); + LOG.info("We have a routing table and we are going to set the destination tasks for the" + + " multi mr inputs. " + bucketToTaskMap); + + Integer[] numSplitsForTask = new Integer[taskCount]; + + Multimap<Integer, ByteBuffer> bucketToSerializedSplitMap = LinkedListMultimap.create(); + + // Create the list of serialized splits for each bucket. for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) { + for (InputSplit split : entry.getValue()) { + MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split); + ByteBuffer bs = serializedSplit.toByteString().asReadOnlyByteBuffer(); + bucketToSerializedSplitMap.put(entry.getKey(), bs); + } + } + + for (Entry<Integer, Collection<ByteBuffer>> entry : bucketToSerializedSplitMap.asMap().entrySet()) { Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey()); for (Integer task : destTasks) { - for (InputSplit split : entry.getValue()) { - MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split); + int count = 0; + for (ByteBuffer buf : entry.getValue()) { + count++; InputDataInformationEvent diEvent = - InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit - .toByteString().asReadOnlyByteBuffer()); + InputDataInformationEvent.createWithSerializedPayload(count, buf); diEvent.setTargetIndex(task); taskEvents.add(diEvent); } + numSplitsForTask[task] = count; } } + inputNameInputSpecMap.put(inputName, + InputSpecUpdate.createPerTaskInputSpecUpdate(Arrays.asList(numSplitsForTask))); + + LOG.info("For input name: " + inputName + " task events size is " + taskEvents.size()); + context.addRootInputEvents(inputName, taskEvents); } private void processAllEvents(String inputName, - Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException { + Multimap<Integer, InputSplit> bucketToGroupedSplitMap, boolean secondLevelGroupingDone) + throws IOException { - List<InputSplit> finalSplits = Lists.newLinkedList(); + int totalInputsCount = 0; + List<Integer> numSplitsForTask = new ArrayList<Integer>(); for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) { int bucketNum = entry.getKey(); Collection<InputSplit> initialSplits = entry.getValue(); finalSplits.addAll(initialSplits); - for (int i = 0; i < initialSplits.size(); i++) { + for (InputSplit inputSplit : initialSplits) { bucketToTaskMap.put(bucketNum, taskCount); + if (secondLevelGroupingDone) { + TezGroupedSplit groupedSplit = (TezGroupedSplit) inputSplit; + numSplitsForTask.add(groupedSplit.getGroupedSplits().size()); + totalInputsCount += groupedSplit.getGroupedSplits().size(); + } else { + numSplitsForTask.add(1); + totalInputsCount += 1; + } taskCount++; } } + inputNameInputSpecMap.put(inputName, + InputSpecUpdate.createPerTaskInputSpecUpdate(numSplitsForTask)); + // Construct the EdgeManager descriptor to be used by all edges which need // the 
routing table. EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null; @@ -297,7 +409,6 @@ public class CustomPartitionVertex exten UserPayload payload = getBytePayload(bucketToTaskMap); hiveEdgeManagerDesc.setUserPayload(payload); } - Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap(); // Replace the edge manager for all vertices which have routing type custom. for (Entry<String, EdgeProperty> edgeEntry : context.getInputVertexEdgeProperties().entrySet()) { @@ -308,42 +419,67 @@ public class CustomPartitionVertex exten } } - LOG.info("Task count is " + taskCount); + LOG.info("Task count is " + taskCount + " for input name: " + inputName); - List<InputDataInformationEvent> taskEvents = - Lists.newArrayListWithCapacity(finalSplits.size()); + List<InputDataInformationEvent> taskEvents = Lists.newArrayListWithCapacity(totalInputsCount); // Re-serialize the splits after grouping. int count = 0; for (InputSplit inputSplit : finalSplits) { - MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit); - InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload( - count, serializedSplit.toByteString().asReadOnlyByteBuffer()); - diEvent.setTargetIndex(count); + if (secondLevelGroupingDone) { + TezGroupedSplit tezGroupedSplit = (TezGroupedSplit)inputSplit; + for (InputSplit subSplit : tezGroupedSplit.getGroupedSplits()) { + if ((subSplit instanceof TezGroupedSplit) == false) { + throw new IOException("Unexpected split type found: " + + subSplit.getClass().getCanonicalName()); + } + MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(subSplit); + InputDataInformationEvent diEvent = + InputDataInformationEvent.createWithSerializedPayload(count, serializedSplit + .toByteString().asReadOnlyByteBuffer()); + diEvent.setTargetIndex(count); + taskEvents.add(diEvent); + } + } else { + MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit); + InputDataInformationEvent diEvent = + InputDataInformationEvent.createWithSerializedPayload(count, serializedSplit + .toByteString().asReadOnlyByteBuffer()); + diEvent.setTargetIndex(count); + taskEvents.add(diEvent); + } count++; - taskEvents.add(diEvent); - } - - // Replace the Edge Managers - Map<String, InputSpecUpdate> rootInputSpecUpdate = - new HashMap<String, InputSpecUpdate>(); - rootInputSpecUpdate.put( - inputName, - InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate()); - if ((mainWorkName.compareTo(inputName) == 0) || (mainWorkName.isEmpty())) { - context.setVertexParallelism( - taskCount, - VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits - .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate); } // Set the actual events for the tasks. + LOG.info("For input name: " + inputName + " task events size is " + taskEvents.size()); context.addRootInputEvents(inputName, taskEvents); if (inputToGroupedSplitMap.isEmpty() == false) { for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) { processAllSideEvents(entry.getKey(), entry.getValue()); } + setVertexParallelismAndRootInputSpec(inputNameInputSpecMap); inputToGroupedSplitMap.clear(); } + + // Only done when it is a bucket map join only no SMB. 
+ if (numInputsAffectingRootInputSpecUpdate == 1) { + setVertexParallelismAndRootInputSpec(inputNameInputSpecMap); + } + } + + private void + setVertexParallelismAndRootInputSpec(Map<String, InputSpecUpdate> rootInputSpecUpdate) + throws IOException { + if (numInputsAffectingRootInputSpecUpdate != numInputsSeenSoFar) { + return; + } + + LOG.info("Setting vertex parallelism since we have seen all inputs."); + + context.setVertexParallelism(taskCount, VertexLocationHint.create(grouper + .createTaskLocationHints(finalSplits.toArray(new InputSplit[finalSplits.size()]))), emMap, + rootInputSpecUpdate); + finalSplits.clear(); } UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException { @@ -377,14 +513,14 @@ public class CustomPartitionVertex exten * This method generates the map of bucket to file splits. */ private Multimap<Integer, InputSplit> getBucketSplitMapForPath( - Map<String, List<FileSplit>> pathFileSplitsMap) { + Map<String, Set<FileSplit>> pathFileSplitsMap) { int bucketNum = 0; Multimap<Integer, InputSplit> bucketToInitialSplitMap = ArrayListMultimap.<Integer, InputSplit> create(); - for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) { + for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) { int bucketId = bucketNum % numBuckets; for (FileSplit fsplit : entry.getValue()) { bucketToInitialSplitMap.put(bucketId, fsplit); @@ -392,6 +528,11 @@ public class CustomPartitionVertex exten bucketNum++; } + // this is just for SMB join use-case. The numBuckets would be equal to that of the big table + // and the small table could have lesser number of buckets. In this case, we want to send the + // data from the right buckets to the big table side. For e.g. Big table has 8 buckets and small + // table has 4 buckets, bucket 0 of small table needs to be sent to bucket 4 of the big table as + // well. if (bucketNum < numBuckets) { int loopedBucketId = 0; for (; bucketNum < numBuckets; bucketNum++) { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java Thu Oct 30 16:22:33 2014 @@ -29,20 +29,31 @@ import org.apache.hadoop.io.Writable; * This class is the payload for custom vertex. It serializes and de-serializes * @numBuckets: the number of buckets of the "big table" * @vertexType: this is the type of vertex and differentiates between bucket map join and SMB joins - * @inputName: This is the name of the input. Used in case of SMB joins + * @numInputs: The number of inputs that are directly connected to the vertex (MRInput/MultiMRInput). + * In case of bucket map join, it is always 1. + * @inputName: This is the name of the input. Used in case of SMB joins. 
Empty in case of BucketMapJoin */ public class CustomVertexConfiguration implements Writable { private int numBuckets; private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES; + private int numInputs; private String inputName; public CustomVertexConfiguration() { } - public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName) { + // this is the constructor to use for the Bucket map join case. + public CustomVertexConfiguration(int numBuckets, VertexType vertexType) { + this(numBuckets, vertexType, "", 1); + } + + // this is the constructor to use for SMB. + public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName, + int numInputs) { this.numBuckets = numBuckets; this.vertexType = vertexType; + this.numInputs = numInputs; this.inputName = inputName; } @@ -50,6 +61,7 @@ public class CustomVertexConfiguration i public void write(DataOutput out) throws IOException { out.writeInt(this.vertexType.ordinal()); out.writeInt(this.numBuckets); + out.writeInt(numInputs); out.writeUTF(inputName); } @@ -57,6 +69,7 @@ public class CustomVertexConfiguration i public void readFields(DataInput in) throws IOException { this.vertexType = VertexType.values()[in.readInt()]; this.numBuckets = in.readInt(); + this.numInputs = in.readInt(); this.inputName = in.readUTF(); } @@ -71,4 +84,8 @@ public class CustomVertexConfiguration i public String getInputName() { return inputName; } + + public int getNumInputs() { + return numInputs; + } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Thu Oct 30 16:22:33 2014 @@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; - import javax.security.auth.login.LoginException; import java.io.FileNotFoundException; import java.io.IOException; @@ -49,7 +47,7 @@ import org.apache.hadoop.hive.conf.HiveC import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; @@ -110,16 +108,13 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; -import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRInputHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; -import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.output.MROutput; import 
org.apache.tez.mapreduce.partition.MRPartitioner; -import org.apache.tez.mapreduce.protos.MRRuntimeProtos; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.comparator.TezBytesComparator; import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization; @@ -178,6 +173,8 @@ public class DagUtils { private JobConf initializeVertexConf(JobConf baseConf, Context context, MapWork mapWork) { JobConf conf = new JobConf(baseConf); + conf.set(Operator.CONTEXT_NAME_KEY, mapWork.getName()); + if (mapWork.getNumMapTasks() != null) { // Is this required ? conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue()); @@ -268,8 +265,7 @@ public class DagUtils { case CUSTOM_EDGE: { mergeInputClass = ConcatenatedMergedKeyValueInput.class; int numBuckets = edgeProp.getNumBuckets(); - CustomVertexConfiguration vertexConf = - new CustomVertexConfiguration(numBuckets, vertexType, ""); + CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType); DataOutputBuffer dob = new DataOutputBuffer(); vertexConf.write(dob); VertexManagerPluginDescriptor desc = @@ -314,8 +310,7 @@ public class DagUtils { switch(edgeProp.getEdgeType()) { case CUSTOM_EDGE: { int numBuckets = edgeProp.getNumBuckets(); - CustomVertexConfiguration vertexConf = - new CustomVertexConfiguration(numBuckets, vertexType, ""); + CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType); DataOutputBuffer dob = new DataOutputBuffer(); vertexConf.write(dob); VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create( @@ -340,7 +335,6 @@ public class DagUtils { /* * Helper function to create an edge property from an edge type. */ - @SuppressWarnings("rawtypes") private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf) throws IOException { MRHelpers.translateMRConfToTez(conf); @@ -431,8 +425,9 @@ public class DagUtils { int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ? HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) : conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB); - int cpus = conf.getInt(MRJobConfig.MAP_CPU_VCORES, - MRJobConfig.DEFAULT_MAP_CPU_VCORES); + int cpus = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCPUVCORES) > 0 ? 
+ HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCPUVCORES) : + conf.getInt(MRJobConfig.MAP_CPU_VCORES, MRJobConfig.DEFAULT_MAP_CPU_VCORES); return Resource.newInstance(memory, cpus); } @@ -452,17 +447,29 @@ public class DagUtils { */ private String getContainerJavaOpts(Configuration conf) { String javaOpts = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZJAVAOPTS); - if (javaOpts != null && !javaOpts.isEmpty()) { - String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL); - List<String> logProps = Lists.newArrayList(); - TezUtils.addLog4jSystemProperties(logLevel, logProps); - StringBuilder sb = new StringBuilder(); - for (String str : logProps) { - sb.append(str).append(" "); + + String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL); + List<String> logProps = Lists.newArrayList(); + TezUtils.addLog4jSystemProperties(logLevel, logProps); + StringBuilder sb = new StringBuilder(); + for (String str : logProps) { + sb.append(str).append(" "); + } + logLevel = sb.toString(); + + if (HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0) { + if (javaOpts != null) { + return javaOpts + " " + logLevel; + } else { + return logLevel; + } + } else { + if (javaOpts != null && !javaOpts.isEmpty()) { + LOG.warn(HiveConf.ConfVars.HIVETEZJAVAOPTS + " will be ignored because " + + HiveConf.ConfVars.HIVETEZCONTAINERSIZE + " is not set!"); } - return javaOpts + " " + sb.toString(); + return logLevel + " " + MRHelpers.getJavaOptsForMRMapper(conf); } - return MRHelpers.getJavaOptsForMRMapper(conf); } private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr, @@ -473,13 +480,9 @@ public class DagUtils { if (mergeJoinWork.getMainWork() instanceof MapWork) { List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList(); MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork()); - CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator(); Vertex mergeVx = createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType); - // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat - // here would cause pre-mature grouping which would be incorrect. - Class inputFormatClass = HiveInputFormat.class; conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class); // mapreduce.tez.input.initializer.serialize.event.payload should be set // to false when using this plug-in to avoid getting a serialized event at run-time. @@ -496,9 +499,11 @@ public class DagUtils { VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName()); + // the +1 to the size is because of the main work. 
CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf() - .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias()); + .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias(), + mapWorkList.size() + 1); DataOutputBuffer dob = new DataOutputBuffer(); vertexConf.write(dob); byte[] userPayload = dob.getData(); @@ -538,6 +543,7 @@ public class DagUtils { DataSourceDescriptor dataSource; int numTasks = -1; + @SuppressWarnings("rawtypes") Class inputFormatClass = conf.getClass("mapred.input.format.class", InputFormat.class); @@ -595,7 +601,13 @@ public class DagUtils { .setCustomInitializerDescriptor(descriptor).build(); } else { // Not HiveInputFormat, or a custom VertexManager will take care of grouping splits - dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build(); + if (vertexHasCustomInput) { + dataSource = + MultiMRInput.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build(); + } else { + dataSource = + MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build(); + } } } else { // Setup client side split generation. @@ -640,6 +652,8 @@ public class DagUtils { private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWork reduceWork) { JobConf conf = new JobConf(baseConf); + conf.set(Operator.CONTEXT_NAME_KEY, reduceWork.getName()); + // Is this required ? conf.set("mapred.reducer.class", ExecReducer.class.getName()); @@ -745,6 +759,7 @@ public class DagUtils { * @throws LoginException if we are unable to figure user information * @throws IOException when any dfs operation fails. */ + @SuppressWarnings("deprecation") public Path getDefaultDestDir(Configuration conf) throws LoginException, IOException { UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf); String userName = ShimLoader.getHadoopShims().getShortUserName(ugi); @@ -857,6 +872,7 @@ public class DagUtils { return fstatus; } + @SuppressWarnings("deprecation") public static FileStatus validateTargetDir(Path path, Configuration conf) throws IOException { FileSystem fs = path.getFileSystem(conf); FileStatus fstatus = null; @@ -971,7 +987,7 @@ public class DagUtils { public JobConf createConfiguration(HiveConf hiveConf) throws IOException { hiveConf.setBoolean("mapred.mapper.new-api", false); - JobConf conf = new JobConf(hiveConf); + JobConf conf = new JobConf(new TezConfiguration(hiveConf)); conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName()); @@ -1033,6 +1049,7 @@ public class DagUtils { * @param ctx This query's context * @return Vertex */ + @SuppressWarnings("deprecation") public Vertex createVertex(JobConf conf, BaseWork work, Path scratchDir, LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren, Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Thu Oct 30 16:22:33 2014 @@ -87,6 +87,8 @@ public class DynamicPartitionPruner { private final 
Object endOfEvents = new Object(); + private int totalEventCount = 0; + public DynamicPartitionPruner() { } @@ -114,7 +116,7 @@ public class DynamicPartitionPruner { // synchronous event processing loop. Won't return until all events have // been processed. this.processEvents(); - this.prunePartitions(work); + this.prunePartitions(work, context); LOG.info("Ok to proceed."); } @@ -163,12 +165,22 @@ public class DynamicPartitionPruner { } } - private void prunePartitions(MapWork work) throws HiveException { + private void prunePartitions(MapWork work, InputInitializerContext context) throws HiveException { + int expectedEvents = 0; for (String source : this.sourceInfoMap.keySet()) { for (SourceInfo si : this.sourceInfoMap.get(source)) { + int taskNum = context.getVertexNumTasks(source); + LOG.info("Expecting " + taskNum + " events for vertex " + source); + expectedEvents += taskNum; prunePartitionSingleSource(source, si, work); } } + + // sanity check. all tasks must submit events for us to succeed. + if (expectedEvents != totalEventCount) { + LOG.error("Expecting: " + expectedEvents + ", received: " + totalEventCount); + throw new HiveException("Incorrect event count in dynamic parition pruning"); + } } private void prunePartitionSingleSource(String source, SourceInfo si, MapWork work) @@ -396,7 +408,8 @@ public class DynamicPartitionPruner { public void addEvent(InputInitializerEvent event) { synchronized(sourcesWaitingForEvents) { if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) { - queue.offer(event); + ++totalEventCount; + queue.offer(event); } } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Thu Oct 30 16:22:33 2014 @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.tez; +import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,7 +66,6 @@ import com.google.common.collect.Multima * making sure that splits from different partitions are only grouped if they * are of the same schema, format and serde */ -@SuppressWarnings("deprecation") public class HiveSplitGenerator extends InputInitializer { private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class); @@ -72,11 +73,17 @@ public class HiveSplitGenerator extends private static final SplitGrouper grouper = new SplitGrouper(); private final DynamicPartitionPruner pruner = new DynamicPartitionPruner(); private InputInitializerContext context; + private static Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cache = + new HashMap<Map<String, PartitionDesc>, Map<String, PartitionDesc>>(); public HiveSplitGenerator(InputInitializerContext initializerContext) { super(initializerContext); } + public HiveSplitGenerator() { + this(null); + } + @Override public List<Event> initialize() throws Exception { InputInitializerContext rootInputContext = getContext(); @@ -150,58 +157,28 @@ public class HiveSplitGenerator extends } - public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, + public 
Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, Configuration conf, InputSplit[] splits, float waves, int availableSlots) throws Exception { - return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null); + return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true); } - public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, - Configuration conf, InputSplit[] splits, float waves, int availableSlots, - String inputName) throws Exception { - - MapWork work = null; - if (inputName != null) { - work = (MapWork) Utilities.getMergeWork(jobConf, inputName); - // work can still be null if there is no merge work for this input - } - if (work == null) { - work = Utilities.getMapWork(jobConf); - } + public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, + Configuration conf, InputSplit[] splits, float waves, int availableSlots, String inputName, + boolean groupAcrossFiles) throws Exception { + MapWork work = populateMapWork(jobConf, inputName); Multimap<Integer, InputSplit> bucketSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create(); - Class<?> previousInputFormatClass = null; - String previousDeserializerClass = null; - Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cache = - new HashMap<Map<String, PartitionDesc>, Map<String, PartitionDesc>>(); - int i = 0; - + InputSplit prevSplit = null; for (InputSplit s : splits) { // this is the bit where we make sure we don't group across partition // schema boundaries - - Path path = ((FileSplit) s).getPath(); - - PartitionDesc pd = - HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(), - path, cache); - - String currentDeserializerClass = pd.getDeserializerClassName(); - Class<?> currentInputFormatClass = pd.getInputFileFormatClass(); - - if ((currentInputFormatClass != previousInputFormatClass) - || (!currentDeserializerClass.equals(previousDeserializerClass))) { + if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) { ++i; - } - - previousInputFormatClass = currentInputFormatClass; - previousDeserializerClass = currentDeserializerClass; - - if (LOG.isDebugEnabled()) { - LOG.debug("Adding split " + path + " to src group " + i); + prevSplit = s; } bucketSplitMultiMap.put(i, s); } @@ -214,6 +191,54 @@ public class HiveSplitGenerator extends return groupedSplits; } + private MapWork populateMapWork(JobConf jobConf, String inputName) { + MapWork work = null; + if (inputName != null) { + work = (MapWork) Utilities.getMergeWork(jobConf, inputName); + // work can still be null if there is no merge work for this input + } + if (work == null) { + work = Utilities.getMapWork(jobConf); + } + + return work; + } + + public boolean schemaEvolved(InputSplit s, InputSplit prevSplit, boolean groupAcrossFiles, + MapWork work) throws IOException { + boolean retval = false; + Path path = ((FileSplit) s).getPath(); + PartitionDesc pd = + HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(), + path, cache); + String currentDeserializerClass = pd.getDeserializerClassName(); + Class<?> currentInputFormatClass = pd.getInputFileFormatClass(); + + Class<?> previousInputFormatClass = null; + String previousDeserializerClass = null; + if (prevSplit != null) { + Path prevPath = ((FileSplit) prevSplit).getPath(); + if (!groupAcrossFiles) { + return !path.equals(prevPath); + } + PartitionDesc prevPD = + 
HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(), + prevPath, cache); + previousDeserializerClass = prevPD.getDeserializerClassName(); + previousInputFormatClass = prevPD.getInputFileFormatClass(); + } + + if ((currentInputFormatClass != previousInputFormatClass) + || (!currentDeserializerClass.equals(previousDeserializerClass))) { + retval = true; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Adding split " + path + " to src new group? " + retval); + } + return retval; + } + private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) { List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Thu Oct 30 16:22:33 2014 @@ -17,10 +17,10 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,15 +44,15 @@ import org.apache.hadoop.hive.ql.exec.mr import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; -import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.mapred.JobConf; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.input.MultiMRInput; import org.apache.tez.mapreduce.processor.MRTaskReporter; -import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; @@ -73,6 +73,7 @@ public class MapRecordProcessor extends private int position = 0; private boolean foundCachedMergeWork = false; MRInputLegacy legacyMRInput = null; + MultiMRInput mainWorkMultiMRInput = null; private ExecMapperContext execContext = null; private boolean abort = false; protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; @@ -129,12 +130,14 @@ public class MapRecordProcessor extends perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS); super.init(jconf, processorContext, mrReporter, inputs, outputs); - //Update JobConf using MRInput, info like filename comes via this + // Update JobConf using MRInput, info like filename comes via this legacyMRInput = getMRInput(inputs); - Configuration updatedConf = legacyMRInput.getConfigUpdates(); - if (updatedConf != null) { - for (Entry<String, String> entry : updatedConf) { - jconf.set(entry.getKey(), entry.getValue()); + if (legacyMRInput != null) { + Configuration updatedConf = legacyMRInput.getConfigUpdates(); 
+ if (updatedConf != null) { + for (Entry<String, String> entry : updatedConf) { + jconf.set(entry.getKey(), entry.getValue()); + } } } @@ -158,8 +161,6 @@ public class MapRecordProcessor extends if (mergeWorkList != null) { MapOperator mergeMapOp = null; for (MapWork mergeMapWork : mergeWorkList) { - processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputs - .get(mergeMapWork.getName())))); if (mergeMapWork.getVectorMode()) { mergeMapOp = new VectorMapOperator(); } else { @@ -235,11 +236,17 @@ public class MapRecordProcessor extends } private void initializeMapRecordSources() throws Exception { + int size = mergeMapOpList.size() + 1; // the +1 is for the main map operator itself sources = new MapRecordSource[size]; - KeyValueReader reader = legacyMRInput.getReader(); position = mapOp.getConf().getTag(); sources[position] = new MapRecordSource(); + KeyValueReader reader = null; + if (mainWorkMultiMRInput != null) { + reader = getKeyValueReader(mainWorkMultiMRInput.getKeyValueReaders(), mapOp); + } else { + reader = legacyMRInput.getReader(); + } sources[position].init(jconf, mapOp, reader); for (MapOperator mapOp : mergeMapOpList) { int tag = mapOp.getConf().getTag(); @@ -248,13 +255,28 @@ public class MapRecordProcessor extends MultiMRInput multiMRInput = multiMRInputMap.get(inputName); Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders(); l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName); - List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(kvReaders); - reader = new KeyValueInputMerger(kvReaderList); + reader = getKeyValueReader(kvReaders, mapOp); sources[tag].init(jconf, mapOp, reader); } ((TezContext) MapredContext.get()).setRecordSources(sources); } + @SuppressWarnings("deprecation") + private KeyValueReader getKeyValueReader(Collection<KeyValueReader> keyValueReaders, + MapOperator mapOp) + throws Exception { + List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(keyValueReaders); + // this sets up the map operator contexts correctly + mapOp.initializeContexts(); + Deserializer deserializer = mapOp.getCurrentDeserializer(); + KeyValueReader reader = + new KeyValueInputMerger(kvReaderList, deserializer, + new ObjectInspector[] { deserializer.getObjectInspector() }, mapOp + .getConf() + .getSortCols()); + return reader; + } + private DummyStoreOperator getJoinParentOp(Operator<? extends OperatorDesc> mergeMapOp) { for (Operator<? 
extends OperatorDesc> childOp : mergeMapOp.getChildOperators()) { if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) { @@ -269,11 +291,7 @@ public class MapRecordProcessor extends @Override void run() throws Exception { - while (sources[position].pushRecord()) { - if (isLogInfoEnabled) { - logProgress(); - } - } + while (sources[position].pushRecord()) {} } @Override @@ -305,10 +323,7 @@ public class MapRecordProcessor extends } } - if (isLogInfoEnabled) { - logCloseInfo(); - } - ReportStats rps = new ReportStats(reporter); + ReportStats rps = new ReportStats(reporter, jconf); mapOp.preorderMap(rps); return; } catch (Exception e) { @@ -342,7 +357,17 @@ public class MapRecordProcessor extends multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue()); } } - theMRInput.init(); + if (theMRInput != null) { + theMRInput.init(); + } else { + String alias = mapWork.getAliasToWork().keySet().iterator().next(); + if (inputs.get(alias) instanceof MultiMRInput) { + mainWorkMultiMRInput = (MultiMRInput) inputs.get(alias); + } else { + throw new IOException("Unexpected input type found: " + + inputs.get(alias).getClass().getCanonicalName()); + } + } return theMRInput; } } Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java?rev=1635536&r1=1635535&r2=1635536&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java Thu Oct 30 16:22:33 2014 @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.MapOperator; @@ -28,7 +27,6 @@ import org.apache.hadoop.hive.ql.metadat import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; -import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.runtime.library.api.KeyValueReader; /** @@ -45,7 +43,7 @@ public class MapRecordSource implements private final boolean grouped = false; void init(JobConf jconf, MapOperator mapOp, KeyValueReader reader) throws IOException { - execContext = new ExecMapperContext(jconf); + execContext = mapOp.getExecContext(); this.mapOp = mapOp; this.reader = reader; }
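
The CustomVertexConfiguration change above adds a numInputs field to the Writable payload, so write() and readFields() must emit and consume the fields in exactly the same order. The round-trip below is a minimal, self-contained sketch of that contract using the same Hadoop buffer classes DagUtils uses when it serializes the configuration into the VertexManagerPluginDescriptor payload; the class and field names are stand-ins for illustration, not the real Hive class.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Writable;

    // Illustrative stand-in for CustomVertexConfiguration. The point of the
    // sketch: a new field has to occupy the same slot in write() and
    // readFields(), which is why the diff adds numInputs between numBuckets
    // and inputName on both sides.
    public class VertexConfigSketch implements Writable {
      private int numBuckets;
      private int numInputs;
      private String inputName = "";

      public VertexConfigSketch() { }

      public VertexConfigSketch(int numBuckets, int numInputs, String inputName) {
        this.numBuckets = numBuckets;
        this.numInputs = numInputs;
        this.inputName = inputName;
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(numBuckets);
        out.writeInt(numInputs);   // new field, written in a fixed position
        out.writeUTF(inputName);
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        numBuckets = in.readInt();
        numInputs = in.readInt();  // read back in exactly the same order
        inputName = in.readUTF();
      }

      // Round-trip through the buffer classes DagUtils uses for the plugin payload.
      public static void main(String[] args) throws IOException {
        VertexConfigSketch written = new VertexConfigSketch(4, 3, "bigTable");
        DataOutputBuffer dob = new DataOutputBuffer();
        written.write(dob);

        DataInputBuffer dib = new DataInputBuffer();
        dib.reset(dob.getData(), dob.getLength());

        VertexConfigSketch read = new VertexConfigSketch();
        read.readFields(dib);
        System.out.println(read.numInputs);   // prints 3
      }
    }

Keeping both constructors funnel into one field layout, as the diff does by delegating the bucket-map-join constructor to the SMB one, avoids the two call sites drifting apart.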
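
In the DagUtils hunk above, the requested container vcores now honor a Hive-level setting when it is positive and only fall back to the MapReduce value otherwise, the same precedence already applied to container memory. The helper below is a small sketch of that rule on a plain Configuration; the method name and the literal keys are illustrative assumptions rather than the actual ConfVars constants.

    import org.apache.hadoop.conf.Configuration;

    // Sketch of the "prefer the Hive setting when positive, else fall back to
    // the MapReduce setting" precedence used for container vcores in DagUtils.
    // The key names below are illustrative, not the real ConfVars values.
    public class ResourceFallbackSketch {

      static int resolveVcores(Configuration conf) {
        int hiveVcores = conf.getInt("hive.tez.cpu.vcores", -1);
        return hiveVcores > 0
            ? hiveVcores
            : conf.getInt("mapreduce.map.cpu.vcores", 1);
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        System.out.println(resolveVcores(conf));   // no overrides: MR default of 1
        conf.setInt("hive.tez.cpu.vcores", 4);
        System.out.println(resolveVcores(conf));   // Hive setting wins: 4
      }
    }

The related getContainerJavaOpts rewrite applies the same idea to JVM options: the Hive Tez java opts are only honored when the Hive container size is set, otherwise the MapReduce mapper opts are used and a warning is logged.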
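
The DynamicPartitionPruner hunks above keep a running count of pruning events (the ++totalEventCount in addEvent) and, once processing finishes, compare it with the number of upstream tasks reported through InputInitializerContext.getVertexNumTasks(), failing the query rather than pruning on partial information. The sketch below shows only that bookkeeping; the class name is made up and a plain map stands in for the per-vertex task counts.

    import java.util.Collections;
    import java.util.Map;

    // Sketch of the sanity check added to DynamicPartitionPruner: every
    // upstream task is expected to contribute exactly one event, so the total
    // queued must match the sum of task counts across the source vertices.
    public class EventCountCheckSketch {
      private int receivedEvents = 0;

      synchronized void onEvent() {
        receivedEvents++;                     // mirrors ++totalEventCount in addEvent()
      }

      synchronized void verify(Map<String, Integer> vertexTaskCounts) {
        int expected = 0;
        for (int taskNum : vertexTaskCounts.values()) {
          expected += taskNum;                // one event expected per upstream task
        }
        if (expected != receivedEvents) {
          throw new IllegalStateException(
              "Incorrect event count: expected " + expected + ", received " + receivedEvents);
        }
      }

      public static void main(String[] args) {
        EventCountCheckSketch check = new EventCountCheckSketch();
        check.onEvent();
        check.onEvent();
        check.verify(Collections.singletonMap("Map 1", 2));   // 2 expected, 2 received
      }
    }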
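
HiveSplitGenerator now isolates its schema-boundary test in schemaEvolved(): consecutive splits share a group until the input format or deserializer class changes relative to the previous split, and when groupAcrossFiles is false any change of file path also starts a new group. The loop below reproduces that grouping over made-up split descriptors (the groupAcrossFiles path check is left out for brevity); SplitDesc and the sample values are illustrative only.

    import java.util.Arrays;
    import java.util.List;

    import com.google.common.collect.ArrayListMultimap;
    import com.google.common.collect.Multimap;

    // Simplified sketch of the grouping logic in generateGroupedSplits:
    // consecutive splits stay in the same group until the input format or
    // deserializer class changes (a "schema boundary"), at which point the
    // group id is bumped.
    public class SplitGroupingSketch {

      static class SplitDesc {
        final String path;
        final Class<?> inputFormat;
        final String deserializer;
        SplitDesc(String path, Class<?> inputFormat, String deserializer) {
          this.path = path;
          this.inputFormat = inputFormat;
          this.deserializer = deserializer;
        }
      }

      static boolean schemaEvolved(SplitDesc current, SplitDesc previous) {
        if (previous == null) {
          return true;   // the first split always opens a group, as in the real code
        }
        return current.inputFormat != previous.inputFormat
            || !current.deserializer.equals(previous.deserializer);
      }

      static Multimap<Integer, SplitDesc> group(List<SplitDesc> splits) {
        Multimap<Integer, SplitDesc> groups = ArrayListMultimap.create();
        SplitDesc prev = null;
        int groupId = 0;
        for (SplitDesc s : splits) {
          if (schemaEvolved(s, prev)) {
            ++groupId;
            prev = s;   // as in the diff, the previous split only advances on a new group
          }
          groups.put(groupId, s);
        }
        return groups;
      }

      public static void main(String[] args) {
        List<SplitDesc> splits = Arrays.asList(
            new SplitDesc("/t/p=1/f0", String.class, "LazySimpleSerDe"),
            new SplitDesc("/t/p=1/f1", String.class, "LazySimpleSerDe"),
            new SplitDesc("/t/p=2/f0", Integer.class, "OrcSerde"));
        System.out.println(group(splits).keySet().size());   // 2 schema groups
      }
    }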
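
In MapRecordProcessor, readers for the main work and the merge inputs now go through getKeyValueReader(), which builds a KeyValueInputMerger from the deserializer, its ObjectInspector, and the sort columns, so a MultiMRInput with several already-sorted readers is presented to the map operator as one ordered stream. The standalone sketch below illustrates only the underlying k-way merge idea on plain longs; it is not the Hive class, and the real merger compares deserialized sort columns through ObjectInspectors.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    // Generic idea behind merging sorted inputs: keep the current head of each
    // input in a priority queue and always emit the smallest one, refilling the
    // queue from the input the element came from.
    public class KWayMergeSketch {

      // one entry per input: the element currently at the head plus its source
      static class Head implements Comparable<Head> {
        final long value;
        final Iterator<Long> source;
        Head(long value, Iterator<Long> source) {
          this.value = value;
          this.source = source;
        }
        public int compareTo(Head other) {
          return Long.compare(value, other.value);
        }
      }

      static void mergeAndPrint(List<Iterator<Long>> inputs) {
        PriorityQueue<Head> heap = new PriorityQueue<Head>();
        for (Iterator<Long> it : inputs) {
          if (it.hasNext()) {
            heap.add(new Head(it.next(), it));
          }
        }
        while (!heap.isEmpty()) {
          Head smallest = heap.poll();
          System.out.print(smallest.value + " ");
          if (smallest.source.hasNext()) {
            heap.add(new Head(smallest.source.next(), smallest.source));   // refill from the same input
          }
        }
        System.out.println();
      }

      public static void main(String[] args) {
        mergeAndPrint(Arrays.asList(
            Arrays.asList(1L, 4L, 7L).iterator(),
            Arrays.asList(2L, 3L, 9L).iterator()));   // prints: 1 2 3 4 7 9
      }
    }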