Repository: hive
Updated Branches:
  refs/heads/master a6155b75e -> db8fb8a42
HIVE-12439 : CompactionTxnHandler.markCleaned() and TxnHandler.openTxns() misc improvements (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/db8fb8a4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/db8fb8a4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/db8fb8a4

Branch: refs/heads/master
Commit: db8fb8a42a690eaa937d1a0163eaf505c3c48a07
Parents: a6155b7
Author: Wei Zheng <w...@apache.org>
Authored: Mon Mar 21 11:38:38 2016 -0700
Committer: Wei Zheng <w...@apache.org>
Committed: Mon Mar 21 11:38:38 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java    |   7 +
 .../metastore/txn/CompactionTxnHandler.java      | 120 ++++++++-------
 .../hadoop/hive/metastore/txn/TxnDbUtil.java     |   4 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java    | 151 +++++++++++--------
 .../hadoop/hive/metastore/txn/TxnUtils.java      |  95 ++++++++++++
 .../hadoop/hive/metastore/txn/TestTxnUtils.java  | 135 +++++++++++++++++
 6 files changed, 390 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/db8fb8a4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 98c6372..0f8d67f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -737,6 +737,13 @@ public class HiveConf extends Configuration {
         "select query has incorrect syntax or something similar inside a transaction, the\n" +
         "entire transaction will fail and fall-back to DataNucleus will not be possible. You\n" +
         "should disable the usage of direct SQL inside transactions if that happens in your case."),
+    METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH("hive.direct.sql.max.query.length", 100, "The maximum\n" +
+        " size of a query string (in KB)."),
+    METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE("hive.direct.sql.max.elements.in.clause", 1000,
+        "The maximum number of values in a IN clause. Once exceeded, it will be broken into\n" +
+        " multiple OR separated IN clauses."),
+    METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE("hive.direct.sql.max.elements.values.clause",
+        1000, "The maximum number of values in a VALUES clause for INSERT statement."),
     METASTORE_ORM_RETRIEVE_MAPNULLS_AS_EMPTY_STRINGS("hive.metastore.orm.retrieveMapNullsAsEmptyStrings",false,
         "Thrift does not support nulls in maps, so any nulls present in maps retrieved from ORM must " +
         "either be pruned or converted to empty strings. 
Some backing dbs such as Oracle persist empty strings " + http://git-wip-us.apache.org/repos/asf/hive/blob/db8fb8a4/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index da2b395..15c01da 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -369,36 +369,38 @@ class CompactionTxnHandler extends TxnHandler { rs = stmt.executeQuery(s); List<Long> txnids = new ArrayList<>(); while (rs.next()) txnids.add(rs.getLong(1)); + // Remove entries from txn_components, as there may be aborted txn components if (txnids.size() > 0) { + List<String> queries = new ArrayList<String>(); + + // Prepare prefix and suffix + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + prefix.append("delete from TXN_COMPONENTS where "); - // Remove entries from txn_components, as there may be aborted txn components - StringBuilder buf = new StringBuilder(); - //todo: add a safeguard to make sure IN clause is not too large; break up by txn id - buf.append("delete from TXN_COMPONENTS where tc_txnid in ("); - boolean first = true; - for (long id : txnids) { - if (first) first = false; - else buf.append(", "); - buf.append(id); - } //because 1 txn may include different partitions/tables even in auto commit mode - buf.append(") and tc_database = '"); - buf.append(info.dbname); - buf.append("' and tc_table = '"); - buf.append(info.tableName); - buf.append("'"); + suffix.append(" and tc_database = "); + suffix.append(quoteString(info.dbname)); + suffix.append(" and tc_table = "); + suffix.append(quoteString(info.tableName)); if (info.partName != null) { - buf.append(" and tc_partition = '"); - buf.append(info.partName); - buf.append("'"); + suffix.append(" and tc_partition = "); + suffix.append(quoteString(info.partName)); } - LOG.debug("Going to execute update <" + buf.toString() + ">"); - int rc = stmt.executeUpdate(buf.toString()); - LOG.debug("Removed " + rc + " records from txn_components"); - // Don't bother cleaning from the txns table. A separate call will do that. We don't - // know here which txns still have components from other tables or partitions in the - // table, so we don't know which ones we can and cannot clean. + // Populate the complete query with provided prefix and suffix + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false); + + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + int rc = stmt.executeUpdate(query); + LOG.debug("Removed " + rc + " records from txn_components"); + + // Don't bother cleaning from the txns table. A separate call will do that. We don't + // know here which txns still have components from other tables or partitions in the + // table, so we don't know which ones we can and cannot clean. 
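The calling pattern introduced in the markCleaned() hunk above, reduced to its essentials (a sketch only; conf, stmt, txnids and info come from the surrounding method exactly as in the diff):

    List<String> queries = new ArrayList<>();
    StringBuilder prefix = new StringBuilder("delete from TXN_COMPONENTS where ");
    StringBuilder suffix = new StringBuilder(" and tc_table = " + quoteString(info.tableName));

    // Expands into one or more "tc_txnid in (...)" groups OR-ed together; addParens=true
    // wraps the groups in parentheses so the suffix condition applies to the whole disjunction.
    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);

    for (String query : queries) {
      int rc = stmt.executeUpdate(query);
      LOG.debug("Removed " + rc + " records from txn_components");
    }
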
+ } } LOG.debug("Going to commit"); @@ -445,16 +447,25 @@ class CompactionTxnHandler extends TxnHandler { if(txnids.size() <= 0) { return; } - for(int i = 0; i < txnids.size() / TIMED_OUT_TXN_ABORT_BATCH_SIZE; i++) { - List<Long> txnIdBatch = txnids.subList(i * TIMED_OUT_TXN_ABORT_BATCH_SIZE, - (i + 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE); - deleteTxns(dbConn, stmt, txnIdBatch); - } - int partialBatchSize = txnids.size() % TIMED_OUT_TXN_ABORT_BATCH_SIZE; - if(partialBatchSize > 0) { - List<Long> txnIdBatch = txnids.subList(txnids.size() - partialBatchSize, txnids.size()); - deleteTxns(dbConn, stmt, txnIdBatch); + + List<String> queries = new ArrayList<String>(); + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + prefix.append("delete from TXNS where "); + suffix.append(""); + + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false); + + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + int rc = stmt.executeUpdate(query); + LOG.info("Removed " + rc + " empty Aborted transactions from TXNS"); } + LOG.info("Aborted transactions removed from TXNS: " + txnids); + + LOG.debug("Going to commit"); + dbConn.commit(); } catch (SQLException e) { LOG.error("Unable to delete from txns table " + e.getMessage()); LOG.debug("Going to rollback"); @@ -469,18 +480,6 @@ class CompactionTxnHandler extends TxnHandler { cleanEmptyAbortedTxns(); } } - private static void deleteTxns(Connection dbConn, Statement stmt, List<Long> txnIdBatch) throws SQLException { - StringBuilder buf = new StringBuilder("delete from TXNS where txn_id in ("); - for(long txnid : txnIdBatch) { - buf.append(txnid).append(','); - } - buf.setCharAt(buf.length() - 1, ')'); - LOG.debug("Going to execute update <" + buf + ">"); - int rc = stmt.executeUpdate(buf.toString()); - LOG.info("Removed " + rc + " empty Aborted transactions: " + txnIdBatch + " from TXNS"); - LOG.debug("Going to commit"); - dbConn.commit(); - } /** * This will take all entries assigned to workers @@ -722,22 +721,21 @@ class CompactionTxnHandler extends TxnHandler { checkForDeletion(deleteSet, ci, rc); } close(rs); - - String baseDeleteSql = "delete from COMPLETED_COMPACTIONS where cc_id IN("; - StringBuilder queryStr = new StringBuilder(baseDeleteSql); - for(int i = 0; i < deleteSet.size(); i++) { - if(i > 0 && i % TIMED_OUT_TXN_ABORT_BATCH_SIZE == 0) { - queryStr.setCharAt(queryStr.length() - 1, ')'); - stmt.executeUpdate(queryStr.toString()); - dbConn.commit(); - queryStr = new StringBuilder(baseDeleteSql); - } - queryStr.append(deleteSet.get(i)).append(','); - } - if(queryStr.length() > baseDeleteSql.length()) { - queryStr.setCharAt(queryStr.length() - 1, ')'); - int updCnt = stmt.executeUpdate(queryStr.toString()); - dbConn.commit(); + + List<String> queries = new ArrayList<String>(); + + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + prefix.append("delete from COMPLETED_COMPACTIONS where "); + suffix.append(""); + + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false); + + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + int count = stmt.executeUpdate(query); + LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS"); } dbConn.commit(); } catch (SQLException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/db8fb8a4/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java 
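To make the addParens flag used above concrete, a rough illustration of the SQL the helper is expected to emit (assuming, for illustration only, hive.direct.sql.max.elements.in.clause = 3 and ids 1..7; with the default length cap this stays a single query):

    // buildQueryWithINClause(conf, queries, prefix, suffix, ids, "txn_id", false, false)
    //   delete from TXNS where txn_id in (1,2,3) or txn_id in (4,5,6) or txn_id in (7)
    //
    // With addParens = true and a non-empty suffix, the OR-ed groups are wrapped so the
    // trailing condition binds to all of them:
    //   delete from TXN_COMPONENTS where (tc_txnid in (1,2,3) or tc_txnid in (4,5,6)
    //     or tc_txnid in (7)) and tc_table = 't1'
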
---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 5d10b5c..8172242 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -248,7 +248,7 @@ public final class TxnDbUtil { } } - private static Connection getConnection() throws Exception { + static Connection getConnection() throws Exception { HiveConf conf = new HiveConf(); String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER); Driver driver = (Driver) Class.forName(jdbcDriver).newInstance(); @@ -264,7 +264,7 @@ public final class TxnDbUtil { return conn; } - private static void closeResources(Connection conn, Statement stmt, ResultSet rs) { + static void closeResources(Connection conn, Statement stmt, ResultSet rs) { if (rs != null) { try { rs.close(); http://git-wip-us.apache.org/repos/asf/hive/blob/db8fb8a4/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 53d2bb4..afeb168 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -349,19 +349,46 @@ abstract class TxnHandler implements TxnStore { s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); + long now = getDbTime(dbConn); - s = "insert into TXNS (txn_id, txn_state, txn_started, " + - "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " + - now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; - LOG.debug("Going to prepare statement <" + s + ">"); - PreparedStatement ps = dbConn.prepareStatement(s); List<Long> txnIds = new ArrayList<Long>(numTxns); + ArrayList<String> queries = new ArrayList<String>(); + String query; + String insertClause = "insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host) values "; + StringBuilder valuesClause = new StringBuilder(); + for (long i = first; i < first + numTxns; i++) { - ps.setLong(1, i); - //todo: this would be more efficient with a single insert with multiple rows in values() - //need add a safeguard to not exceed the DB capabilities. 
- ps.executeUpdate(); txnIds.add(i); + + if (i > first && + (i - first) % conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { + // wrap up the current query, and start a new one + query = insertClause + valuesClause.toString(); + queries.add(query); + + valuesClause.setLength(0); + valuesClause.append("(").append(i).append(", 'o', ").append(now).append(", ").append(now) + .append(", '").append(rqst.getUser()).append("', '").append(rqst.getHostname()) + .append("')"); + + continue; + } + + if (i > first) { + valuesClause.append(", "); + } + + valuesClause.append("(").append(i).append(", 'o', ").append(now).append(", ").append(now) + .append(", '").append(rqst.getUser()).append("', '").append(rqst.getHostname()) + .append("')"); + } + + query = insertClause + valuesClause.toString(); + queries.add(query); + + for (String q : queries) { + LOG.debug("Going to execute update <" + q + ">"); + stmt.execute(q); } LOG.debug("Going to commit"); dbConn.commit(); @@ -1798,40 +1825,49 @@ abstract class TxnHandler implements TxnStore { stmt = dbConn.createStatement(); //This is an update statement, thus at any Isolation level will take Write locks so will block //all other ops using S4U on TXNS row. - StringBuilder buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + - "' where txn_state = '" + TXN_OPEN + "' and txn_id in ("); - boolean first = true; - for (Long id : txnids) { - if (first) first = false; - else buf.append(','); - buf.append(id); - } - buf.append(')'); + List<String> queries = new ArrayList<String>(); + + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + prefix.append("update TXNS set txn_state = " + quoteChar(TXN_ABORTED) + + " where txn_state = " + quoteChar(TXN_OPEN) + " and "); if(max_heartbeat > 0) { - buf.append(" and txn_last_heartbeat < ").append(max_heartbeat); + suffix.append(" and txn_last_heartbeat < ").append(max_heartbeat); + } else { + suffix.append(""); } - LOG.debug("Going to execute update <" + buf.toString() + ">"); - updateCnt = stmt.executeUpdate(buf.toString()); - if(updateCnt < txnids.size()) { - /** - * have to bail in this case since we don't know which transactions were not Aborted and - * thus don't know which locks to delete - * This may happen due to a race between {@link #heartbeat(HeartbeatRequest)} operation and - * {@link #performTimeOuts()} - */ - return updateCnt; + + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false); + + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + updateCnt = stmt.executeUpdate(query); + if (updateCnt < txnids.size()) { + /** + * have to bail in this case since we don't know which transactions were not Aborted and + * thus don't know which locks to delete + * This may happen due to a race between {@link #heartbeat(HeartbeatRequest)} operation and + * {@link #performTimeOuts()} + */ + return updateCnt; + } } - buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in ("); - first = true; - for (Long id : txnids) { - if (first) first = false; - else buf.append(','); - buf.append(id); + queries.clear(); + prefix.setLength(0); + suffix.setLength(0); + + prefix.append("delete from HIVE_LOCKS where "); + suffix.append(""); + + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false); + + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + int rc = 
stmt.executeUpdate(query); + LOG.debug("Removed " + rc + " records from HIVE_LOCKS"); } - buf.append(')'); - LOG.debug("Going to execute update <" + buf.toString() + ">"); - stmt.executeUpdate(buf.toString()); } finally { closeStmt(stmt); } @@ -2250,31 +2286,28 @@ abstract class TxnHandler implements TxnStore { if(extLockIDs.size() <= 0) { return; } - int deletedLocks = 0; + + List<String> queries = new ArrayList<String>(); + + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + //include same hl_last_heartbeat condition in case someone heartbeated since the select - s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + maxHeartbeatTime + " and hl_txnid = 0" + - " and hl_lock_ext_id IN ("; - int numWholeBatches = extLockIDs.size() / TIMED_OUT_TXN_ABORT_BATCH_SIZE; - for(int i = 0; i < numWholeBatches; i++) { - StringBuilder sb = new StringBuilder(s); - for(int j = i * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j < (i + 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j++) { - sb.append(extLockIDs.get(j)).append(","); - } - sb.setCharAt(sb.length() - 1, ')'); - LOG.debug("Removing expired locks via: " + sb.toString()); - deletedLocks += stmt.executeUpdate(sb.toString()); - dbConn.commit(); - } - StringBuilder sb = new StringBuilder(s); - for(int i = numWholeBatches * TIMED_OUT_TXN_ABORT_BATCH_SIZE; i < extLockIDs.size(); i++) { - sb.append(extLockIDs.get(i)).append(","); + prefix.append("delete from HIVE_LOCKS where hl_last_heartbeat < "); + prefix.append(maxHeartbeatTime); + prefix.append(" and hl_txnid = 0 and "); + suffix.append(""); + + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "hl_lock_ext_id", true, false); + + int deletedLocks = 0; + for (String query : queries) { + LOG.debug("Removing expired locks via: " + query); + deletedLocks += stmt.executeUpdate(query); } - sb.setCharAt(sb.length() - 1, ')'); - LOG.debug("Removing expired locks via: " + sb.toString()); - deletedLocks += stmt.executeUpdate(sb.toString()); if(deletedLocks > 0) { LOG.info("Deleted " + deletedLocks + " ext locks from HIVE_LOCKS due to timeout (vs. " + - extLockIDs.size() + " found. List: " + extLockIDs + ") maxHeartbeatTime=" + maxHeartbeatTime); + extLockIDs.size() + " found. List: " + extLockIDs + ") maxHeartbeatTime=" + maxHeartbeatTime); } LOG.debug("Going to commit"); dbConn.commit(); http://git-wip-us.apache.org/repos/asf/hive/blob/db8fb8a4/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 0d90b11..57b88cc 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.List; import java.util.Set; public class TxnUtils { @@ -112,4 +113,98 @@ public class TxnUtils { String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + + /** + * Build a query (or queries if one query is too big) with specified "prefix" and "suffix", + * while populating the IN list into multiple OR clauses, e.g. 
id in (1,2,3) OR id in (4,5,6) + * For NOT IN case, NOT IN list is broken into multiple AND clauses. + * @param queries array of complete query strings + * @param prefix part of the query that comes before IN list + * @param suffix part of the query that comes after IN list + * @param inList the list containing IN list values + * @param inColumn column name of IN list operator + * @param addParens add a pair of parenthesis outside the IN lists + * e.g. ( id in (1,2,3) OR id in (4,5,6) ) + * @param notIn clause to be broken up is NOT IN + */ + public static void buildQueryWithINClause(HiveConf conf, List<String> queries, StringBuilder prefix, + StringBuilder suffix, List<Long> inList, + String inColumn, boolean addParens, boolean notIn) { + int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE); + int numWholeBatches = inList.size() / batchSize; + StringBuilder buf = new StringBuilder(); + buf.append(prefix); + if (addParens) { + buf.append("("); + } + buf.append(inColumn); + if (notIn) { + buf.append(" not in ("); + } else { + buf.append(" in ("); + } + + for (int i = 0; i <= numWholeBatches; i++) { + if (needNewQuery(conf, buf)) { + // Wrap up current query string + if (addParens) { + buf.append(")"); + } + buf.append(suffix); + queries.add(buf.toString()); + + // Prepare a new query string + buf.setLength(0); + } + + if (i > 0) { + if (notIn) { + if (buf.length() == 0) { + buf.append(prefix); + if (addParens) { + buf.append("("); + } + } else { + buf.append(" and "); + } + buf.append(inColumn); + buf.append(" not in ("); + } else { + if (buf.length() == 0) { + buf.append(prefix); + if (addParens) { + buf.append("("); + } + } else { + buf.append(" or "); + } + buf.append(inColumn); + buf.append(" in ("); + } + } + + if (i * batchSize == inList.size()) { + // At this point we just realized we don't need another query + return; + } + for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) { + buf.append(inList.get(j)).append(","); + } + buf.setCharAt(buf.length() - 1, ')'); + } + + if (addParens) { + buf.append(")"); + } + buf.append(suffix); + queries.add(buf.toString()); + } + + /** Estimate if the size of a string will exceed certain limit */ + private static boolean needNewQuery(HiveConf conf, StringBuilder sb) { + int queryMemoryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH); + // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml + long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8); + return sizeInBytes / 1024 > queryMemoryLimit; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/db8fb8a4/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java new file mode 100644 index 0000000..a2195ed --- /dev/null +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +/** + * Tests for TxnUtils + */ +public class TestTxnUtils { + private HiveConf conf; + + public TestTxnUtils() throws Exception { + } + + @Test + public void testBuildQueryWithINClause() throws Exception { + List<String> queries = new ArrayList<String>(); + + StringBuilder prefix = new StringBuilder(); + StringBuilder suffix = new StringBuilder(); + + // Note, this is a "real" query that depends on one of the metastore tables + prefix.append("select count(*) from TXNS where "); + suffix.append(" and TXN_STATE = 'o'"); + + // Case 1 - Max in list members: 10; Max query string length: 1KB + // The first query happens to have 2 full batches. + conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 1); + conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 10); + List<Long> inList = new ArrayList<Long>(); + for (long i = 1; i <= 200; i++) { + inList.add(i); + } + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + Assert.assertEquals(1, queries.size()); + runAgainstDerby(queries); + + // Case 2 - Max in list members: 10; Max query string length: 1KB + // The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member + queries.clear(); + inList.add((long)201); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + Assert.assertEquals(2, queries.size()); + runAgainstDerby(queries); + + // Case 3 - Max in list members: 1000; Max query string length: 5KB + conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 10); + conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 1000); + queries.clear(); + for (long i = 202; i <= 4321; i++) { + inList.add(i); + } + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false); + Assert.assertEquals(3, queries.size()); + runAgainstDerby(queries); + + // Case 4 - NOT IN list + queries.clear(); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true); + Assert.assertEquals(3, queries.size()); + runAgainstDerby(queries); + + // Case 5 - No parenthesis + queries.clear(); + suffix.setLength(0); + suffix.append(""); + TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false); + Assert.assertEquals(3, queries.size()); + runAgainstDerby(queries); + } + + /** Verify queries can run against Derby DB. + * As long as Derby doesn't complain, we assume the query is syntactically/semantically correct. 
+ */ + private void runAgainstDerby(List<String> queries) throws Exception { + Connection conn = null; + Statement stmt = null; + ResultSet rs = null; + + try { + conn = TxnDbUtil.getConnection(); + stmt = conn.createStatement(); + for (String query : queries) { + rs = stmt.executeQuery(query); + Assert.assertTrue("The query is not valid", rs.next()); + } + } finally { + TxnDbUtil.closeResources(conn, stmt, rs); + } + } + + @Before + public void setUp() throws Exception { + tearDown(); + conf = new HiveConf(this.getClass()); + TxnDbUtil.setConfValues(conf); + TxnDbUtil.prepDb(); + } + + @After + public void tearDown() throws Exception { + TxnDbUtil.cleanDb(); + } +}
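
A rough end-to-end usage sketch (not part of the commit) tying the two new HiveConf knobs to the helper; the class name, query text and id range below are illustrative, mirroring TestTxnUtils above:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class InClauseBatchingExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Cap each generated query at roughly 10 KB and each IN list at 1000 members.
        conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 10);
        conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 1000);

        List<Long> ids = new ArrayList<Long>();
        for (long i = 1; i <= 5000; i++) {
          ids.add(i);
        }

        List<String> queries = new ArrayList<String>();
        TxnUtils.buildQueryWithINClause(conf, queries,
            new StringBuilder("select count(*) from TXNS where "),   // prefix
            new StringBuilder(" and txn_state = 'o'"),                // suffix
            ids, "txn_id", true, false);

        // The helper estimates the builder's in-memory size (about 2 bytes per character,
        // see needNewQuery() above) and starts a new query whenever that estimate crosses
        // the KB limit, so each element of 'queries' is an independently executable statement.
        for (String q : queries) {
          System.out.println(q);
        }
      }
    }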