Repository: hive Updated Branches: refs/heads/branch-1 a6e7cfa9e -> 738709117
HIVE-11317 - ACID: Improve transaction Abort logic due to timeout (Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/73870911 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/73870911 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/73870911 Branch: refs/heads/branch-1 Commit: 738709117f046744730648c4c6df6c2af0465969 Parents: a6e7cfa Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Sat Aug 15 10:36:47 2015 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Sat Aug 15 10:36:47 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 + .../deployers/config/hive/hive-site.mysql.xml | 22 +++ .../hive/hcatalog/streaming/TestStreaming.java | 54 ++++++- .../hadoop/hive/metastore/HiveMetaStore.java | 18 +++ .../hive/metastore/HouseKeeperService.java | 39 +++++ .../hadoop/hive/metastore/txn/TxnHandler.java | 153 +++++++++++++------ .../hive/metastore/txn/TestTxnHandler.java | 7 +- .../java/org/apache/hadoop/hive/ql/Driver.java | 15 +- .../hive/ql/txn/AcidHouseKeeperService.java | 104 +++++++++++++ .../hive/ql/txn/compactor/CompactorMR.java | 6 +- .../hadoop/hive/ql/txn/compactor/Initiator.java | 1 + .../hadoop/hive/ql/txn/compactor/Worker.java | 2 +- .../apache/hadoop/hive/ql/TestTxnCommands.java | 21 +++ .../apache/hadoop/hive/ql/TestTxnCommands2.java | 1 + .../hive/ql/lockmgr/TestDbTxnManager.java | 35 +++-- 15 files changed, 421 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 
95aaf55..acc72c8 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1507,6 +1507,10 @@ public class HiveConf extends Configuration { HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms", new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"), + HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s", + new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"), + HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s", + new TimeValidator(TimeUnit.MILLISECONDS), "Time interval describing how often the reaper runs"), // For HBase storage handler HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true, http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml ---------------------------------------------------------------------- diff --git a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml index 70ccc31..b6f1ab7 100644 --- a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml +++ b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml @@ -62,6 +62,28 @@ <name>hive.exec.dynamic.partition.mode</name> <value>nonstrict</value> </property> + <property> + <name>hive.compactor.initiator.on</name> + <value>false</value> + </property> + <property> + <name>hive.compactor.worker.threads</name> + <value>2</value> + </property> + <property> + <name>hive.timedout.txn.reaper.start</name> + <value>2s</value> + </property> +<!-- <property> + <name>hive.txn.timeout</name> + <value>60s</value> + </property> + --> + <property> + <name>hive.timedout.txn.reaper.interval</name> + <value>30s</value> + </property> + <!--end ACID related properties--> <!-- 
<property> http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index c0af533..c28d4aa 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -18,7 +18,6 @@ package org.apache.hive.hcatalog.streaming; -import junit.framework.Assert; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; @@ -36,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; @@ -51,6 +52,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.thrift.TException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -67,6 +69,7 @@ 
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; public class TestStreaming { @@ -301,6 +304,55 @@ public class TestStreaming { connection.close(); } + /** + * check that transactions that have not heartbeated and have timed out get properly aborted + * @throws Exception + */ + @Test + public void testTimeOutReaper() throws Exception { + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); + DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt); + StreamingConnection connection = endPt.newConnection(false, null); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(5, writer); + txnBatch.beginNextTransaction(); + conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS); + //ensure txn times out + conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS); + AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService(); + houseKeeperService.start(conf); + while(houseKeeperService.getIsAliveCounter() <= Integer.MIN_VALUE) { + Thread.sleep(100);//make sure it has run at least once + } + houseKeeperService.stop(); + try { + //should fail because the TransactionBatch timed out + txnBatch.commit(); + } + catch(TransactionError e) { + Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException); + } + txnBatch.close(); + txnBatch = connection.fetchTransactionBatch(10, writer); + txnBatch.beginNextTransaction(); + txnBatch.commit(); + txnBatch.beginNextTransaction(); + int lastCount = houseKeeperService.getIsAliveCounter(); + houseKeeperService.start(conf); + while(houseKeeperService.getIsAliveCounter() <= lastCount) { + Thread.sleep(100);//make sure it has run at least once + } + houseKeeperService.stop(); + try { + //should fail because the TransactionBatch timed out + txnBatch.commit(); + } + catch(TransactionError e) { 
Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException); + } + txnBatch.close(); + connection.close(); + } @Test public void testTransactionBatchEmptyAbort() throws Exception { // 1) to partitioned table http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 8f6191e..19a986d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -6224,6 +6224,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { startCompactorInitiator(conf); startCompactorWorkers(conf); startCompactorCleaner(conf); + startHouseKeeperService(conf); } catch (Throwable e) { LOG.error("Failure when starting the compactor, compactions may not happen, " + StringUtils.stringifyException(e)); @@ -6283,4 +6284,21 @@ public class HiveMetaStore extends ThriftHiveMetastore { thread.init(new AtomicBoolean(), new AtomicBoolean()); thread.start(); } + private static void startHouseKeeperService(HiveConf conf) throws Exception { + if(!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) { + return; + } + Class c = Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"); + //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop() + //should be called from it + HouseKeeperService houseKeeper = (HouseKeeperService)c.newInstance(); + try { + houseKeeper.start(conf); + } + catch (Exception ex) { + LOG.fatal("Failed to start " + houseKeeper.getClass() + + ". The system will not handle " + houseKeeper.getServiceDescription() + + ". 
Root Cause: ", ex); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java new file mode 100644 index 0000000..eb4ea93 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.conf.HiveConf; + +/** + * Runs arbitrary background logic inside the metastore service. + */ +@InterfaceAudience.LimitedPrivate({"Hive"}) +@InterfaceStability.Evolving +public interface HouseKeeperService { + public void start(HiveConf hiveConf) throws Exception; + /** + * Should perform orderly shutdown + */ + public void stop(); + /** + * Returns short description of services this module provides. 
+ */ + public String getServiceDescription(); +} http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 88e007c..795b2d9 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -46,6 +46,11 @@ import java.util.concurrent.TimeUnit; /** * A handler to answer transaction related calls that come into the metastore * server. + * + * Note on log messages: Please include txnid:X and lockid info + * {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages. + * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated, + * so keeping the format consistent makes grep'ing the logs much easier. */ public class TxnHandler { // Compactor states @@ -212,7 +217,6 @@ public class TxnHandler { Statement stmt = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - timeOutTxns(dbConn); stmt = dbConn.createStatement(); String s = "select ntxn_next - 1 from NEXT_TXN_ID"; LOG.debug("Going to execute query <" + s + ">"); @@ -463,8 +467,6 @@ public class TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); long extLockId = rqst.getLockid(); - // Clean up timed out locks - timeOutLocks(dbConn); // Heartbeat on the lockid first, to assure that our lock is still valid. // Then look up the lock info (hopefully in the cache). If these locks @@ -1361,8 +1363,6 @@ public class TxnHandler { // and prevent other operations (such as committing transactions, showing locks, // etc.) that should not interfere with this one. 
synchronized (lockLock) { - // Clean up timed out locks before we attempt to acquire any. - timeOutLocks(dbConn); Statement stmt = null; try { stmt = dbConn.createStatement(); @@ -1732,15 +1732,22 @@ public class TxnHandler { LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { + s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid; + ResultSet rs2 = stmt.executeQuery(s); + boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0; LOG.debug("Going to rollback"); dbConn.rollback(); + if(alreadyCommitted) { + //makes the message more informative - helps to find bugs in client code + throw new NoSuchTxnException("Transaction " + JavaUtils.txnIdToString(txnid) + " is already committed."); + } throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); } if (rs.getString(1).charAt(0) == TXN_ABORTED) { LOG.debug("Going to rollback"); dbConn.rollback(); throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + - " already aborted"); + " already aborted");//todo: add time of abort, which is not currently tracked } s = "update TXNS set txn_last_heartbeat = " + now + " where txn_id = " + txnid; @@ -1802,61 +1809,121 @@ public class TxnHandler { } } - // Clean time out locks from the database. This does a commit, + // Clean timed-out locks from the database that are not associated with a transaction, i.e. locks + // for read-only autoCommit=true statements. This does a commit, // and thus should be done before any calls to heartbeat that will leave // open transactions. 
- private void timeOutLocks(Connection dbConn) throws SQLException, MetaException { - long now = getDbTime(dbConn); + private void timeOutLocks(Connection dbConn) { Statement stmt = null; try { + long now = getDbTime(dbConn); stmt = dbConn.createStatement(); // Remove any timed out locks from the table.
String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + - (now - timeout); + (now - timeout) + " and (hl_txnid = 0 or hl_txnid is NULL)";//when txnid is > 0, the lock is + //associated with a txn and is handled by performTimeOuts() + //want to avoid expiring locks for a txn w/o expiring the txn itself LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + int deletedLocks = stmt.executeUpdate(s); + if(deletedLocks > 0) { + LOG.info("Deleted " + deletedLocks + " locks from HIVE_LOCKS due to timeout"); + } LOG.debug("Going to commit"); dbConn.commit(); + } + catch(SQLException ex) { + LOG.error("Failed to purge timedout locks due to: " + getMessage(ex), ex); + } + catch(Exception ex) { + LOG.error("Failed to purge timedout locks due to: " + ex.getMessage(), ex); } finally { closeStmt(stmt); } } - // Abort timed out transactions. This does a commit, - // and thus should be done before any calls to heartbeat that will leave - // open transactions on the underlying database. - private void timeOutTxns(Connection dbConn) throws SQLException, MetaException, RetryException { - long now = getDbTime(dbConn); + /** + * Suppose you have a query "select a,b from T" and you want to limit the result set + * to the first 5 rows. The mechanism to do that differs in different DB. + * Make {@code noSelectsqlQuery} to be "a,b from T" and this method will return the + * appropriately modified row limiting query. 
+ */ + private String addLimitClause(Connection dbConn, int numRows, String noSelectsqlQuery) throws MetaException { + DatabaseProduct prod = determineDatabaseProduct(dbConn); + switch (prod) { + case DERBY: + //http://db.apache.org/derby/docs/10.7/ref/rrefsqljoffsetfetch.html + return "select " + noSelectsqlQuery + " fetch first " + numRows + " rows only"; + case MYSQL: + //https://dev.mysql.com/doc/refman/5.0/en/select.html + case POSTGRES: + //http://www.postgresql.org/docs/7.3/static/queries-limit.html + return "select " + noSelectsqlQuery + " limit " + numRows; + case ORACLE: + //newer versions (12c and later) support OFFSET/FETCH + return "select * from (select " + noSelectsqlQuery + ") where rownum <= " + numRows; + case SQLSERVER: + //newer versions (2012 and later) support OFFSET/FETCH + //https://msdn.microsoft.com/en-us/library/ms189463.aspx + return "select TOP(" + numRows + ") " + noSelectsqlQuery; + default: + String msg = "Unrecognized database product name <" + prod + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } + /** + * This will find transactions that have timed out and abort them. + * Will also delete locks which are not associated with a transaction and have timed out. + * Tries to keep transactions (against metastore db) small to reduce lock contention. + */ + public void performTimeOuts() { + Connection dbConn = null; Statement stmt = null; + ResultSet rs = null; try { - stmt = dbConn.createStatement(); - // Abort any timed out locks from the table. 
- String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN + - "' and txn_last_heartbeat < " + (now - timeout); - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - List<Long> deadTxns = new ArrayList<Long>(); - // Limit the number of timed out transactions we do in one pass to keep from generating a - // huge delete statement - do { - deadTxns.clear(); - for (int i = 0; i < TIMED_OUT_TXN_ABORT_BATCH_SIZE && rs.next(); i++) { - deadTxns.add(rs.getLong(1)); + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + long now = getDbTime(dbConn); + timeOutLocks(dbConn); + while(true) { + stmt = dbConn.createStatement(); + String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN + + "' and txn_last_heartbeat < " + (now - timeout); + s = addLimitClause(dbConn, 2500, s); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if(!rs.next()) { + return;//no more timedout txns } - // We don't care whether all of the transactions get deleted or not, - // if some didn't it most likely means someone else deleted them in the interum - if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns); - } while (deadTxns.size() > 0); - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "abortTxn"); - throw new MetaException("Unable to update transaction database " - + StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); + List<List<Long>> timedOutTxns = new ArrayList<>(); + List<Long> currentBatch = new ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE); + timedOutTxns.add(currentBatch); + do { + currentBatch.add(rs.getLong(1)); + if(currentBatch.size() == TIMED_OUT_TXN_ABORT_BATCH_SIZE) { + currentBatch = new ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE); + timedOutTxns.add(currentBatch); + } + } while(rs.next()); + close(rs, stmt, null); + dbConn.commit(); + for(List<Long> batchToAbort : 
timedOutTxns) { + abortTxns(dbConn, batchToAbort); + dbConn.commit(); + //todo: add TXNS.COMMENT field and set it to 'aborted by system due to timeout' + LOG.info("Aborted the following transactions due to timeout: " + timedOutTxns.toString()); + } + int numTxnsAborted = (timedOutTxns.size() - 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE + + timedOutTxns.get(timedOutTxns.size() - 1).size(); + LOG.info("Aborted " + numTxnsAborted + " transactions due to timeout"); + } + } catch (SQLException ex) { + LOG.warn("Aborting timedout transactions failed due to " + getMessage(ex), ex); + } + catch(MetaException e) { + LOG.warn("Aborting timedout transactions failed due to " + e.getMessage(), e); + } + finally { + close(rs, stmt, dbConn); } } http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index f478184..6461435 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -946,7 +946,8 @@ public class TestTxnHandler { LockRequest req = new LockRequest(components, "me", "localhost"); LockResponse res = txnHandler.lock(req); assertTrue(res.getState() == LockState.ACQUIRED); - Thread.currentThread().sleep(10); + Thread.sleep(10); + txnHandler.performTimeOuts(); txnHandler.checkLock(new CheckLockRequest(res.getLockid())); fail("Told there was a lock, when it should have timed out."); } catch (NoSuchLockException e) { @@ -960,8 +961,8 @@ public class TestTxnHandler { long timeout = txnHandler.setTimeout(1); try { txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost")); - Thread.currentThread().sleep(10); - txnHandler.getOpenTxns(); + Thread.sleep(10); 
+ txnHandler.performTimeOuts(); GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo(); int numAborted = 0; for (TxnInfo txnInfo : rsp.getOpen_txns()) { http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4678fe1..d034afc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -975,11 +975,19 @@ public class Driver implements CommandProcessor { return 10; } - boolean existingTxn = txnMgr.isTxnOpen(); + boolean initiatingTransaction = false; + boolean readOnlyQueryInAutoCommit = false; if((txnMgr.getAutoCommit() && haveAcidWrite()) || plan.getOperation() == HiveOperation.START_TRANSACTION || (!txnMgr.getAutoCommit() && startTxnImplicitly)) { + if(txnMgr.isTxnOpen()) { + throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId()); + } // We are writing to tables in an ACID compliant way, so we need to open a transaction txnMgr.openTxn(userFromUGI); + initiatingTransaction = true; + } + else { + readOnlyQueryInAutoCommit = txnMgr.getAutoCommit() && plan.getOperation() == HiveOperation.QUERY && !haveAcidWrite(); } // Set the transaction id in all of the acid file sinks if (haveAcidWrite()) { @@ -995,9 +1003,9 @@ public class Driver implements CommandProcessor { For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking in the lock manager.*/ txnMgr.acquireLocks(plan, ctx, userFromUGI); - if(!existingTxn) { + if(initiatingTransaction || readOnlyQueryInAutoCommit) { //For multi-stmt txns we should record the snapshot when txn starts but - // don't update it after that until txn completes. Thus the check for {@code existingTxn} + // don't update it after that until txn completes. 
Thus the check for {@code initiatingTransaction} //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot //for each statement. recordValidTxns(); @@ -1279,6 +1287,7 @@ public class Driver implements CommandProcessor { } private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { + //console.printError(cpr.toString()); try { releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false); } http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java new file mode 100644 index 0000000..d22ca8d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HouseKeeperService; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Performs background tasks for Transaction management in Hive. + * Runs inside Hive Metastore Service. + */ +public class AcidHouseKeeperService implements HouseKeeperService { + private static final Log LOG = LogFactory.getLog(AcidHouseKeeperService.class); + private ScheduledExecutorService pool = null; + private AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE); + @Override + public void start(HiveConf hiveConf) throws Exception { + HiveTxnManager mgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf); + if(!mgr.supportsAcid()) { + LOG.info(AcidHouseKeeperService.class.getName() + " not started since " + + mgr.getClass().getName() + " does not support Acid."); + return;//there are no transactions in this case + } + pool = Executors.newScheduledThreadPool(1, new ThreadFactory() { + private final AtomicInteger threadCounter = new AtomicInteger(); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "DeadTxnReaper-" + threadCounter.getAndIncrement()); + } + }); + TimeUnit tu = TimeUnit.MILLISECONDS; + pool.scheduleAtFixedRate(new TimedoutTxnReaper(hiveConf, this), + hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu), + hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu), + TimeUnit.MILLISECONDS); + LOG.info("Started " 
+ this.getClass().getName() + " with delay/interval = " + + hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu) + "/" + + hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu) + " " + tu); + } + @Override + public void stop() { + if(pool != null && !pool.isShutdown()) { + pool.shutdown(); + } + pool = null; + } + @Override + public String getServiceDescription() { + return "Abort expired transactions"; + } + private static final class TimedoutTxnReaper implements Runnable { + private final TxnHandler txnHandler; + private final AcidHouseKeeperService owner; + private TimedoutTxnReaper(HiveConf hiveConf, AcidHouseKeeperService owner) { + txnHandler = new TxnHandler(hiveConf); + this.owner = owner; + } + @Override + public void run() { + try { + txnHandler.performTimeOuts(); + owner.isAliveCounter.incrementAndGet(); + LOG.info("timeout reaper ran"); + } + catch(Throwable t) { + LOG.fatal("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(), t); + } + } + } + + /** + * This is used for testing only. Each time the housekeeper runs, counter is incremented by 1. 
+ * Starts with {@link java.lang.Integer#MIN_VALUE} + */ + public int getIsAliveCounter() { + return isAliveCounter.get(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 6c77ba4..8e431b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; @@ -52,6 +53,7 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.OutputCommitter; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.lib.NullOutputFormat; import org.apache.hadoop.util.StringUtils; @@ -183,7 +185,9 @@ public class CompactorMR { LOG.debug("Setting minimum transaction to " + minTxn); LOG.debug("Setting maximume transaction to " + maxTxn); - JobClient.runJob(job).waitForCompletion(); + RunningJob rj = JobClient.runJob(job); + LOG.info("Submitted " + (isMajor ? 
CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" + jobName + "' with jobID=" + rj.getID()); + rj.waitForCompletion(); su.gatherStats(); } http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 32a9ef8..73715c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * A class to initiate compactions. This will run in a separate thread. + * It's critical that there be exactly 1 of these in a given warehouse. */ public class Initiator extends CompactorThread { static final private String CLASS_NAME = Initiator.class.getName(); http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index e164661..0548117 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -264,7 +264,7 @@ public class Worker extends CompactorThread { sb.append(colName).append(","); } sb.setLength(sb.length() - 1);//remove trailing , - LOG.debug("running '" + sb.toString() + "'"); + LOG.info("running '" + sb.toString() + "'"); Driver d = new Driver(conf, userName); SessionState localSession = null; if(SessionState.get() == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java 
---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index c73621f..e13e6eb 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -9,6 +9,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.orc.FileDump; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -24,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.concurrent.TimeUnit; /** * The LockManager is not ready, but for no-concurrency straight-line path we can @@ -388,6 +390,25 @@ public class TestTxnCommands { int[][] updatedData = {{1,7},{3,7},{5,2},{5,3}}; Assert.assertEquals("Bulk update failed", stringifyValues(updatedData), rs); } + @Test + public void testTimeOutReaper() throws Exception { + runStatementOnDriver("set autocommit false"); + runStatementOnDriver("start transaction"); + runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 5"); + //make sure currently running txn is considered aborted by housekeeper + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS); + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS); + AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService(); + //this will abort the txn + houseKeeperService.start(hiveConf); + while(houseKeeperService.getIsAliveCounter() <= Integer.MIN_VALUE) { + Thread.sleep(100);//make sure it has run at least once + } + houseKeeperService.stop(); + //this should fail because txn aborted due to 
timeout + CommandProcessorResponse cpr = runStatementOnDriverNegative("delete from " + Table.ACIDTBL + " where a = 5"); + Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1")); + } /** * takes raw data and turns it into a string as if from Driver.getResults() http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 57e4fb9..58c2fca 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -131,6 +131,7 @@ public class TestTxnCommands2 { hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, enablePPD);//enables ORC PPD int[][] tableData = {{1,2},{3,4}}; runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); + List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b"); runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); Worker t = new Worker(); t.setThreadId((int) t.getId()); http://git-wip-us.apache.org/repos/asf/hive/blob/73870911/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index f57350d..db119e1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -29,10 +29,10 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import 
org.apache.hadoop.hive.ql.metadata.DummyPartition;import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -49,6 +49,7 @@ public class TestDbTxnManager { private HiveConf conf = new HiveConf(); private HiveTxnManager txnMgr; + private AcidHouseKeeperService houseKeeperService = null; private Context ctx; private int nextInput; private int nextOutput; @@ -56,7 +57,6 @@ public class TestDbTxnManager { HashSet<WriteEntity> writeEntities; public TestDbTxnManager() throws Exception { - conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 500, TimeUnit.MILLISECONDS); TxnDbUtil.setConfValues(conf); SessionState.start(conf); ctx = new Context(conf); @@ -179,14 +179,30 @@ public class TestDbTxnManager { locks = txnMgr.getLockManager().getLocks(false, false); Assert.assertEquals(0, locks.size()); } + + /** + * aborts timed out transactions + */ + private void runReaper() throws Exception { + int lastCount = houseKeeperService.getIsAliveCounter(); + houseKeeperService.start(conf); + while(houseKeeperService.getIsAliveCounter() <= lastCount) { + try { + Thread.sleep(100);//make sure it has run at least once + } + catch(InterruptedException ex) { + //... 
+ } + } + houseKeeperService.stop(); + } @Test public void testExceptions() throws Exception { WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); txnMgr.acquireLocks(qp, ctx, "PeterI"); txnMgr.openTxn("NicholasII"); - Thread.sleep(1000);//let txn timeout - txnMgr.getValidTxns(); + runReaper(); LockException exception = null; try { txnMgr.commitTxn(); @@ -198,8 +214,7 @@ public class TestDbTxnManager { Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); exception = null; txnMgr.openTxn("AlexanderIII"); - Thread.sleep(1000); - txnMgr.getValidTxns(); + runReaper(); try { txnMgr.rollbackTxn(); } @@ -213,8 +228,7 @@ public class TestDbTxnManager { txnMgr.acquireLocks(qp, ctx, "PeterI"); List<HiveLock> locks = ctx.getHiveLocks(); Assert.assertThat("Unexpected lock count", locks.size(), is(1)); - Thread.sleep(1000); - txnMgr.getValidTxns(); + runReaper(); try { txnMgr.heartbeat(); } @@ -341,7 +355,6 @@ public class TestDbTxnManager { Assert.assertTrue(sawException); } - @Before public void setUp() throws Exception { TxnDbUtil.prepDb(); @@ -351,10 +364,14 @@ public class TestDbTxnManager { nextOutput = 1; readEntities = new HashSet<ReadEntity>(); writeEntities = new HashSet<WriteEntity>(); + conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS); + conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS); + houseKeeperService = new AcidHouseKeeperService(); } @After public void tearDown() throws Exception { + if(houseKeeperService != null) houseKeeperService.stop(); if (txnMgr != null) txnMgr.closeTxnManager(); TxnDbUtil.cleanDb(); }