Repository: hive Updated Branches: refs/heads/master 5b8ffe2d9 -> 2a2f64270
HIVE-17635: Add unit tests to CompactionTxnHandler and use PreparedStatements for queries (Andrew Sherman, reviewed by Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2a2f6427 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2a2f6427 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2a2f6427 Branch: refs/heads/master Commit: 2a2f6427014045b9119714d205d7b8face9f7d92 Parents: 5b8ffe2 Author: Andrew Sherman <asher...@cloudera.com> Authored: Tue Oct 31 18:57:52 2017 -0700 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Tue Oct 31 18:57:52 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hive/beeline/HiveSchemaTool.java | 34 ++-- .../metastore/txn/TestCompactionTxnHandler.java | 63 +++++++ .../metastore/txn/CompactionTxnHandler.java | 168 +++++++++++++------ .../hadoop/hive/metastore/txn/TxnUtils.java | 52 +++++- .../hadoop/hive/metastore/txn/TestTxnUtils.java | 38 ++++- 5 files changed, 280 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java ---------------------------------------------------------------------- diff --git a/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java b/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java index 5350311..04576ae 100644 --- a/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java +++ b/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java @@ -667,27 +667,31 @@ public class HiveSchemaTool { for (String seqName : seqNameToTable.keySet()) { String tableName = seqNameToTable.get(seqName).getLeft(); String tableKey = seqNameToTable.get(seqName).getRight(); + String fullSequenceName = "org.apache.hadoop.hive.metastore.model." + seqName; String seqQuery = needsQuotedIdentifier ? 
- ("select t.\"NEXT_VAL\" from \"SEQUENCE_TABLE\" t WHERE t.\"SEQUENCE_NAME\"='org.apache.hadoop.hive.metastore.model." + seqName + "' order by t.\"SEQUENCE_NAME\" ") - : ("select t.NEXT_VAL from SEQUENCE_TABLE t WHERE t.SEQUENCE_NAME='org.apache.hadoop.hive.metastore.model." + seqName + "' order by t.SEQUENCE_NAME "); + ("select t.\"NEXT_VAL\" from \"SEQUENCE_TABLE\" t WHERE t.\"SEQUENCE_NAME\"=? order by t.\"SEQUENCE_NAME\" ") + : ("select t.NEXT_VAL from SEQUENCE_TABLE t WHERE t.SEQUENCE_NAME=? order by t.SEQUENCE_NAME "); String maxIdQuery = needsQuotedIdentifier ? ("select max(\"" + tableKey + "\") from \"" + tableName + "\"") : ("select max(" + tableKey + ") from " + tableName); - ResultSet res = stmt.executeQuery(maxIdQuery); - if (res.next()) { - long maxId = res.getLong(1); - if (maxId > 0) { - ResultSet resSeq = stmt.executeQuery(seqQuery); - if (!resSeq.next()) { - isValid = false; - System.err.println("Missing SEQUENCE_NAME " + seqName + " from SEQUENCE_TABLE"); - } else if (resSeq.getLong(1) < maxId) { - isValid = false; - System.err.println("NEXT_VAL for " + seqName + " in SEQUENCE_TABLE < max("+ tableKey + ") in " + tableName); - } - } + ResultSet res = stmt.executeQuery(maxIdQuery); + if (res.next()) { + long maxId = res.getLong(1); + if (maxId > 0) { + PreparedStatement pStmt = conn.prepareStatement(seqQuery); + pStmt.setString(1, fullSequenceName); + ResultSet resSeq = pStmt.executeQuery(); + if (!resSeq.next()) { + isValid = false; + System.err.println("Missing SEQUENCE_NAME " + seqName + " from SEQUENCE_TABLE"); + } else if (resSeq.getLong(1) < maxId) { + isValid = false; + System.err.println("NEXT_VAL for " + seqName + " in SEQUENCE_TABLE < max(" + + tableKey + ") in " + tableName); + } } + } } System.out.println((isValid ? 
"Succeeded" :"Failed") + " in sequence number validation for SEQUENCE_TABLE."); http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 96005b4..34a1600 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -49,6 +49,7 @@ import java.util.SortedSet; import java.util.TreeSet; import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; import static junit.framework.Assert.assertNotNull; import static junit.framework.Assert.assertNull; import static junit.framework.Assert.assertTrue; @@ -64,6 +65,10 @@ public class TestCompactionTxnHandler { public TestCompactionTxnHandler() throws Exception { TxnDbUtil.setConfValues(conf); + // Set config so that TxnUtils.buildQueryWithINClauseStrings() will + // produce multiple queries + conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 1); + conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 10); tearDown(); } @@ -224,6 +229,64 @@ public class TestCompactionTxnHandler { } @Test + public void testMarkFailed() throws Exception { + CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); + rqst.setPartitionname("ds=today"); + txnHandler.compact(rqst); + assertEquals(0, txnHandler.findReadyToClean().size()); + CompactionInfo ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + + assertEquals(0, txnHandler.findReadyToClean().size()); + txnHandler.markFailed(ci); + assertNull(txnHandler.findNextToCompact("fred")); + boolean failedCheck = 
txnHandler.checkFailedCompactions(ci); + assertFalse(failedCheck); + try { + // The first call to markFailed() should have removed the record from + // COMPACTION_QUEUE, so a repeated call should fail + txnHandler.markFailed(ci); + fail("The first call to markFailed() must have failed as this call did " + + "not throw the expected exception"); + } catch (IllegalStateException e) { + // This is expected + assertTrue(e.getMessage().contains("No record with CQ_ID=")); + } + + // There are not enough failed compactions yet so checkFailedCompactions() should return false. + // But note that any sql error will also result in a return of false. + assertFalse(txnHandler.checkFailedCompactions(ci)); + + // Add more failed compactions so that the total is exactly COMPACTOR_INITIATOR_FAILED_THRESHOLD + for (int i = 1 ; i < conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); i++) { + addFailedCompaction("foo", "bar", CompactionType.MINOR, "ds=today"); + } + // Now checkFailedCompactions() will return true + assertTrue(txnHandler.checkFailedCompactions(ci)); + + // Now add enough failed compactions to ensure purgeCompactionHistory() will attempt delete; + // HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED is enough for this. + // But we also want enough to tickle the code in TxnUtils.buildQueryWithINClauseStrings() + // so that it produces multiple queries. For that we need at least 290. 
+ for (int i = 0 ; i < 300; i++) { + addFailedCompaction("foo", "bar", CompactionType.MINOR, "ds=today"); + } + txnHandler.purgeCompactionHistory(); + } + + private void addFailedCompaction(String dbName, String tableName, CompactionType type, + String partitionName) throws MetaException { + CompactionRequest rqst; + CompactionInfo ci; + rqst = new CompactionRequest(dbName, tableName, type); + rqst.setPartitionname(partitionName); + txnHandler.compact(rqst); + ci = txnHandler.findNextToCompact("fred"); + assertNotNull(ci); + txnHandler.markFailed(ci); + } + + @Test public void testRevokeFromLocalWorkers() throws Exception { CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); txnHandler.compact(rqst); http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 7f1b331..a90b7d4 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -334,13 +334,13 @@ class CompactionTxnHandler extends TxnHandler { public void markCleaned(CompactionInfo info) throws MetaException { try { Connection dbConn = null; - Statement stmt = null; PreparedStatement pStmt = null; ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID 
= " + info.id); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); + pStmt.setLong(1, info.id); + rs = pStmt.executeQuery(); if(rs.next()) { info = CompactionInfo.loadFullFromCompactionQueue(rs); } @@ -348,9 +348,11 @@ class CompactionTxnHandler extends TxnHandler { throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE"); } close(rs); - String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; + String s = "delete from COMPACTION_QUEUE where cq_id = ?"; + pStmt = dbConn.prepareStatement(s); + pStmt.setLong(1, info.id); LOG.debug("Going to execute update <" + s + ">"); - int updCount = stmt.executeUpdate(s); + int updCount = pStmt.executeUpdate(); if (updCount != 1) { LOG.error("Unable to delete compaction record: " + info + ". Update count=" + updCount); LOG.debug("Going to rollback"); @@ -364,28 +366,55 @@ class CompactionTxnHandler extends TxnHandler { // Remove entries from completed_txn_components as well, so we don't start looking there // again but only up to the highest txn ID include in this compaction job. //highestTxnId will be NULL in upgrade scenarios - s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " + - "ctc_table = '" + info.tableName + "'"; + s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? 
and " + + "ctc_table = ?"; + if (info.partName != null) { + s += " and ctc_partition = ?"; + } + if(info.highestTxnId != 0) { + s += " and ctc_txnid <= ?"; + } + pStmt = dbConn.prepareStatement(s); + int paramCount = 1; + pStmt.setString(paramCount++, info.dbname); + pStmt.setString(paramCount++, info.tableName); if (info.partName != null) { - s += " and ctc_partition = '" + info.partName + "'"; + pStmt.setString(paramCount++, info.partName); } if(info.highestTxnId != 0) { - s += " and ctc_txnid <= " + info.highestTxnId; + pStmt.setLong(paramCount++, info.highestTxnId); } LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) < 1) { + if (pStmt.executeUpdate() < 1) { LOG.error("Expected to remove at least one row from completed_txn_components when " + "marking compaction entry as clean!"); } s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + - TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + - info.tableName + "'" + (info.highestTxnId == 0 ? "" : " and txn_id <= " + info.highestTxnId); - if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; + TXN_ABORTED + "' and tc_database = ? 
and tc_table = ?"; + if (info.highestTxnId != 0) s += " and txn_id <= ?"; + if (info.partName != null) s += " and tc_partition = ?"; + + pStmt = dbConn.prepareStatement(s); + paramCount = 1; + pStmt.setString(paramCount++, info.dbname); + pStmt.setString(paramCount++, info.tableName); + if(info.highestTxnId != 0) { + pStmt.setLong(paramCount++, info.highestTxnId); + } + if (info.partName != null) { + pStmt.setString(paramCount++, info.partName); + } + LOG.debug("Going to execute update <" + s + ">"); - rs = stmt.executeQuery(s); + rs = pStmt.executeQuery(); List<Long> txnids = new ArrayList<>(); - while (rs.next()) txnids.add(rs.getLong(1)); + List<String> questions = new ArrayList<>(); + while (rs.next()) { + long id = rs.getLong(1); + txnids.add(id); + questions.add("?"); + } // Remove entries from txn_components, as there may be aborted txn components if (txnids.size() > 0) { List<String> queries = new ArrayList<>(); @@ -397,21 +426,34 @@ class CompactionTxnHandler extends TxnHandler { prefix.append("delete from TXN_COMPONENTS where "); //because 1 txn may include different partitions/tables even in auto commit mode - suffix.append(" and tc_database = "); - suffix.append(quoteString(info.dbname)); - suffix.append(" and tc_table = "); - suffix.append(quoteString(info.tableName)); + suffix.append(" and tc_database = ?"); + suffix.append(" and tc_table = ?"); if (info.partName != null) { - suffix.append(" and tc_partition = "); - suffix.append(quoteString(info.partName)); + suffix.append(" and tc_partition = ?"); } // Populate the complete query with provided prefix and suffix - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false); + List<Integer> counts = TxnUtils + .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid", + true, false); + int totalCount = 0; + for (int i = 0; i < queries.size(); i++) { + String query = queries.get(i); + int insertCount = counts.get(i); - for (String query : 
queries) { LOG.debug("Going to execute update <" + query + ">"); - int rc = stmt.executeUpdate(query); + pStmt = dbConn.prepareStatement(query); + for (int j = 0; j < insertCount; j++) { + pStmt.setLong(j + 1, txnids.get(totalCount + j)); + } + totalCount += insertCount; + paramCount = insertCount + 1; + pStmt.setString(paramCount++, info.dbname); + pStmt.setString(paramCount++, info.tableName); + if (info.partName != null) { + pStmt.setString(paramCount++, info.partName); + } + int rc = pStmt.executeUpdate(); LOG.debug("Removed " + rc + " records from txn_components"); // Don't bother cleaning from the txns table. A separate call will do that. We don't @@ -430,8 +472,7 @@ class CompactionTxnHandler extends TxnHandler { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeStmt(pStmt); - close(rs, stmt, dbConn); + close(rs, pStmt, dbConn); } } catch (RetryException e) { markCleaned(info); @@ -599,34 +640,38 @@ class CompactionTxnHandler extends TxnHandler { @RetrySemantics.ReadOnly public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException { Connection dbConn = null; - Statement stmt = null; + PreparedStatement pStmt = null; ResultSet rs = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); String quote = getIdentifierQuoteString(dbConn); - stmt = dbConn.createStatement(); StringBuilder bldr = new StringBuilder(); bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote) .append(" FROM ") .append(quote).append((ci.partName == null ? 
"TAB_COL_STATS" : "PART_COL_STATS")) .append(quote) .append(" WHERE ") - .append(quote).append("DB_NAME").append(quote).append(" = '").append(ci.dbname) - .append("' AND ").append(quote).append("TABLE_NAME").append(quote) - .append(" = '").append(ci.tableName).append("'"); + .append(quote).append("DB_NAME").append(quote).append(" = ?") + .append(" AND ").append(quote).append("TABLE_NAME").append(quote) + .append(" = ?"); if (ci.partName != null) { - bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '") - .append(ci.partName).append("'"); + bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = ?"); } String s = bldr.toString(); + pStmt = dbConn.prepareStatement(s); + pStmt.setString(1, ci.dbname); + pStmt.setString(2, ci.tableName); + if (ci.partName != null) { + pStmt.setString(3, ci.partName); + } /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS") + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'" + (ci.partName == null ? 
"" : " AND PARTITION_NAME='" + ci.partName + "'");*/ LOG.debug("Going to execute <" + s + ">"); - rs = stmt.executeQuery(s); + rs = pStmt.executeQuery(); List<String> columns = new ArrayList<>(); while (rs.next()) { columns.add(rs.getString(1)); @@ -642,7 +687,7 @@ class CompactionTxnHandler extends TxnHandler { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - close(rs, stmt, dbConn); + close(rs, pStmt, dbConn); } } catch (RetryException ex) { return findColumnsWithStats(ci); @@ -725,6 +770,7 @@ class CompactionTxnHandler extends TxnHandler { public void purgeCompactionHistory() throws MetaException { Connection dbConn = null; Statement stmt = null; + PreparedStatement pStmt = null; ResultSet rs = null; List<Long> deleteSet = new ArrayList<>(); RetentionCounters rc = null; @@ -764,11 +810,22 @@ class CompactionTxnHandler extends TxnHandler { prefix.append("delete from COMPLETED_COMPACTIONS where "); suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false); - - for (String query : queries) { + List<String> questions = new ArrayList<>(deleteSet.size()); + for (int i = 0; i < deleteSet.size(); i++) { + questions.add("?"); + } + List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false); + int totalCount = 0; + for (int i = 0; i < queries.size(); i++) { + String query = queries.get(i); + long insertCount = counts.get(i); LOG.debug("Going to execute update <" + query + ">"); - int count = stmt.executeUpdate(query); + pStmt = dbConn.prepareStatement(query); + for (int j = 0; j < insertCount; j++) { + pStmt.setLong(j + 1, deleteSet.get(totalCount + j)); + } + totalCount += insertCount; + int count = pStmt.executeUpdate(); LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS"); } dbConn.commit(); @@ -779,6 +836,7 @@ class CompactionTxnHandler extends TxnHandler 
{ StringUtils.stringifyException(e)); } finally { close(rs, stmt, dbConn); + closeStmt(pStmt); } } catch (RetryException ex) { purgeCompactionHistory(); @@ -813,17 +871,22 @@ class CompactionTxnHandler extends TxnHandler { @RetrySemantics.ReadOnly public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { Connection dbConn = null; - Statement stmt = null; + PreparedStatement pStmt = null; ResultSet rs = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " + - "CC_DATABASE = " + quoteString(ci.dbname) + " and " + - "CC_TABLE = " + quoteString(ci.tableName) + - (ci.partName != null ? "and CC_PARTITION = " + quoteString(ci.partName) : "") + + pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " + + "CC_DATABASE = ? and " + + "CC_TABLE = ? " + + (ci.partName != null ? "and CC_PARTITION = ?" : "") + " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc"); + pStmt.setString(1, ci.dbname); + pStmt.setString(2, ci.tableName); + if (ci.partName != null) { + pStmt.setString(3, ci.partName); + } + rs = pStmt.executeQuery(); int numFailed = 0; int numTotal = 0; int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); @@ -838,14 +901,14 @@ class CompactionTxnHandler extends TxnHandler { return numFailed == failedThreshold; } catch (SQLException e) { - LOG.error("Unable to delete from compaction queue " + e.getMessage()); + LOG.error("Unable to check for failed compactions " + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")"); LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e)); return false;//weren't able to check } finally { - close(rs, stmt, dbConn); + close(rs, pStmt, dbConn); } } catch (RetryException e) 
{ return checkFailedCompactions(ci); @@ -869,12 +932,16 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id); + pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?"); + pStmt.setLong(1, ci.id); + rs = pStmt.executeQuery(); if(rs.next()) { ci = CompactionInfo.loadFullFromCompactionQueue(rs); - String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id; + String s = "delete from COMPACTION_QUEUE where cq_id = ?"; + pStmt = dbConn.prepareStatement(s); + pStmt.setLong(1, ci.id); LOG.debug("Going to execute update <" + s + ">"); - int updCnt = stmt.executeUpdate(s); + int updCnt = pStmt.executeUpdate(); } else { if(ci.id > 0) { @@ -897,6 +964,7 @@ class CompactionTxnHandler extends TxnHandler { ci.state = FAILED_STATE; } close(rs, stmt, null); + closeStmt(pStmt); pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)"); CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn)); http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 2f01233..afb4f6b 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; import java.util.List; @@ -172,8 +173,9 @@ public class TxnUtils { * @param addParens IN: add a pair of parenthesis outside the IN lists * e.g. "(id in (1,2,3) OR id in (4,5,6))" * @param notIn IN: is this for building a 'NOT IN' composite clause? + * @return OUT: a list of the count of IN list values that are in each of the corresponding queries */ - public static void buildQueryWithINClause(Configuration conf, + public static List<Integer> buildQueryWithINClause(Configuration conf, List<String> queries, StringBuilder prefix, StringBuilder suffix, @@ -181,6 +183,47 @@ public class TxnUtils { String inColumn, boolean addParens, boolean notIn) { + List<String> inListStrings = new ArrayList<>(inList.size()); + for (Long aLong : inList) { + inListStrings.add(aLong.toString()); + } + return buildQueryWithINClauseStrings(conf, queries, prefix, suffix, + inListStrings, inColumn, addParens, notIn); + + } + /** + * Build a query (or queries if one query is too big but only for the case of 'IN' + * composite clause. For the case of 'NOT IN' clauses, multiple queries change + * the semantics of the intended query. + * E.g., Let's assume that input "inList" parameter has [5, 6] and that + * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value in a 'NOT IN' clause, + * Then having two delete statements changes the semantics of the intended SQL statement. + * I.e. 
'delete from T where a not in (5)' and 'delete from T where a not in (6)' sequence + * is not equal to 'delete from T where a not in (5, 6)'.) + * with one or multiple 'IN' or 'NOT IN' clauses with the given input parameters. + * + * Note that this method currently supports only a single column for + * IN/NOT IN clauses and that only covers OR-based composite 'IN' clause and + * AND-based composite 'NOT IN' clause. + * For example, for 'IN' clause case, the method will build a query with OR. + * E.g., "id in (1,2,3) OR id in (4,5,6)". + * For 'NOT IN' case, NOT IN list is broken into multiple 'NOT IN' clauses connected by AND. + * + * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN' + * clauses in a query". + * + * @param queries OUT: Array of query strings + * @param prefix IN: Part of the query that comes before IN list + * @param suffix IN: Part of the query that comes after IN list + * @param inList IN: the list with IN list values + * @param inColumn IN: single column name of IN list operator + * @param addParens IN: add a pair of parentheses outside the IN lists + * e.g. "(id in (1,2,3) OR id in (4,5,6))" + * @param notIn IN: is this for building a 'NOT IN' composite clause? + * @return OUT: a list of the count of IN list values that are in each of the corresponding queries + */ + public static List<Integer> buildQueryWithINClauseStrings(Configuration conf, List<String> queries, StringBuilder prefix, + StringBuilder suffix, List<String> inList, String inColumn, boolean addParens, boolean notIn) { // Get configuration parameters int maxQueryLength = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH); int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE); @@ -203,6 +246,8 @@ public class TxnUtils { StringBuilder newInclausePrefix = new StringBuilder(notIn ? 
" and " + inColumn + " not in (": " or " + inColumn + " in ("); + List<Integer> ret = new ArrayList<>(); + int currentCount = 0; // Loop over the given inList elements. while( cursor4InListArray < inListSize || !nextItemNeeded) { @@ -257,9 +302,11 @@ public class TxnUtils { buf.append(suffix); queries.add(buf.toString()); + ret.add(currentCount); // Prepare a new query string. buf.setLength(0); + currentCount = 0; cursor4queryOfInClauses = cursor4InClauseElements = 0; querySize = 0; newInclausePrefixJustAppended = false; @@ -276,6 +323,7 @@ public class TxnUtils { cursor4InClauseElements = 0; } else { buf.append(nextValue.toString()).append(","); + currentCount++; nextItemNeeded = true; newInclausePrefixJustAppended = false; // increment cursor for elements per 'IN'/'NOT IN' clause. @@ -293,6 +341,8 @@ public class TxnUtils { } buf.append(suffix); queries.add(buf.toString()); + ret.add(currentCount); + return ret; } /* http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java index 7dd268f..0384f8b 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java @@ -45,6 +45,7 @@ public class TestTxnUtils { @Test public void testBuildQueryWithINClause() throws Exception { List<String> queries = new ArrayList<>(); + List<Integer> ret; StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); @@ -61,16 +62,21 @@ public class TestTxnUtils { for (long i = 1; i <= 189; i++) { inList.add(i); } - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, 
inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(1, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(189L, ret.get(0).longValue()); runAgainstDerby(queries); // Case 2 - Max in list members: 10; Max query string length: 1KB // The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member queries.clear(); inList.add((long)190); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(2, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(189L, ret.get(0).longValue()); + Assert.assertEquals(1L, ret.get(1).longValue()); runAgainstDerby(queries); // Case 3.1 - Max in list members: 1000, Max query string length: 1KB, and exact 1000 members in a single IN clause @@ -80,16 +86,19 @@ public class TestTxnUtils { for (long i = 191; i <= 1000; i++) { inList.add(i); } - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(5, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(267L, ret.get(0).longValue()); runAgainstDerby(queries); // Case 3.2 - Max in list members: 1000, Max query string length: 10KB, and exact 1000 members in a single IN clause MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 10); MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 1000); queries.clear(); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, 
"TXN_ID", true, false); Assert.assertEquals(1, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); runAgainstDerby(queries); // Case 3.3 - Now with 2000 entries, try the above settings @@ -98,19 +107,25 @@ public class TestTxnUtils { } MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 1); queries.clear(); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(10, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(267L, ret.get(0).longValue()); + Assert.assertEquals(240L, ret.get(1).longValue()); runAgainstDerby(queries); MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 10); queries.clear(); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(1, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(2000L, ret.get(0).longValue()); runAgainstDerby(queries); // Case 4 - NOT IN list queries.clear(); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true); Assert.assertEquals(1, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); runAgainstDerby(queries); // Case 5 - Max in list members: 1000; Max query string length: 10KB @@ -118,16 +133,21 @@ public class TestTxnUtils { for (long i = 2001; i <= 4321; i++) { inList.add(i); } - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); Assert.assertEquals(3, queries.size()); + 
Assert.assertEquals(queries.size(), ret.size()); runAgainstDerby(queries); // Case 6 - No parenthesis queries.clear(); suffix.setLength(0); suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false); + ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false); Assert.assertEquals(3, queries.size()); + Assert.assertEquals(queries.size(), ret.size()); + Assert.assertEquals(2255L, ret.get(0).longValue()); + Assert.assertEquals(2033L, ret.get(1).longValue()); + Assert.assertEquals(33L, ret.get(2).longValue()); runAgainstDerby(queries); }