HIVE-15337 Enhance Show Compactions output with JobId and start time for attempted state (Eugene Koifman, reviewed by Wei Zheng)
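In brief: SHOW COMPACTIONS gains two columns, Duration(ms) and HadoopJobId, and compactions left in 'attempted' state now record a start time. A minimal sketch of the enhanced output (hypothetical session and values; the real format is exercised by the dbtxnmgr_showlocks.q changes in the patch below):

    alter table partitioned_acid_table partition(p='abc') compact 'minor';
    show compactions;
    Database  Table                   Partition  Type   State      Worker  Start Time  Duration(ms)  HadoopJobId
    default   partitioned_acid_table  p=abc      MINOR  initiated   ---     ---         ---           ---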
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/252dd7e7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/252dd7e7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/252dd7e7

Branch: refs/heads/master
Commit: 252dd7e776c115bd96bc2673cfcb653ffad8d27a
Parents: e22e411
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Tue Dec 6 16:24:03 2016 -0800
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Tue Dec 6 16:24:03 2016 -0800

----------------------------------------------------------------------
 .../metastore/txn/CompactionTxnHandler.java     | 40 ++++++++++++++++++--
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 12 ++++--
 .../hadoop/hive/metastore/txn/TxnStore.java     |  7 ++++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 16 ++++++--
 .../hive/ql/plan/ShowCompactionsDesc.java       |  2 +-
 .../hive/ql/txn/compactor/CompactorMR.java      | 13 ++++---
 .../hadoop/hive/ql/txn/compactor/Worker.java    |  4 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |  1 +
 .../queries/clientpositive/dbtxnmgr_showlocks.q |  6 +++
 .../clientpositive/dbtxnmgr_showlocks.q.out     | 20 ++++++++++
 10 files changed, 102 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index fde1b54..545244b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -860,7 +860,8 @@ class CompactionTxnHandler extends TxnHandler {
         //compactions are not happening.
         ci.state = ATTEMPTED_STATE;
         //this is not strictly accurate, but 'type' cannot be null.
-        ci.type = CompactionType.MINOR;
+        if(ci.type == null) { ci.type = CompactionType.MINOR; }
+        ci.start = getDbTime(dbConn);
       } else {
         ci.state = FAILED_STATE;
@@ -874,7 +875,7 @@ class CompactionTxnHandler extends TxnHandler {
       closeStmt(pStmt);
       dbConn.commit();
     } catch (SQLException e) {
-      LOG.error("Unable to delete from compaction queue " + e.getMessage());
+      LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
       try {
@@ -883,7 +884,7 @@ class CompactionTxnHandler extends TxnHandler {
       catch(MetaException ex) {
         LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
       }
-      LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+      LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
     } finally {
       close(rs, stmt, null);
       close(null, pStmt, dbConn);
@@ -892,7 +893,38 @@ class CompactionTxnHandler extends TxnHandler {
       markFailed(ci);
     }
   }
-
+  @Override
+  public void setHadoopJobId(String hadoopJobId, long id) {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id;
+        LOG.debug("Going to execute <" + s + ">");
+        int updateCount = stmt.executeUpdate(s);
+        LOG.debug("Going to commit");
+        closeStmt(stmt);
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        try {
+          checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
+        }
+        catch(MetaException ex) {
+          LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+        }
+        LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e);
+      } finally {
+        close(null, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      setHadoopJobId(hadoopJobId, id);
+    }
+  }
 }


http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b0fa836..0c495ef 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1509,6 +1509,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
       String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
+        //-1 because 'null' literal doesn't work for all DBs...
"cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " + "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " + "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS"; @@ -1531,14 +1532,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { //do nothing to handle RU/D if we add another status } e.setWorkerid(rs.getString(6)); - e.setStart(rs.getLong(7)); + long start = rs.getLong(7); + if(!rs.wasNull()) { + e.setStart(start); + } long endTime = rs.getLong(8); if(endTime != -1) { e.setEndTime(endTime); } e.setRunAs(rs.getString(9)); e.setHadoopJobId(rs.getString(10)); - long id = rs.getLong(11);//for debugging + e.setId(rs.getLong(11)); response.addToCompacts(e); } LOG.debug("Going to rollback"); @@ -1943,12 +1947,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } sendRetrySignal = true; } else { - LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e)); + LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e)); } } else { //make sure we know we saw an error that we don't recognize - LOG.info("Non-retryable error: " + getMessage(e)); + LOG.info("Non-retryable error in " + caller + " : " + getMessage(e)); } } finally { http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 3c06517..170280e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -387,4 +387,11 @@ public interface TxnStore { public void releaseLocks(); } } + + /** + * Once a {@link java.util.concurrent.ThreadPoolExecutor.Worker} submits a job to the cluster, + * it calls this to update the metadata. + * @param id {@link CompactionInfo#id} + */ + public void setHadoopJobId(String hadoopJobId, long id); } http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 4b39eb9..493e1b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -2740,6 +2740,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable { ShowCompactResponse rsp = db.showCompactions(); // Write the results into the file + final String noVal = " --- "; + DataOutputStream os = getOutputStream(desc.getResFile()); try { // Write a header @@ -2756,6 +2758,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable { os.writeBytes("Worker"); os.write(separator); os.writeBytes("Start Time"); + os.write(separator); + os.writeBytes("Duration(ms)"); + os.write(separator); + os.writeBytes("HadoopJobId"); os.write(terminator); if (rsp.getCompacts() != null) { @@ -2765,16 +2771,20 @@ public class DDLTask extends Task<DDLWork> implements Serializable { os.writeBytes(e.getTablename()); os.write(separator); String part = e.getPartitionname(); - os.writeBytes(part == null ? 
"NULL" : part); + os.writeBytes(part == null ? noVal : part); os.write(separator); os.writeBytes(e.getType().toString()); os.write(separator); os.writeBytes(e.getState()); os.write(separator); String wid = e.getWorkerid(); - os.writeBytes(wid == null ? "NULL" : wid); + os.writeBytes(wid == null ? noVal : wid); + os.write(separator); + os.writeBytes(e.isSetStart() ? Long.toString(e.getStart()) : noVal); + os.write(separator); + os.writeBytes(e.isSetEndTime() ? Long.toString(e.getEndTime() - e.getStart()) : noVal); os.write(separator); - os.writeBytes(Long.toString(e.getStart())); + os.writeBytes(e.isSetHadoopJobId() ? e.getHadoopJobId() : noVal); os.write(terminator); } } http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java index 94fd289..dc47a38 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ShowCompactionsDesc.java @@ -28,7 +28,7 @@ public class ShowCompactionsDesc extends DDLDesc implements Serializable { private static final long serialVersionUID = 1L; private static final String schema = "dbname,tabname,partname,type,state,workerid," + - "starttime#string:string:string:string:string:string:string"; + "starttime,duration,hadoopjobid#string:string:string:string:string:string:string:string:string"; private String resFile; http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 9ac2964..2f25925 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; @@ -198,7 +199,7 @@ public class CompactorMR { * @throws java.io.IOException if the job fails */ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, - ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException { + ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException { if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) { throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true"); @@ -232,7 +233,7 @@ public class CompactorMR { launchCompactionJob(jobMinorCompact, null, CompactionType.MINOR, null, parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle), - maxDeltastoHandle, -1, conf); + maxDeltastoHandle, -1, conf, txnHandler, ci.id); } //now recompute state since we've done minor compactions and 
have different 'best' set of deltas dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns); @@ -270,14 +271,15 @@ public class CompactorMR { } launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(), - dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf); + dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, txnHandler, ci.id); su.gatherStats(); } private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType, StringableList dirsToSearch, List<AcidUtils.ParsedDelta> parsedDeltas, - int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf) throws IOException { + int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf, + TxnStore txnHandler, long id) throws IOException { job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR); if(dirsToSearch == null) { dirsToSearch = new StringableList(); @@ -308,7 +310,8 @@ public class CompactorMR { "(current delta dirs count=" + curDirNumber + ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]"); RunningJob rj = JobClient.runJob(job); - LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID()); + LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID() + " compaction ID=" + id); + txnHandler.setHadoopJobId(rj.getID().toString(), id); rj.waitForCompletion(); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 666f13b..2d6cce9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -163,14 +163,14 @@ public class Worker extends CompactorThread { launchedJob = true; try { if (runJobAsSelf(runAs)) { - mr.run(conf, jobName.toString(), t, sd, txns, ci, su); + mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler); } else { UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(), UserGroupInformation.getLoginUser()); ugi.doAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { - mr.run(conf, jobName.toString(), t, sd, txns, ci, su); + mr.run(conf, jobName.toString(), t, sd, txns, ci, su, txnHandler); return null; } }); http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 49ba667..9145cf3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -309,6 +309,7 @@ public class TestTxnCommands2 { ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); + Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local")); // 3. Perform a delete. 
runStatementOnDriver("delete from " + Table.NONACIDORCTBL + " where a = 1"); http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q b/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q index da8e448..24a42ea 100644 --- a/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q +++ b/ql/src/test/queries/clientpositive/dbtxnmgr_showlocks.q @@ -21,4 +21,10 @@ show locks partitioned_acid_table partition (p='abc'); show locks partitioned_acid_table partition (p='abc') extended; +insert into partitioned_acid_table partition(p='abc') values(1,2); + +alter table partitioned_acid_table partition(p='abc') compact 'minor'; + +show compactions; + drop table partitioned_acid_table; http://git-wip-us.apache.org/repos/asf/hive/blob/252dd7e7/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out index 3b683f8..4da2c87 100644 --- a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out +++ b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out @@ -51,6 +51,26 @@ PREHOOK: type: SHOWLOCKS POSTHOOK: query: show locks partitioned_acid_table partition (p='abc') extended POSTHOOK: type: SHOWLOCKS Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info +PREHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@partitioned_acid_table@p=abc +POSTHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@partitioned_acid_table@p=abc +POSTHOOK: Lineage: partitioned_acid_table PARTITION(p=abc).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: partitioned_acid_table PARTITION(p=abc).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: alter table partitioned_acid_table partition(p='abc') compact 'minor' +PREHOOK: type: ALTERTABLE_COMPACT +POSTHOOK: query: alter table partitioned_acid_table partition(p='abc') compact 'minor' +POSTHOOK: type: ALTERTABLE_COMPACT +PREHOOK: query: show compactions +PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +Database Table Partition Type State Worker Start Time Duration(ms) HadoopJobId +default partitioned_acid_table p=abc MINOR initiated --- --- --- --- PREHOOK: query: drop table partitioned_acid_table PREHOOK: type: DROPTABLE PREHOOK: Input: default@partitioned_acid_table
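For illustration, a sketch (not part of this patch) of consuming the new fields through the same metastore API the test above uses. It relies only on accessors exercised in this diff (getCompacts, isSetStart/getStart, isSetEndTime/getEndTime, isSetHadoopJobId/getHadoopJobId) plus the standard Thrift getters for the other columns; the class name is hypothetical:

    import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
    import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
    import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;

    public class CompactionListingSketch {
      // Prints one line per compaction, mirroring DDLTask's column handling:
      // unset optional fields render as " --- ", duration = end time - start time.
      static void print(TxnStore txnHandler) throws Exception {
        final String noVal = " --- ";
        ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
        for (ShowCompactResponseElement e : resp.getCompacts()) {
          String start = e.isSetStart() ? Long.toString(e.getStart()) : noVal;
          String duration = e.isSetEndTime() ? Long.toString(e.getEndTime() - e.getStart()) : noVal;
          String jobId = e.isSetHadoopJobId() ? e.getHadoopJobId() : noVal;
          System.out.println(e.getDbname() + "\t" + e.getTablename() + "\t" + e.getState()
              + "\t" + start + "\t" + duration + "\t" + jobId);
        }
      }
    }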