This is an automated email from the ASF dual-hosted git repository. lpinter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 8ff1fb9 HIVE-22765: Quote table names in CompactionTxnHandler (Zoltan Chovan, reviewed by Peter Vary) 8ff1fb9 is described below commit 8ff1fb9be0a54de61e1fa6e83f4f3a2de0f10503 Author: Zoltan Chovan <zcho...@cloudera.com> AuthorDate: Tue Feb 11 11:12:46 2020 +0100 HIVE-22765: Quote table names in CompactionTxnHandler (Zoltan Chovan, reviewed by Peter Vary) --- .../hive/metastore/txn/CompactionTxnHandler.java | 182 +++++++++++---------- 1 file changed, 94 insertions(+), 88 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index bae23f7..0f94e13 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -75,20 +75,20 @@ class CompactionTxnHandler extends TxnHandler { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); // Check for completed transactions - String s = "select distinct tc.ctc_database, tc.ctc_table, tc.ctc_partition " + - "from COMPLETED_TXN_COMPONENTS tc " + (checkInterval > 0 ? - "left join ( " + - " select c1.* from COMPLETED_COMPACTIONS c1 " + - " inner join ( " + - " select max(cc_id) cc_id from COMPLETED_COMPACTIONS " + - " group by cc_database, cc_table, cc_partition" + - " ) c2 " + - " on c1.cc_id = c2.cc_id " + - " where c1.cc_state IN (" + quoteChar(ATTEMPTED_STATE) + "," + quoteChar(FAILED_STATE) + ")" + - ") c " + - "on tc.ctc_database = c.cc_database and tc.ctc_table = c.cc_table " + - " and (tc.ctc_partition = c.cc_partition or (tc.ctc_partition is null and c.cc_partition is null)) " + - "where c.cc_id is not null or " + isWithinCheckInterval("tc.ctc_timestamp", checkInterval) : ""); + String s = "SELECT DISTINCT \"TC\".\"CTC_DATABASE\", \"TC\".\"CTC_TABLE\", \"TC\".\"CTC_PARTITION\" " + + "FROM \"COMPLETED_TXN_COMPONENTS\" TC " + (checkInterval > 0 ? + "LEFT JOIN ( " + + " SELECT \"C1\".* FROM \"COMPLETED_COMPACTIONS\" \"C1\" " + + " INNER JOIN ( " + + " SELECT MAX(\"CC_ID\") \"CC_ID\" FROM \"COMPLETED_COMPACTIONS\" " + + " GROUP BY \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\"" + + " ) \"C2\" " + + " ON \"C1\".\"CC_ID\" = \"C2\".\"CC_ID\" " + + " WHERE \"C1\".\"CC_STATE\" IN (" + quoteChar(ATTEMPTED_STATE) + "," + quoteChar(FAILED_STATE) + ")" + + ") \"C\" " + + "ON \"TC\".\"CTC_DATABASE\" = \"C\".\"CC_DATABASE\" AND \"TC\".\"CTC_TABLE\" = \"C\".\"CC_TABLE\" " + + " AND (\"TC\".\"CTC_PARTITION\" = \"C\".\"CC_PARTITION\" OR (\"TC\".\"CTC_PARTITION\" IS NULL AND \"C\".\"CC_PARTITION\" IS NULL)) " + + "WHERE \"C\".\"CC_ID\" IS NOT NULL OR " + isWithinCheckInterval("\"TC\".\"CTC_TIMESTAMP\"", checkInterval) : ""); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -102,11 +102,11 @@ class CompactionTxnHandler extends TxnHandler { rs.close(); // Check for aborted txns - s = "select tc_database, tc_table, tc_partition " + - "from TXNS, TXN_COMPONENTS " + - "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + - "group by tc_database, tc_table, tc_partition " + - "having count(*) > " + abortedThreshold; + s = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " + + "FROM \"TXNS\", \"TXN_COMPONENTS\" " + + "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' " + + "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " + + "HAVING COUNT(*) > " + abortedThreshold; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -151,8 +151,8 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; + String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + + "\"CQ_TYPE\", \"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -171,9 +171,9 @@ class CompactionTxnHandler extends TxnHandler { info.properties = rs.getString(6); // Now, update this record as being worked on by this worker. long now = getDbTime(dbConn); - s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + - "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id + - " AND cq_state='" + INITIATED_STATE + "'"; + s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = '" + workerId + "', " + + "\"CQ_START\" = " + now + ", \"CQ_STATE\" = '" + WORKING_STATE + "' WHERE \"CQ_ID\" = " + info.id + + " AND \"CQ_STATE\"='" + INITIATED_STATE + "'"; LOG.debug("Going to execute update <" + s + ">"); int updCount = updStmt.executeUpdate(s); if(updCount == 1) { @@ -221,8 +221,8 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + - "cq_worker_id = null where cq_id = " + info.id; + String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " + + "\"CQ_WORKER_ID\" = NULL WHERE \"CQ_ID\" = " + info.id; LOG.debug("Going to execute update <" + s + ">"); int updCnt = stmt.executeUpdate(s); if (updCnt != 1) { @@ -265,8 +265,8 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " - + "cq_type, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '" + String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + + "\"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -333,7 +333,7 @@ class CompactionTxnHandler extends TxnHandler { * which deletes rows from MIN_HISTORY_LEVEL which can only allow minOpenTxn to move higher) */ private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLException { - String s = "select ntxn_next from NEXT_TXN_ID"; + String s = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -341,7 +341,7 @@ class CompactionTxnHandler extends TxnHandler { "initialized, no record found in next_txn_id"); } long hwm = rs.getLong(1); - s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL"; + s = "SELECT MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\""; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); rs.next(); @@ -369,9 +369,10 @@ class CompactionTxnHandler extends TxnHandler { ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, " - + "CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, " - + "CQ_HADOOP_JOB_ID, CQ_ERROR_MESSAGE from COMPACTION_QUEUE WHERE CQ_ID = ?"); + pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", " + + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\" " + + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"); pStmt.setLong(1, info.id); rs = pStmt.executeQuery(); if(rs.next()) { @@ -391,10 +392,11 @@ class CompactionTxnHandler extends TxnHandler { LOG.debug("Going to rollback"); dbConn.rollback(); } - pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, " - + "CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, " - + "CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID, CC_ERROR_MESSAGE) " - + "VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?)"); + pStmt = dbConn.prepareStatement("INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", " + + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", " + + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", " + + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\")" + + " VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?)"); info.state = SUCCEEDED_STATE; CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn)); updCount = pStmt.executeUpdate(); @@ -402,13 +404,13 @@ class CompactionTxnHandler extends TxnHandler { // Remove entries from completed_txn_components as well, so we don't start looking there // again but only up to the highest write ID include in this compaction job. //highestWriteId will be NULL in upgrade scenarios - s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " + - "ctc_table = ?"; + s = "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\" = ? AND " + + "\"CTC_TABLE\" = ?"; if (info.partName != null) { - s += " and ctc_partition = ?"; + s += " AND \"CTC_PARTITION\" = ?"; } if(info.highestWriteId != 0) { - s += " and ctc_writeid <= ?"; + s += " AND \"CTC_WRITEID\" <= ?"; } pStmt = dbConn.prepareStatement(s); int paramCount = 1; @@ -431,10 +433,10 @@ class CompactionTxnHandler extends TxnHandler { * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns). * See {@link ql.txn.compactor.Cleaner.removeFiles()} */ - s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + - TXN_ABORTED + "' and tc_database = ? and tc_table = ?"; - if (info.highestWriteId != 0) s += " and tc_writeid <= ?"; - if (info.partName != null) s += " and tc_partition = ?"; + s = "SELECT DISTINCT \"TXN_ID\" FROM \"TXNS\", \"TXN_COMPONENTS\" WHERE \"TXN_ID\" = \"TC_TXNID\" " + + "AND \"TXN_STATE\" = '" + TXN_ABORTED + "' AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?"; + if (info.highestWriteId != 0) s += " AND \"TC_WRITEID\" <= ?"; + if (info.partName != null) s += " AND \"TC_PARTITION\" = ?"; pStmt = dbConn.prepareStatement(s); paramCount = 1; @@ -464,18 +466,18 @@ class CompactionTxnHandler extends TxnHandler { StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); - prefix.append("delete from TXN_COMPONENTS where "); + prefix.append("DELETE FROM \"TXN_COMPONENTS\" WHERE "); //because 1 txn may include different partitions/tables even in auto commit mode - suffix.append(" and tc_database = ?"); - suffix.append(" and tc_table = ?"); + suffix.append(" AND \"TC_DATABASE\" = ?"); + suffix.append(" AND \"TC_TABLE\" = ?"); if (info.partName != null) { - suffix.append(" and tc_partition = ?"); + suffix.append(" AND \"TC_PARTITION\" = ?"); } // Populate the complete query with provided prefix and suffix List<Integer> counts = TxnUtils - .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid", + .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "\"TC_TXNID\"", true, false); int totalCount = 0; for (int i = 0; i < queries.size(); i++) { @@ -544,7 +546,7 @@ class CompactionTxnHandler extends TxnHandler { // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL .mhl_min_open_txnid). - String s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_ABORTED); + String s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (rs.next()) { @@ -555,7 +557,7 @@ class CompactionTxnHandler extends TxnHandler { } // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table. - s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + minUncommittedTxnId; + s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + minUncommittedTxnId; LOG.debug("Going to execute delete <" + s + ">"); int rc = stmt.executeUpdate(s); LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId); @@ -594,9 +596,9 @@ class CompactionTxnHandler extends TxnHandler { //after that, so READ COMMITTED is sufficient. dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "select txn_id from TXNS where " + - "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + - "txn_state = '" + TXN_ABORTED + "'"; + String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " + + "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " + + "\"TXN_STATE\" = '" + TXN_ABORTED + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); List<Long> txnids = new ArrayList<>(); @@ -612,10 +614,10 @@ class CompactionTxnHandler extends TxnHandler { StringBuilder suffix = new StringBuilder(); // Delete from TXNS. - prefix.append("delete from TXNS where "); + prefix.append("DELETE FROM \"TXNS\" WHERE "); suffix.append(""); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false); for (String query : queries) { LOG.debug("Going to execute update <" + query + ">"); @@ -658,8 +660,8 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" + String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL, \"CQ_STATE\" = '" + + INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "' AND \"CQ_WORKER_ID\" LIKE '" + hostname + "%'"; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died @@ -703,8 +705,8 @@ class CompactionTxnHandler extends TxnHandler { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); long latestValidStart = getDbTime(dbConn) - timeout; stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " + String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL, \"CQ_STATE\" = '" + + INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "' AND \"CQ_START\" < " + latestValidStart; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died @@ -800,9 +802,9 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String sqlText = "UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + - ci.highestWriteId + ", cq_run_as = " + quoteString(ci.runAs) + - " WHERE CQ_ID = " + ci.id; + String sqlText = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HIGHEST_WRITE_ID\" = " + + ci.highestWriteId + ", \"CQ_RUN_AS\" = " + quoteString(ci.runAs) + + " WHERE \"CQ_ID\" = " + ci.id; if(LOG.isDebugEnabled()) { LOG.debug("About to execute: " + sqlText); } @@ -818,13 +820,13 @@ class CompactionTxnHandler extends TxnHandler { * a new write id (so as not to invalidate result set caches/materialized views) but * we need to set it to something to that markCleaned() only cleans TXN_COMPONENTS up to * the level to which aborted files/data has been cleaned.*/ - sqlText = "insert into TXN_COMPONENTS(" + - "TC_TXNID, " + - "TC_DATABASE, " + - "TC_TABLE, " + - (ci.partName == null ? "" : "TC_PARTITION, ") + - "TC_WRITEID, " + - "TC_OPERATION_TYPE)" + + sqlText = "INSERT INTO \"TXN_COMPONENTS\"(" + + "\"TC_TXNID\", " + + "\"TC_DATABASE\", " + + "\"TC_TABLE\", " + + (ci.partName == null ? "" : "\"TC_PARTITION\", ") + + "\"TC_WRITEID\", " + + "\"TC_OPERATION_TYPE\")" + " VALUES(" + compactionTxnId + "," + quoteString(ci.dbname) + "," + @@ -907,8 +909,8 @@ class CompactionTxnHandler extends TxnHandler { stmt = dbConn.createStatement(); /*cc_id is monotonically increasing so for any entity sorts in order of compaction history, thus this query groups by entity and withing group sorts most recent first*/ - rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " + - "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc"); + rs = stmt.executeQuery("SELECT \"CC_ID\", \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\" " + + "FROM \"COMPLETED_COMPACTIONS\" ORDER BY \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_ID\" DESC"); String lastCompactedEntity = null; /*In each group, walk from most recent and count occurences of each state type. Once you * have counted enough (for each state) to satisfy retention policy, delete all other @@ -934,14 +936,15 @@ class CompactionTxnHandler extends TxnHandler { StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); - prefix.append("delete from COMPLETED_COMPACTIONS where "); + prefix.append("DELETE FROM \"COMPLETED_COMPACTIONS\" WHERE "); suffix.append(""); List<String> questions = new ArrayList<>(deleteSet.size()); for (int i = 0; i < deleteSet.size(); i++) { questions.add("?"); } - List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false); + List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, + "\"CC_ID\"", false, false); int totalCount = 0; for (int i = 0; i < queries.size(); i++) { String query = queries.get(i); @@ -1003,11 +1006,11 @@ class CompactionTxnHandler extends TxnHandler { try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " + - "CC_DATABASE = ? and " + - "CC_TABLE = ? " + - (ci.partName != null ? "and CC_PARTITION = ?" : "") + - " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc"); + pStmt = dbConn.prepareStatement("SELECT \"CC_STATE\" FROM \"COMPLETED_COMPACTIONS\" WHERE " + + "\"CC_DATABASE\" = ? AND " + + "\"CC_TABLE\" = ? " + + (ci.partName != null ? "AND \"CC_PARTITION\" = ?" : "") + + " AND \"CC_STATE\" != " + quoteChar(ATTEMPTED_STATE) + " ORDER BY \"CC_ID\" DESC"); pStmt.setString(1, ci.dbname); pStmt.setString(2, ci.tableName); if (ci.partName != null) { @@ -1062,14 +1065,15 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, " - + "CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, " - + "CQ_HADOOP_JOB_ID, CQ_ERROR_MESSAGE from COMPACTION_QUEUE WHERE CQ_ID = ?"); + pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", " + + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\" " + + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"); pStmt.setLong(1, ci.id); rs = pStmt.executeQuery(); if(rs.next()) { ci = CompactionInfo.loadFullFromCompactionQueue(rs); - String s = "delete from COMPACTION_QUEUE where cq_id = ?"; + String s = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"; pStmt = dbConn.prepareStatement(s); pStmt.setLong(1, ci.id); LOG.debug("Going to execute update <" + s + ">"); @@ -1098,9 +1102,10 @@ class CompactionTxnHandler extends TxnHandler { close(rs, stmt, null); closeStmt(pStmt); - pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, " - + "CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, " - + "CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID, CC_ERROR_MESSAGE) " + pStmt = dbConn.prepareStatement("INSERT INTO \"COMPLETED_COMPACTIONS\" " + + "(\"CC_ID\", \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", " + + "\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", \"CC_START\", \"CC_END\", \"CC_RUN_AS\", " + + "\"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", \"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\") " + "VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?)"); if (errorMessage != null) { ci.errorMessage = errorMessage; @@ -1138,7 +1143,8 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id; + String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HADOOP_JOB_ID\" = " + quoteString(hadoopJobId) + + " WHERE \"CQ_ID\" = " + id; LOG.debug("Going to execute <" + s + ">"); int updateCount = stmt.executeUpdate(s); LOG.debug("Going to commit");