HIVE-18076 : killquery doesn't actually work for non-trigger WM kills (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b6760b01 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b6760b01 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b6760b01 Branch: refs/heads/master Commit: b6760b017cf726a82d727d2970cdcfc0e7eebcb2 Parents: 3500196 Author: sergey <ser...@apache.org> Authored: Wed Nov 29 12:17:01 2017 -0800 Committer: sergey <ser...@apache.org> Committed: Wed Nov 29 12:17:01 2017 -0800 ---------------------------------------------------------------------- .../TestLlapTaskSchedulerService.java | 1 - .../hadoop/hive/ql/exec/tez/TezSessionPool.java | 10 ++- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 11 ++- .../hadoop/hive/ql/exec/tez/WmTezSession.java | 1 - .../hive/ql/exec/tez/WorkloadManager.java | 77 +++++++++++--------- 5 files changed, 58 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 5460248..90b31e4 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -429,7 +429,6 @@ public class TestLlapTaskSchedulerService { TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1), ti2 = tsWrapper.ts.getTaskInfo(task2); assertFalse(ti1.isGuaranteed() || ti2.isGuaranteed()); - // TODO# ts.notifyStarted(task); tsWrapper.ts.updateGuaranteedCount(1); tsWrapper.ts.waitForMessagesSent(1); TaskInfo tiHigher = ti1.isGuaranteed() ? 
ti1 : ti2, tiLower = (tiHigher == ti1) ? ti2 : ti1; http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index 4d2cf88..3bcf657 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -107,6 +107,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> { int threadCount = Math.min(initialSize, HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)); Preconditions.checkArgument(threadCount > 0); + this.parentSessionState = SessionState.get(); if (threadCount == 1) { for (int i = 0; i < initialSize; ++i) { SessionType session = sessionObjFactory.create(null); @@ -115,7 +116,6 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> { } } else { final AtomicInteger remaining = new AtomicInteger(initialSize); - this.parentSessionState = SessionState.get(); @SuppressWarnings("unchecked") FutureTask<Boolean>[] threadTasks = new FutureTask[threadCount]; for (int i = threadTasks.length - 1; i >= 0; --i) { @@ -272,6 +272,14 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> { // The caller probably created the new session with the old config, but update the // registry again just in case. TODO: maybe we should enforce that. configureAmRegistry(newSession); + if (SessionState.get() == null && parentSessionState != null) { + // Tez session relies on a threadlocal for open... If we are on some non-session thread, + // just use the same SessionState we used for the initial sessions. 
+ // Technically, given that all pool sessions are initially based on this state, shouldn't + // we also set this at all times and not rely on an external session stuff? We should + // probably just get rid of the thread local usage in TezSessionState. + SessionState.setCurrentSessionState(parentSessionState); + } newSession.open(additionalFiles, scratchDir); if (!putSessionBack(newSession, false)) { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 8087b01..af77f30 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -171,9 +171,10 @@ public class TezTask extends Task<TezWork> { TriggerContext triggerContext = ctx.getTriggerContext(); triggerContext.setDesiredCounters(desiredCounters); - session.setTriggerContext(triggerContext); - LOG.info("Subscribed to counters: {} for queryId: {}", desiredCounters, triggerContext.getQueryId()); + LOG.info("Subscribed to counters: {} for queryId: {}", + desiredCounters, triggerContext.getQueryId()); ss.setTezSession(session); + session.setTriggerContext(triggerContext); try { // jobConf will hold all the configuration for hadoop, tez, and hive JobConf jobConf = utils.createConfiguration(conf); @@ -190,8 +191,7 @@ public class TezTask extends Task<TezWork> { // This is used to compare global and vertex resources. Global resources are originally // derived from session conf via localizeTempFilesFromConf. So, use that here. - Configuration sessionConf = - (session != null && session.getConf() != null) ? session.getConf() : conf; + Configuration sessionConf = (session.getConf() != null) ? 
session.getConf() : conf; Map<String,LocalResource> inputOutputLocalResources = getExtraLocalResources(jobConf, scratchDir, inputOutputJars, sessionConf); @@ -584,7 +584,10 @@ public class TezTask extends Task<TezWork> { try { console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: " + Arrays.toString(e.getStackTrace()) + " retrying..."); + // TODO: this is temporary, need to refactor how reopen is invoked. + TriggerContext oldCtx = sessionState.getTriggerContext(); sessionState = sessionState.reopen(conf, inputOutputJars); + sessionState.setTriggerContext(oldCtx); dagClient = sessionState.getSession().submitDAG(dag); } catch (Exception retryException) { // we failed to submit after retrying. Destroy session and bail. http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index 96d70c9..d61c531 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -143,7 +143,6 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode void clearWm() { this.poolName = null; this.clusterFraction = 0f; - this.queryId = null; } double getClusterFraction() { http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index a8360bd..388a4f4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -1,5 +1,4 @@ /** -/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -396,18 +395,22 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // query is being killed until both the kill, and the user, return it. String queryId = toKill.getQueryId(); KillQuery kq = toKill.getKillQuery(); - if (kq != null && queryId != null) { - LOG.info("Invoking KillQuery for " + queryId + ": " + reason); - try { - kq.killQuery(queryId, reason); - addKillQueryResult(toKill, true); - LOG.debug("Killed " + queryId); - return; - } catch (HiveException ex) { - LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex); + try { + if (kq != null && queryId != null) { + LOG.info("Invoking KillQuery for " + queryId + ": " + reason); + try { + kq.killQuery(queryId, reason); + addKillQueryResult(toKill, true); + LOG.debug("Killed " + queryId); + return; + } catch (HiveException ex) { + LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex); + } + } else { + LOG.info("Will queue restart for {}; queryId {}, killQuery {}", toKill, queryId, kq); } - } else { - LOG.info("Will queue restart for {}; queryId {}, killQuery {}", toKill, queryId, kq); + } finally { + toKill.setQueryId(null); } // We cannot restart in place because the user might receive a failure and return the // session to the master thread without the "irrelevant" flag set. In fact, the query might @@ -421,12 +424,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // 2. Restart pool sessions. 
for (final WmTezSession toRestart : context.toRestartInUse) { LOG.info("Replacing {} with a new session", toRestart); + toRestart.setQueryId(null); workPool.submit(() -> { try { // Note: sessions in toRestart are always in use, so they cannot expire in parallel. tezAmPool.replaceSession(toRestart, false, null); } catch (Exception ex) { - LOG.error("Failed to restart an old session; ignoring " + ex.getMessage()); + LOG.error("Failed to restart an old session; ignoring", ex); } }); } @@ -564,7 +568,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // May be change command to support ... DELAYED MOVE TO etl ... which will run under src cluster fraction as long // as possible for (MoveSession moveSession : e.moveSessions) { - handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute); + handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse); } e.moveSessions.clear(); @@ -676,9 +680,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } } - private void handleMoveSessionOnMasterThread(final MoveSession moveSession, - final WmThreadSyncWork syncWork, - final HashSet<String> poolsToRedistribute) { + private void handleMoveSessionOnMasterThread(MoveSession moveSession, WmThreadSyncWork syncWork, + Set<String> poolsToRedistribute, Map<WmTezSession, GetRequest> toReuse) { String destPoolName = moveSession.destPool; LOG.info("Handling move session event: {}", moveSession); if (validMove(moveSession.srcSession, destPoolName)) { @@ -689,7 +692,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // check if there is capacity in dest pool, if so move else kill the session if (capacityAvailable(destPoolName)) { // add to destination pool - Boolean added = checkAndAddSessionToAnotherPool(moveSession.srcSession, destPoolName, poolsToRedistribute); + Boolean added = checkAndAddSessionToAnotherPool( + moveSession.srcSession, 
destPoolName, poolsToRedistribute); if (added != null && added) { moveSession.future.set(true); return; @@ -697,10 +701,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida LOG.error("Failed to move session: {}. Session is not added to destination.", moveSession); } } else { - moveSession.srcSession.clearWm(); - moveSession.srcSession.setIsIrrelevantForWm("Destination pool " - + destPoolName + " is full. Killing query."); - syncWork.toRestartInUse.add(moveSession.srcSession); + WmTezSession session = moveSession.srcSession; + resetRemovedSessionToKill(session, toReuse); + syncWork.toKillQuery.put(session, new KillQueryContext(session, "Destination pool " + + destPoolName + " is full. Killing query.")); } } else { LOG.error("Failed to move session: {}. Session is not removed from its pool.", moveSession); @@ -785,6 +789,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida if (reuseRequest != null) { reuseRequest.future.setException(new AssertionError("Invalid reuse attempt")); } + session.setQueryId(null); return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn); } @@ -1086,6 +1091,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida WmTezSession session = req.sessionToReuse; if (session == null) return; req.sessionToReuse = null; + session.setQueryId(null); if (poolsToRedistribute != null) { RemoveSessionResult rr = checkAndRemoveSessionFromItsPool( session, poolsToRedistribute, true); @@ -1119,7 +1125,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida * in WM but wasn't found in the requisite pool (internal error?). 
*/ private RemoveSessionResult checkAndRemoveSessionFromItsPool( - WmTezSession session, HashSet<String> poolsToRedistribute, Boolean isSessionOk) { + WmTezSession session, Set<String> poolsToRedistribute, Boolean isSessionOk) { // It is possible for some request to be queued after a main thread has decided to kill this // session; on the next iteration, we'd be processing that request with an irrelevant session. if (session.isIrrelevantForWm()) { @@ -1146,7 +1152,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } private Boolean checkAndAddSessionToAnotherPool( - WmTezSession session, String destPoolName, HashSet<String> poolsToRedistribute) { + WmTezSession session, String destPoolName, Set<String> poolsToRedistribute) { if (session.isIrrelevantForWm()) { // This is called only during move session handling, removing session already checks this. // So this is not expected as remove failing will not even invoke this method @@ -1624,7 +1630,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida IdentityHashMap<WmTezSession, GetRequest> toReuse, Map<WmTezSession, KillQueryContext> toKill) { for (WmTezSession sessionToKill : sessions) { - resetRemovedSession(sessionToKill, toReuse); + resetRemovedSessionToKill(sessionToKill, toReuse); toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason)); } sessions.clear(); @@ -1635,21 +1641,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida if (sessionToKill == null) { continue; // Async op in progress; the callback will take care of this. 
} - resetRemovedSession(sessionToKill, toReuse); + resetRemovedSessionToKill(sessionToKill, toReuse); toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason)); } initializingSessions.clear(); } - private void resetRemovedSession(WmTezSession sessionToKill, - IdentityHashMap<WmTezSession, GetRequest> toReuse) { - sessionToKill.clearWm(); - GetRequest req = toReuse.remove(sessionToKill); - if (req != null) { - req.sessionToReuse = null; - } - } - public void setTriggers(final LinkedList<Trigger> triggers) { this.triggers = triggers; } @@ -1786,6 +1783,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } if (session != null) { session.clearWm(); + session.setQueryId(null); // We can just restart the session if we have received one. try { tezAmPool.replaceSession(session, false, null); @@ -1914,6 +1912,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } } + private static void resetRemovedSessionToKill( + WmTezSession sessionToKill, Map<WmTezSession, GetRequest> toReuse) { + sessionToKill.clearWm(); + GetRequest req = toReuse.remove(sessionToKill); + if (req != null) { + req.sessionToReuse = null; + } + } + @VisibleForTesting TezSessionPool<WmTezSession> getTezAmPool() { return tezAmPool;