Repository: hive
Updated Branches:
  refs/heads/branch-1 abaf88248 -> 6e0504d9a


http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index f8798b7..0601a29 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -169,7 +169,7 @@ class CompactionTxnHandler extends TxnHandler {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "select cq_id, cq_database, cq_table, cq_partition, " +
-          "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE 
+ "'";
+          "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" 
+ INITIATED_STATE + "'";
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
         if (!rs.next()) {
@@ -185,6 +185,7 @@ class CompactionTxnHandler extends TxnHandler {
           info.tableName = rs.getString(3);
           info.partName = rs.getString(4);
           info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
+          info.properties = rs.getString(6);
           // Now, update this record as being worked on by this worker.
           long now = getDbTime(dbConn);
           s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', 
" +
@@ -329,7 +330,7 @@ class CompactionTxnHandler extends TxnHandler {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, 
CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, 
CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE 
CQ_ID = " + info.id);
+        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, 
CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, 
CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from 
COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
         if(rs.next()) {
           info = CompactionInfo.loadFullFromCompactionQueue(rs);
         }
@@ -345,7 +346,7 @@ class CompactionTxnHandler extends TxnHandler {
           LOG.debug("Going to rollback");
           dbConn.rollback();
         }
-        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
+        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
         info.state = SUCCEEDED_STATE;
        CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
         updCount = pStmt.executeUpdate();
@@ -838,7 +839,7 @@ class CompactionTxnHandler extends TxnHandler {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, 
CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, 
CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE 
CQ_ID = " + ci.id);
+        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, 
CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, 
CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from 
COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
         if(rs.next()) {
           ci = CompactionInfo.loadFullFromCompactionQueue(rs);
           String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id;
@@ -866,7 +867,7 @@ class CompactionTxnHandler extends TxnHandler {
         }
         close(rs, stmt, null);
 
-        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
+        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
        CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
         int updCount = pStmt.executeUpdate();
         LOG.debug("Going to commit");

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 5805966..b503652 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -123,6 +123,7 @@ public final class TxnDbUtil {
           " CQ_PARTITION varchar(767)," +
           " CQ_STATE char(1) NOT NULL," +
           " CQ_TYPE char(1) NOT NULL," +
+          " CQ_TBLPROPERTIES varchar(2048)," +
           " CQ_WORKER_ID varchar(128)," +
           " CQ_START bigint," +
           " CQ_RUN_AS varchar(128)," +
@@ -140,6 +141,7 @@ public final class TxnDbUtil {
         " CC_PARTITION varchar(767)," +
         " CC_STATE char(1) NOT NULL," +
         " CC_TYPE char(1) NOT NULL," +
+        " CC_TBLPROPERTIES varchar(2048)," +
         " CC_WORKER_ID varchar(128)," +
         " CC_START bigint," +
         " CC_END bigint," +

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 27fa820..f2658f2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HouseKeeperService;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
 
@@ -1385,6 +1386,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         String partName = rqst.getPartitionname();
         if (partName != null) buf.append("cq_partition, ");
         buf.append("cq_state, cq_type");
+        if (rqst.getProperties() != null) {
+          buf.append(", cq_tblproperties");
+        }
         if (rqst.getRunas() != null) buf.append(", cq_run_as");
         buf.append(") values (");
         buf.append(id);
@@ -1413,6 +1417,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             dbConn.rollback();
             throw new MetaException("Unexpected compaction type " + 
rqst.getType().toString());
         }
+        if (rqst.getProperties() != null) {
+          buf.append("', '");
+          buf.append(new StringableMap(rqst.getProperties()).toString());
+        }
         if (rqst.getRunas() != null) {
           buf.append("', '");
           buf.append(rqst.getRunas());

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 5391fb0..7212bfd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -30,8 +30,10 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 public class TxnUtils {
@@ -208,4 +210,56 @@ public class TxnUtils {
     long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
     return sizeInBytes / 1024 > queryMemoryLimit;
   }
+
+  public static class StringableMap extends HashMap<String, String> {
+
+    public StringableMap(String s) {
+      String[] parts = s.split(":", 2);
+      // read that many chars
+      int numElements = Integer.parseInt(parts[0]);
+      s = parts[1];
+      for (int i = 0; i < numElements; i++) {
+        parts = s.split(":", 2);
+        int len = Integer.parseInt(parts[0]);
+        String key = null;
+        if (len > 0) key = parts[1].substring(0, len);
+        parts = parts[1].substring(len).split(":", 2);
+        len = Integer.parseInt(parts[0]);
+        String value = null;
+        if (len > 0) value = parts[1].substring(0, len);
+        s = parts[1].substring(len);
+        put(key, value);
+      }
+    }
+
+    public StringableMap(Map<String, String> m) {
+      super(m);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder();
+      buf.append(size());
+      buf.append(':');
+      if (size() > 0) {
+        for (Map.Entry<String, String> entry : entrySet()) {
+          int length = (entry.getKey() == null) ? 0 : entry.getKey().length();
+          buf.append(entry.getKey() == null ? 0 : length);
+          buf.append(':');
+          if (length > 0) buf.append(entry.getKey());
+          length = (entry.getValue() == null) ? 0 : entry.getValue().length();
+          buf.append(length);
+          buf.append(':');
+          if (length > 0) buf.append(entry.getValue());
+        }
+      }
+      return buf.toString();
+    }
+
+    public Properties toProperties() {
+      Properties props = new Properties();
+      props.putAll(this);
+      return props;
+    }
+  }
 }
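
The encoding StringableMap produces is length-prefixed rather than delimiter-escaped: the entry count, a colon, then each key and value prefixed by its character length and a colon (null keys and values are written with length 0). A minimal round-trip sketch, assuming the metastore classes above are on the classpath (the property name is only an example):

    import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap;

    import java.util.HashMap;
    import java.util.Map;

    public class StringableMapRoundTrip {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<String, String>();
        props.put("compactor.mapreduce.map.memory.mb", "2048");
        // Serializes to "<size>:" then "<keyLen>:<key><valueLen>:<value>" per entry,
        // here "1:33:compactor.mapreduce.map.memory.mb4:2048"
        String encoded = new StringableMap(props).toString();
        // Parsing the encoded form recovers the original entries
        StringableMap decoded = new StringableMap(encoded);
        System.out.println(encoded);
        System.out.println(decoded.get("compactor.mapreduce.map.memory.mb")); // 2048
      }
    }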

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index be4753f..c7d9de7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -1738,7 +1738,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       }
       partName = partitions.get(0).getName();
     }
-    db.compact(tbl.getDbName(), tbl.getTableName(), partName, desc.getCompactionType());
+    db.compact(tbl.getDbName(), tbl.getTableName(), partName, desc.getCompactionType(), desc.getProps());
     console.printInfo("Compaction enqueued.");
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index a67f23a..6362d23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3240,16 +3240,18 @@ private void constructOneLBLocationMap(FileStatus fSta,
   * @param partName name of the partition, if null table will be compacted (valid only for
    *                 non-partitioned tables).
    * @param compactType major or minor
+   * @param tblproperties the map of tblproperties to overwrite for this compaction
    * @throws HiveException
    */
-  public void compact(String dbname, String tableName, String partName,  String compactType)
+  public void compact(String dbname, String tableName, String partName, String compactType,
+                      Map<String, String> tblproperties)
       throws HiveException {
     try {
       CompactionType cr = null;
       if ("major".equals(compactType)) cr = CompactionType.MAJOR;
       else if ("minor".equals(compactType)) cr = CompactionType.MINOR;
       else throw new RuntimeException("Unknown compaction type " + 
compactType);
-      getMSC().compact(dbname, tableName, partName, cr);
+      getMSC().compact(dbname, tableName, partName, cr, tblproperties);
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
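
A hedged sketch of calling the new overload (the database/table names and the property value are made up; Hive.get() assumes a reachable metastore):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class EnqueueCompactionDemo {
      public static void main(String[] args) throws HiveException {
        Hive db = Hive.get(new HiveConf());  // assumes a reachable metastore
        Map<String, String> overrides = new HashMap<String, String>();
        overrides.put("compactor.mapreduce.map.memory.mb", "2048");
        // null partName: compact the whole (non-partitioned) table
        db.compact("default", "acid_tbl", null, "major", overrides);
      }
    }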

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index f7cd167..c836268 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1727,6 +1727,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
     AlterTableSimpleDesc desc = new AlterTableSimpleDesc(
         tableName, newPartSpec, type);
 
+    if (ast.getChildCount() > 1) {
+      HashMap<String, String> mapProp = getProps((ASTNode) 
(ast.getChild(1)).getChild(0));
+      desc.setProps(mapProp);
+    }
+
    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc), conf));
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 6c3d42a..eda460f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -1344,8 +1344,8 @@ alterStatementSuffixBucketNum
 alterStatementSuffixCompact
 @init { msgs.push("compaction request"); }
 @after { msgs.pop(); }
-    : KW_COMPACT compactType=StringLiteral
-    -> ^(TOK_ALTERTABLE_COMPACT $compactType)
+    : KW_COMPACT compactType=StringLiteral (KW_WITH KW_OVERWRITE KW_TBLPROPERTIES tableProperties)?
+    -> ^(TOK_ALTERTABLE_COMPACT $compactType tableProperties?)
     ;
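
With this production in place, a statement along the lines of ALTER TABLE t COMPACT 'major' WITH OVERWRITE TBLPROPERTIES ('compactor.mapreduce.map.memory.mb'='2048', 'compactorthreshold.hive.compactor.delta.num.threshold'='4') should parse, with the optional tableProperties child consumed by the DDLSemanticAnalyzer change above. The property names shown are only examples of the prefixes the compactor recognizes.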
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java
index d819d15..2ae70bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableSimpleDesc.java
@@ -33,6 +33,7 @@ public class AlterTableSimpleDesc extends DDLDesc {
   private String compactionType;
 
   AlterTableTypes type;
+  private Map<String, String> props;
 
   public AlterTableSimpleDesc() {
   }
@@ -99,4 +100,11 @@ public class AlterTableSimpleDesc extends DDLDesc {
     return compactionType;
   }
 
+  public Map<String, String> getProps() {
+    return props;
+  }
+
+  public void setProps(Map<String, String> props) {
+    this.props = props;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 9f68fa6..03cd992 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -40,7 +41,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.NullWritable;
@@ -92,12 +92,16 @@ public class CompactorMR {
   static final private String DELTA_DIRS = "hive.compactor.delta.dirs";
   static final private String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search";
   static final private String TMPDIR = "_tmp";
+  static final private String TBLPROPS_PREFIX = "tblprops.";
+  static final private String COMPACTOR_PREFIX = "compactor.";
+
+  private JobConf mrJob;  // the MR job for compaction
 
   public CompactorMR() {
   }
 
  private JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
-                                    ValidTxnList txns) {
+                                    ValidTxnList txns, CompactionInfo ci) {
     JobConf job = new JobConf(conf);
     job.setJobName(jobName);
     job.setOutputKeyClass(NullWritable.class);
@@ -123,9 +127,52 @@ public class CompactorMR {
     job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString());
     job.setInt(NUM_BUCKETS, sd.getNumBuckets());
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+    overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable
+    if (ci.properties != null) { // override MR properties and general tblproperties if applicable
+      overrideTblProps(job, t.getParameters(), ci.properties);
+    }
     setColumnTypes(job, sd.getCols());
     return job;
   }
+
+  /**
+   * Parse tblproperties specified on "ALTER TABLE ... COMPACT ... WITH 
OVERWRITE TBLPROPERTIES ..."
+   * and override two categories of properties:
+   * 1. properties of the compactor MR job (with prefix "compactor.")
+   * 2. general hive properties (with prefix "tblprops.")
+   * @param job the compactor MR job
+   * @param tblproperties existing tblproperties
+   * @param properties the tblproperties from the compaction request, in StringableMap-serialized form
+   */
+  private void overrideTblProps(JobConf job, Map<String, String> 
tblproperties, String properties) {
+    StringableMap stringableMap = new StringableMap(properties);
+    overrideMRProps(job, stringableMap);
+    // mingle existing tblproperties with those specified on the ALTER TABLE command
+    for (String key : stringableMap.keySet()) {
+      if (key.startsWith(TBLPROPS_PREFIX)) {
+        String propKey = key.substring(TBLPROPS_PREFIX.length());  // strip the "tblprops." prefix; we only keep the rest
+        tblproperties.put(propKey, stringableMap.get(key));
+      }
+    }
+    // re-set TABLE_PROPS with reloaded tblproperties
+    job.set(TABLE_PROPS, new StringableMap(tblproperties).toString());
+  }
+
+  /**
+   * Parse tblproperties to override relevant properties of the compactor MR job with specified values.
+   * For example, compactor.mapreduce.map.memory.mb=1024
+   * @param job the compactor MR job
+   * @param properties table properties
+   */
+  private void overrideMRProps(JobConf job, Map<String, String> properties) {
+    for (String key : properties.keySet()) {
+      if (key.startsWith(COMPACTOR_PREFIX)) {
+        String mrKey = key.substring(COMPACTOR_PREFIX.length()); // strip the "compactor." prefix; we only keep the rest.
+        job.set(mrKey, properties.get(key));
+      }
+    }
+  }
+
   /**
    * Run Compaction which may consist of several jobs on the cluster.
    * @param conf Hive configuration file
@@ -142,7 +189,7 @@ public class CompactorMR {
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && 
conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new 
RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
-    JobConf job = createBaseJobConf(conf, jobName, t, sd, txns);
+    JobConf job = createBaseJobConf(conf, jobName, t, sd, txns, ci);
 
    // Figure out and encode what files we need to read.  We do this here (rather than in
    // getSplits below) because as part of this we discover our minimum and maximum transactions,
@@ -167,11 +214,11 @@ public class CompactorMR {
         "runaway/mis-configured process writing to ACID tables, especially 
using Streaming Ingest API.");
       int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle;
       for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
-        JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + 
jobSubId, t, sd, txns);
+        JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + 
jobSubId, t, sd, txns, ci);
         launchCompactionJob(jobMinorCompact,
           null, CompactionType.MINOR, null,
          parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
-          maxDeltastoHandle, -1);
+          maxDeltastoHandle, -1, conf);
       }
      //now recompute state since we've done minor compactions and have different 'best' set of deltas
       dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
@@ -209,14 +256,14 @@ public class CompactorMR {
     }
 
    launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
-      dir.getCurrentDirectories().size(), dir.getObsolete().size());
+      dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf);
 
     su.gatherStats();
   }
  private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
                                    StringableList dirsToSearch,
                                    List<AcidUtils.ParsedDelta> parsedDeltas,
-                                   int curDirNumber, int obsoleteDirNumber) throws IOException {
+                                   int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf) throws IOException {
     job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
     if(dirsToSearch == null) {
       dirsToSearch = new StringableList();
@@ -238,6 +285,10 @@ public class CompactorMR {
     job.setLong(MIN_TXN, minTxn);
     job.setLong(MAX_TXN, maxTxn);
 
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+      mrJob = job;
+    }
+
     LOG.info("Submitting " + compactionType + " compaction job '" +
       job.getJobName() + "' to " + job.getQueueName() + " queue.  " +
       "(current delta dirs count=" + curDirNumber +
@@ -272,6 +323,10 @@ public class CompactorMR {
    HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
   }
 
+  public JobConf getMrJob() {
+    return mrJob;
+  }
+
   static class CompactorInputSplit implements InputSplit {
     private long length = 0;
     private List<String> locations;
@@ -621,58 +676,6 @@ public class CompactorMR {
 
   }
 
-  static class StringableMap extends HashMap<String, String> {
-
-    StringableMap(String s) {
-      String[] parts = s.split(":", 2);
-      // read that many chars
-      int numElements = Integer.valueOf(parts[0]);
-      s = parts[1];
-      for (int i = 0; i < numElements; i++) {
-        parts = s.split(":", 2);
-        int len = Integer.valueOf(parts[0]);
-        String key = null;
-        if (len > 0) key = parts[1].substring(0, len);
-        parts = parts[1].substring(len).split(":", 2);
-        len = Integer.valueOf(parts[0]);
-        String value = null;
-        if (len > 0) value = parts[1].substring(0, len);
-        s = parts[1].substring(len);
-        put(key, value);
-      }
-    }
-
-    StringableMap(Map<String, String> m) {
-      super(m);
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder buf = new StringBuilder();
-      buf.append(size());
-      buf.append(':');
-      if (size() > 0) {
-        for (Map.Entry<String, String> entry : entrySet()) {
-          int length = (entry.getKey() == null) ? 0 : entry.getKey().length();
-          buf.append(entry.getKey() == null ? 0 : length);
-          buf.append(':');
-          if (length > 0) buf.append(entry.getKey());
-          length = (entry.getValue() == null) ? 0 : entry.getValue().length();
-          buf.append(length);
-          buf.append(':');
-          if (length > 0) buf.append(entry.getValue());
-        }
-      }
-      return buf.toString();
-    }
-
-    public Properties toProperties() {
-      Properties props = new Properties();
-      props.putAll(this);
-      return props;
-    }
-  }
-
   static class StringableList extends ArrayList<Path> {
     StringableList() {
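
A standalone sketch of the prefix handling that overrideMRProps and overrideTblProps implement above, with plain maps standing in for the JobConf (all class and variable names here are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class PrefixOverrideDemo {
      private static final String COMPACTOR_PREFIX = "compactor.";
      private static final String TBLPROPS_PREFIX = "tblprops.";

      public static void main(String[] args) {
        Map<String, String> requested = new HashMap<String, String>();
        requested.put("compactor.mapreduce.map.memory.mb", "2048"); // -> MR job conf
        requested.put("tblprops.orc.compress.size", "8192");        // -> tblproperties

        Map<String, String> jobConf = new HashMap<String, String>();
        Map<String, String> tblProps = new HashMap<String, String>();
        for (Map.Entry<String, String> e : requested.entrySet()) {
          if (e.getKey().startsWith(COMPACTOR_PREFIX)) {
            // strip "compactor." and set the remainder directly on the MR job
            jobConf.put(e.getKey().substring(COMPACTOR_PREFIX.length()), e.getValue());
          } else if (e.getKey().startsWith(TBLPROPS_PREFIX)) {
            // strip "tblprops." and fold the remainder into the table properties
            tblProps.put(e.getKey().substring(TBLPROPS_PREFIX.length()), e.getValue());
          }
        }
        System.out.println(jobConf);  // {mapreduce.map.memory.mb=2048}
        System.out.println(tblProps); // {orc.compress.size=8192}
      }
    }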
 

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 625e389..1a63f99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.util.StringUtils;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -58,6 +59,8 @@ public class Initiator extends CompactorThread {
   static final private String CLASS_NAME = Initiator.class.getName();
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
 
+  static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
+
   private long checkInterval;
 
   @Override
@@ -144,7 +147,7 @@ public class Initiator extends CompactorThread {
              /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
              * Long term we should consider having a thread pool here and running checkForCompactionS
              * in parallel*/
-              CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs);
+              CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, t.getParameters(), runAs);
              if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
             } catch (Throwable t) {
               LOG.error("Caught exception while trying to determine if we 
should compact " +
@@ -213,6 +216,7 @@ public class Initiator extends CompactorThread {
   private CompactionType checkForCompaction(final CompactionInfo ci,
                                             final ValidTxnList txns,
                                             final StorageDescriptor sd,
+                                            final Map<String, String> 
tblproperties,
                                             final String runAs)
       throws IOException, InterruptedException {
     // If it's marked as too many aborted, we already know we need to compact
@@ -222,7 +226,7 @@ public class Initiator extends CompactorThread {
       return CompactionType.MAJOR;
     }
     if (runJobAsSelf(runAs)) {
-      return determineCompactionType(ci, txns, sd);
+      return determineCompactionType(ci, txns, sd, tblproperties);
     } else {
       LOG.info("Going to initiate as user " + runAs);
       UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
@@ -230,7 +234,7 @@ public class Initiator extends CompactorThread {
      CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
         @Override
         public CompactionType run() throws Exception {
-          return determineCompactionType(ci, txns, sd);
+          return determineCompactionType(ci, txns, sd, tblproperties);
         }
       });
       try {
@@ -244,7 +248,7 @@ public class Initiator extends CompactorThread {
   }
 
  private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns,
-                                                 StorageDescriptor sd)
+                                                 StorageDescriptor sd, Map<String, String> tblproperties)
       throws IOException, InterruptedException {
     boolean noBase = false;
     Path location = new Path(sd.getLocation());
@@ -282,8 +286,11 @@ public class Initiator extends CompactorThread {
     if (baseSize == 0 && deltaSize > 0) {
       noBase = true;
     } else {
-      float deltaPctThreshold = HiveConf.getFloatVar(conf,
+      String deltaPctProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
           HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      float deltaPctThreshold = deltaPctProp == null ?
+          HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) :
+          Float.parseFloat(deltaPctProp);
       boolean bigEnough =   (float)deltaSize/(float)baseSize > 
deltaPctThreshold;
       if (LOG.isDebugEnabled()) {
         StringBuilder msg = new StringBuilder("delta size: ");
@@ -299,8 +306,11 @@ public class Initiator extends CompactorThread {
       if (bigEnough) return CompactionType.MAJOR;
     }
 
-    int deltaNumThreshold = HiveConf.getIntVar(conf,
+    String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
         HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    int deltaNumThreshold = deltaNumProp == null ?
+        HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) :
+        Integer.parseInt(deltaNumProp);
     boolean enough = deltas.size() > deltaNumThreshold;
     if (enough) {
       LOG.debug("Found " + deltas.size() + " delta files, threshold is " + 
deltaNumThreshold +
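
A minimal sketch of the per-table threshold fallback added above, with a plain map standing in for HiveConf (class and method names are hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class ThresholdFallbackDemo {
      private static final String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";

      static int deltaNumThreshold(Map<String, String> tblproperties, int confDefault) {
        // Prefer a per-table "compactorthreshold." override; otherwise fall back
        // to the value configured in HiveConf.
        String prop = tblproperties.get(
            COMPACTORTHRESHOLD_PREFIX + "hive.compactor.delta.num.threshold");
        return prop == null ? confDefault : Integer.parseInt(prop);
      }

      public static void main(String[] args) {
        Map<String, String> tblprops = new HashMap<String, String>();
        System.out.println(deltaNumThreshold(tblprops, 10)); // falls back to conf: 10
        tblprops.put("compactorthreshold.hive.compactor.delta.num.threshold", "4");
        System.out.println(deltaNumThreshold(tblprops, 10)); // per-table override: 4
      }
    }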

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index bf8e5cc..256e27b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -58,6 +59,7 @@ public class Worker extends CompactorThread {
   static final private int baseThreadNum = 10002;
 
   private String name;
+  private JobConf mrJob; // the MR job for compaction
 
   /**
   * Get the hostname that this worker is run on.  Made static and public so that other classes
@@ -182,6 +184,9 @@ public class Worker extends CompactorThread {
             }
           }
           txnHandler.markCompacted(ci);
+          if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+            mrJob = mr.getMrJob();
+          }
         } catch (Exception e) {
           LOG.error("Caught exception while trying to compact " + ci +
               ".  Marking failed to avoid repeated failures, " + 
StringUtils.stringifyException(e));
@@ -215,6 +220,10 @@ public class Worker extends CompactorThread {
     setName(name.toString());
   }
 
+  public JobConf getMrJob() {
+    return mrJob;
+  }
+
   static final class StatsUpdater {
     static final private Log LOG = LogFactory.getLog(StatsUpdater.class);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/6e0504d9/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 381eeb3..b91bdc3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils.StringableMap;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -68,19 +69,19 @@ public class TestWorker extends CompactorTest {
   @Test
   public void stringableMap() throws Exception {
     // Empty map case
-    CompactorMR.StringableMap m = new CompactorMR.StringableMap(new HashMap<String, String>());
+    StringableMap m = new StringableMap(new HashMap<String, String>());
     String s = m.toString();
     Assert.assertEquals("0:", s);
-    m = new CompactorMR.StringableMap(s);
+    m = new StringableMap(s);
     Assert.assertEquals(0, m.size());
 
     Map<String, String> base = new HashMap<String, String>();
     base.put("mary", "poppins");
     base.put("bert", null);
     base.put(null, "banks");
-    m = new CompactorMR.StringableMap(base);
+    m = new StringableMap(base);
     s = m.toString();
-    m = new CompactorMR.StringableMap(s);
+    m = new StringableMap(s);
     Assert.assertEquals(3, m.size());
     Map<String, Boolean> saw = new HashMap<String, Boolean>(3);
     saw.put("mary", false);
