Repository: hive
Updated Branches:
  refs/heads/branch-1 abaf88248 -> 6e0504d9a
http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index f8798b7..0601a29 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -169,7 +169,7 @@ class CompactionTxnHandler extends TxnHandler { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; + "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -185,6 +185,7 @@ class CompactionTxnHandler extends TxnHandler { info.tableName = rs.getString(3); info.partName = rs.getString(4); info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0)); + info.properties = rs.getString(6); // Now, update this record as being worked on by this worker. long now = getDbTime(dbConn); s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + @@ -329,7 +330,7 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id); + rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id); if(rs.next()) { info = CompactionInfo.loadFullFromCompactionQueue(rs); } @@ -345,7 +346,7 @@ class CompactionTxnHandler extends TxnHandler { LOG.debug("Going to rollback"); dbConn.rollback(); } - pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)"); + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); info.state = SUCCEEDED_STATE; CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn)); updCount = pStmt.executeUpdate(); @@ -838,7 +839,7 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id); + rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, 
CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id); if(rs.next()) { ci = CompactionInfo.loadFullFromCompactionQueue(rs); String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id; @@ -866,7 +867,7 @@ class CompactionTxnHandler extends TxnHandler { } close(rs, stmt, null); - pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)"); + pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); int updCount = pStmt.executeUpdate(); LOG.debug("Going to commit"); http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 5805966..b503652 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -123,6 +123,7 @@ public final class TxnDbUtil { " CQ_PARTITION varchar(767)," + " CQ_STATE char(1) NOT NULL," + " CQ_TYPE char(1) NOT NULL," + + " CQ_TBLPROPERTIES varchar(2048)," + " CQ_WORKER_ID varchar(128)," + " CQ_START bigint," + " CQ_RUN_AS varchar(128)," + @@ -140,6 +141,7 @@ public final class TxnDbUtil { " CC_PARTITION varchar(767)," + " CC_STATE char(1) NOT NULL," + " CC_TYPE char(1) NOT NULL," + + " CC_TBLPROPERTIES varchar(2048)," + " CC_WORKER_ID varchar(128)," + " CC_START bigint," + " CC_END bigint," + http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 27fa820..f2658f2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HouseKeeperService; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.StringUtils; @@ -1385,6 +1386,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { String partName = rqst.getPartitionname(); if (partName != null) buf.append("cq_partition, "); buf.append("cq_state, cq_type"); + if (rqst.getProperties() != null) { + buf.append(", cq_tblproperties"); + } if (rqst.getRunas() != null) buf.append(", cq_run_as"); buf.append(") values ("); buf.append(id); @@ -1413,6 +1417,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { dbConn.rollback(); throw new 
MetaException("Unexpected compaction type " + rqst.getType().toString()); } + if (rqst.getProperties() != null) { + buf.append("', '"); + buf.append(new StringableMap(rqst.getProperties()).toString()); + } if (rqst.getRunas() != null) { buf.append("', '"); buf.append(rqst.getRunas()); http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 5391fb0..7212bfd 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -30,8 +30,10 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; public class TxnUtils { @@ -208,4 +210,56 @@ public class TxnUtils { long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8); return sizeInBytes / 1024 > queryMemoryLimit; } + + public static class StringableMap extends HashMap<String, String> { + + public StringableMap(String s) { + String[] parts = s.split(":", 2); + // read that many chars + int numElements = Integer.parseInt(parts[0]); + s = parts[1]; + for (int i = 0; i < numElements; i++) { + parts = s.split(":", 2); + int len = Integer.parseInt(parts[0]); + String key = null; + if (len > 0) key = parts[1].substring(0, len); + parts = parts[1].substring(len).split(":", 2); + len = Integer.parseInt(parts[0]); + String value = null; + if (len > 0) value = parts[1].substring(0, len); + s = parts[1].substring(len); + put(key, value); + } + } + + public StringableMap(Map<String, String> m) { + super(m); + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append(size()); + buf.append(':'); + if (size() > 0) { + for (Map.Entry<String, String> entry : entrySet()) { + int length = (entry.getKey() == null) ? 0 : entry.getKey().length(); + buf.append(entry.getKey() == null ? 0 : length); + buf.append(':'); + if (length > 0) buf.append(entry.getKey()); + length = (entry.getValue() == null) ? 
0 : entry.getValue().length(); + buf.append(length); + buf.append(':'); + if (length > 0) buf.append(entry.getValue()); + } + } + return buf.toString(); + } + + public Properties toProperties() { + Properties props = new Properties(); + props.putAll(this); + return props; + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index be4753f..c7d9de7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -1738,7 +1738,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } partName = partitions.get(0).getName(); } - db.compact(tbl.getDbName(), tbl.getTableName(), partName, desc.getCompactionType()); + db.compact(tbl.getDbName(), tbl.getTableName(), partName, desc.getCompactionType(), desc.getProps()); console.printInfo("Compaction enqueued."); return 0; } http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index a67f23a..6362d23 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3240,16 +3240,18 @@ private void constructOneLBLocationMap(FileStatus fSta, * @param partName name of the partition, if null table will be compacted (valid only for * non-partitioned tables). 
* @param compactType major or minor + * @param tblproperties the map of tblproperties to overwrite for this compaction + * @throws HiveException */ - public void compact(String dbname, String tableName, String partName, String compactType) + public void compact(String dbname, String tableName, String partName, String compactType, + Map<String, String> tblproperties) throws HiveException { try { CompactionType cr = null; if ("major".equals(compactType)) cr = CompactionType.MAJOR; else if ("minor".equals(compactType)) cr = CompactionType.MINOR; else throw new RuntimeException("Unknown compaction type " + compactType); - getMSC().compact(dbname, tableName, partName, cr); + getMSC().compact(dbname, tableName, partName, cr, tblproperties); } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); throw new HiveException(e); http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index f7cd167..c836268 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -1727,6 +1727,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { AlterTableSimpleDesc desc = new AlterTableSimpleDesc( tableName, newPartSpec, type); + if (ast.getChildCount() > 1) { + HashMap<String, String> mapProp = getProps((ASTNode) (ast.getChild(1)).getChild(0)); + desc.setProps(mapProp); + } + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc), conf)); } http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index 6c3d42a..eda460f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -1344,8 +1344,8 @@ alterStatementSuffixBucketNum alterStatementSuffixCompact @init { msgs.push("compaction request"); } @after { msgs.pop(); } - : KW_COMPACT compactType=StringLiteral - -> ^(TOK_ALTERTABLE_COMPACT $compactType) + : KW_COMPACT compactType=StringLiteral (KW_WITH KW_OVERWRITE KW_TBLPROPERTIES tableProperties)? + -> ^(TOK_ALTERTABLE_COMPACT $compactType tableProperties?) ;
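For context, here is a hypothetical statement the extended alterStatementSuffixCompact rule above now accepts (the table, partition, and property values are made up; a sketch, not taken from this patch):

    // New HiveQL form introduced by this grammar change, written as a Java
    // string the way SQL appears elsewhere in this module:
    String sql =
        "ALTER TABLE acid_tbl PARTITION (ds = '2016-04-01') COMPACT 'major' "
        + "WITH OVERWRITE TBLPROPERTIES ("
        + "'compactor.mapreduce.map.memory.mb'='3072',"  // picked up by CompactorMR via COMPACTOR_PREFIX
        + "'tblprops.orc.compress.size'='8192')";        // picked up by CompactorMR via TBLPROPS_PREFIX

DDLSemanticAnalyzer (above) reads the optional tableProperties subtree off TOK_ALTERTABLE_COMPACT and carries it on AlterTableSimpleDesc via setProps().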
http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java index d819d15..2ae70bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java @@ -33,6 +33,7 @@ public class AlterTableSimpleDesc extends DDLDesc { private String compactionType; AlterTableTypes type; + private Map<String, String> props; public AlterTableSimpleDesc() { } @@ -99,4 +100,11 @@ return compactionType; } + public Map<String, String> getProps() { + return props; + } + + public void setProps(Map<String, String> props) { + this.props = props; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 9f68fa6..03cd992 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; @@ -40,7 +41,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.NullWritable; @@ -92,12 +92,16 @@ public class CompactorMR { static final private String DELTA_DIRS = "hive.compactor.delta.dirs"; static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search"; static final private String TMPDIR = "_tmp"; + static final private String TBLPROPS_PREFIX = "tblprops."; + static final private String COMPACTOR_PREFIX = "compactor."; + + private JobConf mrJob; // the MR job for compaction public CompactorMR() { } private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, StorageDescriptor sd, - ValidTxnList txns) { + ValidTxnList txns, CompactionInfo ci) { JobConf job = new JobConf(conf); job.setJobName(jobName); job.setOutputKeyClass(NullWritable.class); @@ -123,9 +127,52 @@ public class CompactorMR { job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString()); job.setInt(NUM_BUCKETS, sd.getNumBuckets()); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); + overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable + if (ci.properties != null) { // override MR properties and general tblproperties if applicable + overrideTblProps(job, t.getParameters(), ci.properties); + } setColumnTypes(job, sd.getCols()); return job; }
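Before the two helper methods below, a standalone sketch of the prefix handling they perform (hypothetical keys and values; a plain Map stands in for the JobConf):

    import java.util.HashMap;
    import java.util.Map;

    public class PrefixDemo {
      public static void main(String[] args) {
        Map<String, String> requestProps = new HashMap<String, String>();
        requestProps.put("compactor.mapreduce.map.memory.mb", "3072");
        requestProps.put("tblprops.orc.compress.size", "8192");

        Map<String, String> jobConf = new HashMap<String, String>();    // stands in for the compactor JobConf
        Map<String, String> tableProps = new HashMap<String, String>(); // what the MR job sees as TABLE_PROPS
        for (Map.Entry<String, String> e : requestProps.entrySet()) {
          if (e.getKey().startsWith("compactor.")) {
            // "compactor.<mr-key>" strips the prefix and overrides the MR job conf
            jobConf.put(e.getKey().substring("compactor.".length()), e.getValue());
          } else if (e.getKey().startsWith("tblprops.")) {
            // "tblprops.<prop>" strips the prefix and overwrites the table property for this compaction
            tableProps.put(e.getKey().substring("tblprops.".length()), e.getValue());
          }
        }
        System.out.println(jobConf);    // {mapreduce.map.memory.mb=3072}
        System.out.println(tableProps); // {orc.compress.size=8192}
      }
    }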
+ + /** + * Parse tblproperties specified on "ALTER TABLE ... COMPACT ... WITH OVERWRITE TBLPROPERTIES ..." + * and override two categories of properties: + * 1. properties of the compactor MR job (with prefix "compactor.") + * 2. general table properties (with prefix "tblprops.") + * @param job the compactor MR job + * @param tblproperties the table's existing tblproperties + * @param properties the encoded (StringableMap) properties from the compaction request + */ + private void overrideTblProps(JobConf job, Map<String, String> tblproperties, String properties) { + StringableMap stringableMap = new StringableMap(properties); + overrideMRProps(job, stringableMap); + // merge existing tblproperties with those specified on the ALTER TABLE command + for (String key : stringableMap.keySet()) { + if (key.startsWith(TBLPROPS_PREFIX)) { + String propKey = key.substring(9); // 9 is the length of "tblprops.". We only keep the rest + tblproperties.put(propKey, stringableMap.get(key)); + } + } + // re-set TABLE_PROPS with the reloaded tblproperties + job.set(TABLE_PROPS, new StringableMap(tblproperties).toString()); + } + + /** + * Parse tblproperties to override relevant properties of the compactor MR job with specified values. + * For example, compactor.mapreduce.map.memory.mb=1024 + * @param job the compactor MR job + * @param properties table properties + */ + private void overrideMRProps(JobConf job, Map<String, String> properties) { + for (String key : properties.keySet()) { + if (key.startsWith(COMPACTOR_PREFIX)) { + String mrKey = key.substring(10); // 10 is the length of "compactor." We only keep the rest. + job.set(mrKey, properties.get(key)); + } + } + } + /** * Run Compaction which may consist of several jobs on the cluster. * @param conf Hive configuration file @@ -142,7 +189,7 @@ if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) { throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true"); } - JobConf job = createBaseJobConf(conf, jobName, t, sd, txns); + JobConf job = createBaseJobConf(conf, jobName, t, sd, txns, ci); // Figure out and encode what files we need to read.
We do this here (rather than in // getSplits below) because as part of this we discover our minimum and maximum transactions, @@ -167,11 +214,11 @@ public class CompactorMR { "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API."); int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle; for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) { - JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns); + JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, txns, ci); launchCompactionJob(jobMinorCompact, null, CompactionType.MINOR, null, parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle), - maxDeltastoHandle, -1); + maxDeltastoHandle, -1, conf); } //now recompute state since we've done minor compactions and have different 'best' set of deltas dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns); @@ -209,14 +256,14 @@ public class CompactorMR { } launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(), - dir.getCurrentDirectories().size(), dir.getObsolete().size()); + dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf); su.gatherStats(); } private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType, StringableList dirsToSearch, List<AcidUtils.ParsedDelta> parsedDeltas, - int curDirNumber, int obsoleteDirNumber) throws IOException { + int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf) throws IOException { job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR); if(dirsToSearch == null) { dirsToSearch = new StringableList(); @@ -238,6 +285,10 @@ public class CompactorMR { job.setLong(MIN_TXN, minTxn); job.setLong(MAX_TXN, maxTxn); + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { + mrJob = job; + } + LOG.info("Submitting " + compactionType + " compaction job '" + job.getJobName() + "' to " + job.getQueueName() + " queue. " + "(current delta dirs count=" + curDirNumber + @@ -272,6 +323,10 @@ public class CompactorMR { HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); } + public JobConf getMrJob() { + return mrJob; + } + static class CompactorInputSplit implements InputSplit { private long length = 0; private List<String> locations; @@ -621,58 +676,6 @@ public class CompactorMR { } - static class StringableMap extends HashMap<String, String> { - - StringableMap(String s) { - String[] parts = s.split(":", 2); - // read that many chars - int numElements = Integer.valueOf(parts[0]); - s = parts[1]; - for (int i = 0; i < numElements; i++) { - parts = s.split(":", 2); - int len = Integer.valueOf(parts[0]); - String key = null; - if (len > 0) key = parts[1].substring(0, len); - parts = parts[1].substring(len).split(":", 2); - len = Integer.valueOf(parts[0]); - String value = null; - if (len > 0) value = parts[1].substring(0, len); - s = parts[1].substring(len); - put(key, value); - } - } - - StringableMap(Map<String, String> m) { - super(m); - } - - @Override - public String toString() { - StringBuilder buf = new StringBuilder(); - buf.append(size()); - buf.append(':'); - if (size() > 0) { - for (Map.Entry<String, String> entry : entrySet()) { - int length = (entry.getKey() == null) ? 0 : entry.getKey().length(); - buf.append(entry.getKey() == null ? 0 : length); - buf.append(':'); - if (length > 0) buf.append(entry.getKey()); - length = (entry.getValue() == null) ? 
0 : entry.getValue().length(); - buf.append(length); - buf.append(':'); - if (length > 0) buf.append(entry.getValue()); - } - } - return buf.toString(); - } - - public Properties toProperties() { - Properties props = new Properties(); - props.putAll(this); - return props; - } - } - static class StringableList extends ArrayList<Path> { StringableList() { http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 625e389..1a63f99 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -46,6 +46,7 @@ import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -58,6 +59,8 @@ public class Initiator extends CompactorThread { static final private String CLASS_NAME = Initiator.class.getName(); static final private Log LOG = LogFactory.getLog(CLASS_NAME); + static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold."; + private long checkInterval; @Override @@ -144,7 +147,7 @@ public class Initiator extends CompactorThread { /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive. * Long term we should consider having a thread pool here and running checkForCompactionS * in parallel*/ - CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs); + CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, t.getParameters(), runAs); if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded); } catch (Throwable t) { LOG.error("Caught exception while trying to determine if we should compact " + @@ -213,6 +216,7 @@ public class Initiator extends CompactorThread { private CompactionType checkForCompaction(final CompactionInfo ci, final ValidTxnList txns, final StorageDescriptor sd, + final Map<String, String> tblproperties, final String runAs) throws IOException, InterruptedException { // If it's marked as too many aborted, we already know we need to compact @@ -222,7 +226,7 @@ public class Initiator extends CompactorThread { return CompactionType.MAJOR; } if (runJobAsSelf(runAs)) { - return determineCompactionType(ci, txns, sd); + return determineCompactionType(ci, txns, sd, tblproperties); } else { LOG.info("Going to initiate as user " + runAs); UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, @@ -230,7 +234,7 @@ public class Initiator extends CompactorThread { CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() { @Override public CompactionType run() throws Exception { - return determineCompactionType(ci, txns, sd); + return determineCompactionType(ci, txns, sd, tblproperties); } }); try { @@ -244,7 +248,7 @@ public class Initiator extends CompactorThread { } private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns, - StorageDescriptor sd) + StorageDescriptor sd, Map<String, String> tblproperties) throws IOException, InterruptedException { boolean noBase = false; Path location = new Path(sd.getLocation()); @@ -282,8 +286,11 @@ 
public class Initiator extends CompactorThread { if (baseSize == 0 && deltaSize > 0) { noBase = true; } else { - float deltaPctThreshold = HiveConf.getFloatVar(conf, + String deltaPctProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD); + float deltaPctThreshold = deltaPctProp == null ? + HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : + Float.parseFloat(deltaPctProp); boolean bigEnough = (float)deltaSize/(float)baseSize > deltaPctThreshold; if (LOG.isDebugEnabled()) { StringBuilder msg = new StringBuilder("delta size: "); @@ -299,8 +306,11 @@ public class Initiator extends CompactorThread { if (bigEnough) return CompactionType.MAJOR; } - int deltaNumThreshold = HiveConf.getIntVar(conf, + String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD); + int deltaNumThreshold = deltaNumProp == null ? + HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : + Integer.parseInt(deltaNumProp); boolean enough = deltas.size() > deltaNumThreshold; if (enough) { LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +
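The Initiator reads these "compactorthreshold." overrides from the table's own parameters rather than from a compaction request, so they are set as ordinary tblproperties. A hypothetical example (table name and values are made up; a sketch, not taken from this patch):

    // Per-table compaction triggers, read by determineCompactionType() above:
    String sql =
        "ALTER TABLE acid_tbl SET TBLPROPERTIES ("
        + "'compactorthreshold.hive.compactor.delta.num.threshold'='4',"     // delta count that triggers a minor compaction
        + "'compactorthreshold.hive.compactor.delta.pct.threshold'='0.25')"; // delta/base size ratio that triggers a major compaction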
http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index bf8e5cc..256e27b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -58,6 +59,7 @@ public class Worker extends CompactorThread { static final private int baseThreadNum = 10002; private String name; + private JobConf mrJob; // the MR job for compaction /** * Get the hostname that this worker is run on. Made static and public so that other classes @@ -182,6 +184,9 @@ public class Worker extends CompactorThread { } } txnHandler.markCompacted(ci); + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { + mrJob = mr.getMrJob(); + } } catch (Exception e) { LOG.error("Caught exception while trying to compact " + ci + ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e)); @@ -215,6 +220,10 @@ setName(name.toString()); } + public JobConf getMrJob() { + return mrJob; + } + static final class StatsUpdater { static final private Log LOG = LogFactory.getLog(StatsUpdater.class); http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 381eeb3..b91bdc3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.junit.Assert; import org.junit.Before; @@ -68,19 +69,19 @@ public class TestWorker extends CompactorTest { @Test public void stringableMap() throws Exception { // Empty map case - CompactorMR.StringableMap m = new CompactorMR.StringableMap(new HashMap<String, String>()); + StringableMap m = new StringableMap(new HashMap<String, String>()); String s = m.toString(); Assert.assertEquals("0:", s); - m = new CompactorMR.StringableMap(s); + m = new StringableMap(s); Assert.assertEquals(0, m.size()); Map<String, String> base = new HashMap<String, String>(); base.put("mary", "poppins"); base.put("bert", null); base.put(null, "banks"); - m = new CompactorMR.StringableMap(base); + m = new StringableMap(base); s = m.toString(); - m = new CompactorMR.StringableMap(s); + m = new StringableMap(s); Assert.assertEquals(3, m.size()); Map<String, Boolean> saw = new HashMap<String, Boolean>(3); saw.put("mary", false);
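As a footnote to the test above, a minimal round-trip sketch of the length-prefixed wire format used by the relocated StringableMap (assuming the TxnUtils.StringableMap added by this patch; values are illustrative):

    import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap;
    import java.util.HashMap;
    import java.util.Map;

    public class StringableMapDemo {
      public static void main(String[] args) {
        Map<String, String> m = new HashMap<String, String>();
        m.put("mary", "poppins");
        // Encodes as "<size>:<keyLen>:<key><valueLen>:<value>...":
        String s = new StringableMap(m).toString();   // "1:4:mary7:poppins"
        // The String constructor walks the same length prefixes back into a map:
        StringableMap decoded = new StringableMap(s);
        System.out.println(decoded.get("mary"));      // poppins
      }
    }

A null key or value is written with length 0 and read back as null, which is what the "bert"/null and null/"banks" entries in the test exercise.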