Repository: hive Updated Branches: refs/heads/branch-2.1 3eb16ebec -> 45c1775e1
HIVE-13833 : Add an initial delay when starting the heartbeat (Wei Zheng, reviewed by Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/45c1775e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/45c1775e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/45c1775e Branch: refs/heads/branch-2.1 Commit: 45c1775e1b32234d576e7a474a372d9fa9326053 Parents: 3eb16eb Author: Wei Zheng <w...@apache.org> Authored: Tue Jun 14 15:30:56 2016 -0700 Committer: Wei Zheng <w...@apache.org> Committed: Wed Jun 15 10:27:48 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 45 ++++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/45c1775e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 9988eec..5b6f20c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.lockmgr; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +83,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { private static ScheduledExecutorService heartbeatExecutorService = null; private ScheduledFuture<?> heartbeatTask = null; private Runnable shutdownRunner = null; - static final int SHUTDOWN_HOOK_PRIORITY = 0; + private static final int SHUTDOWN_HOOK_PRIORITY = 0; DbTxnManager() { shutdownRunner = new Runnable() { @@ -161,10 +160,11 @@ public class DbTxnManager extends HiveTxnManagerImpl { getLockManager(); boolean atLeastOneLock = false; + queryId = plan.getQueryId(); - LockRequestBuilder rqstBuilder = new LockRequestBuilder(plan.getQueryId()); + LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId); //link queryId to txnId - LOG.info("Setting lock request transaction to " + JavaUtils.txnIdToString(txnId) + " for queryId=" + plan.getQueryId()); + LOG.info("Setting lock request transaction to " + JavaUtils.txnIdToString(txnId) + " for queryId=" + queryId); rqstBuilder.setTransactionId(txnId) .setUser(username); @@ -304,7 +304,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { // Make sure we need locks. It's possible there's nothing to lock in // this operation. if (!atLeastOneLock) { - LOG.debug("No locks needed for queryId" + plan.getQueryId()); + LOG.debug("No locks needed for queryId" + queryId); return null; } @@ -312,7 +312,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { if(isTxnOpen()) { statementId++; } - LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks); + LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks); ctx.setHiveLocks(locks); return lockState; } @@ -324,15 +324,13 @@ public class DbTxnManager extends HiveTxnManagerImpl { return t; } /** - * This is for testing only. + * This is for testing only. Normally client should call {@link #acquireLocks(QueryPlan, Context, String, boolean)} * @param delay time to delay for first heartbeat - * @return null if no locks were needed */ @VisibleForTesting void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException { acquireLocks(plan, ctx, username, true); ctx.setHeartbeater(startHeartbeat(delay)); - queryId = plan.getQueryId(); } @@ -439,24 +437,25 @@ public class DbTxnManager extends HiveTxnManagerImpl { } } - private Heartbeater startHeartbeat() throws LockException { - return startHeartbeat(0); - } - /** - * This is for testing only. Normally client should call {@link #startHeartbeat()} - * Make the heartbeater start before an initial delay period. - * @param delay time to delay before first execution, in milliseconds - * @return heartbeater + * Start the heartbeater threadpool and return the task. + * @param initialDelay time to delay before first execution, in milliseconds + * @return heartbeater */ - Heartbeater startHeartbeat(long delay) throws LockException { + private Heartbeater startHeartbeat(long initialDelay) throws LockException { long heartbeatInterval = getHeartbeatInterval(conf); assert heartbeatInterval > 0; Heartbeater heartbeater = new Heartbeater(this, conf); + // For negative testing purpose.. + if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) { + initialDelay = 0; + } else if (initialDelay == 0) { + initialDelay = heartbeatInterval; + } heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate( - heartbeater, delay, heartbeatInterval, TimeUnit.MILLISECONDS); - LOG.info("Started heartbeat with delay/interval = " + 0 + "/" + heartbeatInterval + " " + - TimeUnit.MILLISECONDS + " for query: " + queryId); + heartbeater, initialDelay, heartbeatInterval, TimeUnit.MILLISECONDS); + LOG.info("Started heartbeat with delay/interval = " + initialDelay + "/" + heartbeatInterval + + " " + TimeUnit.MILLISECONDS + " for query: " + queryId); return heartbeater; } @@ -584,7 +583,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { return statementId; } - public static long getHeartbeatInterval(Configuration conf) throws LockException { + private static long getHeartbeatInterval(Configuration conf) throws LockException { // Retrieve HIVE_TXN_TIMEOUT in MILLISECONDS (it's defined as SECONDS), // then divide it by 2 to give us a safety factor. long interval = @@ -612,7 +611,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { * * @param txnMgr transaction manager for this operation */ - public Heartbeater(HiveTxnManager txnMgr, HiveConf conf) { + Heartbeater(HiveTxnManager txnMgr, HiveConf conf) { this.txnMgr = txnMgr; this.conf = conf; lockException = null;