Repository: hive Updated Branches: refs/heads/master 52e1ba15d -> 0538e5102
HIVE-17473 : implement workload management pools (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0538e510 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0538e510 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0538e510 Branch: refs/heads/master Commit: 0538e510232c7c685621c2608f5aa729f2afc2eb Parents: 52e1ba1 Author: sergey <ser...@apache.org> Authored: Thu Oct 19 17:45:12 2017 -0700 Committer: sergey <ser...@apache.org> Committed: Thu Oct 19 17:45:12 2017 -0700 ---------------------------------------------------------------------- .../hive/jdbc/TestTriggersWorkloadManager.java | 2 +- .../hadoop/hive/ql/exec/tez/TezSessionPool.java | 8 +- .../hive/ql/exec/tez/UserPoolMapping.java | 92 ++++++ .../hive/ql/exec/tez/WorkloadManager.java | 329 ++++++++++++++++--- .../hive/ql/exec/tez/TestWorkloadManager.java | 259 ++++++++++++++- .../apache/hive/service/server/HiveServer2.java | 11 +- 6 files changed, 641 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java index 8485ba6..8d7693d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java @@ -73,7 +73,7 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag @Override protected void setupTriggers(final List<Trigger> triggers) throws Exception { WorkloadManager wm = 
WorkloadManager.getInstance(); - WorkloadManager.PoolState poolState = spy(new WorkloadManager.PoolState()); + WorkloadManager.PoolState poolState = spy(new WorkloadManager.PoolState("llap", 1, 1f)); when(poolState.getTriggers()).thenReturn(triggers); wm.getPools().put("llap", poolState); } http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index 4f2997b..b67c933 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -50,6 +50,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> { new ConcurrentLinkedQueue<SessionType>(); private final HiveConf initConf; + private int initialSize; // TODO: eventually, this will need to support resize. That would probably require replacement // with a RW lock, a semaphore and linked list. 
@@ -71,6 +72,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> { } void startInitialSessions() throws Exception { + initialSize = initialSessions.size(); if (initialSessions.isEmpty()) return; if (amRegistry != null) { amRegistry.start(); @@ -240,4 +242,8 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> { bySessionId.remove(sessionId); } } -} \ No newline at end of file + + int getInitialSize() { + return initialSize; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java new file mode 100644 index 0000000..81d6b85 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping; + +class UserPoolMapping { + private final static class Mapping { + public Mapping(String poolName, int priority) { + this.fullPoolName = poolName; + this.priority = priority; + } + int priority; + String fullPoolName; + @Override + public String toString() { + return "[" + fullPoolName + ", priority=" + priority + "]"; + } + } + + private final Map<String, Mapping> userMappings = new HashMap<>(); + private final String defaultPoolName; + // TODO: add other types as needed + + public UserPoolMapping(List<TmpUserMapping> mappings, Set<String> poolNames) { + String defaultPoolName = null; + for (TmpUserMapping mapping : mappings) { + switch (mapping.getType()) { + case USER: { + String poolName = getValidPoolName(poolNames, mapping); + Mapping val = new Mapping(poolName, mapping.getPriority()); + Mapping oldValue = userMappings.put(mapping.getName(), val); + if (oldValue != null) { + throw new AssertionError("Duplicate mapping for user " + mapping.getName() + "; " + + oldValue + " and " + val); + } + break; + } + case DEFAULT: { + String poolName = getValidPoolName(poolNames, mapping); + if (defaultPoolName != null) { + throw new AssertionError("Duplicate default mapping; " + + defaultPoolName + " and " + poolName); + } + defaultPoolName = poolName; + break; + } + default: throw new AssertionError("Unknown type " + mapping.getType()); + } + } + this.defaultPoolName = defaultPoolName; + } + + public String mapSessionToPoolName(String userName) { + // For now, we only have user rules, so this is very simple. + // In future we'd also look up groups (each groups the user is in initially; we may do it + // the opposite way if the user is a member of many groups but there are not many rules), + // whatever user supplies in connection string to HS2, etc. 
+ // If multiple rules match, we'd need to get the highest priority one. + Mapping userMapping = userMappings.get(userName); + if (userMapping != null) return userMapping.fullPoolName; + return defaultPoolName; + } + + private static String getValidPoolName(Set<String> poolNames, TmpUserMapping mapping) { + String poolName = mapping.getPoolName(); + // Should we really be validating here? The plan should be validated before applying. + if (!poolNames.contains(mapping.getPoolName())) { + throw new AssertionError("Invalid pool in the mapping " + poolName); + } + return poolName; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 54c7f74..d725e90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -17,12 +17,16 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import com.google.common.annotations.VisibleForTesting; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -44,8 +48,6 @@ import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; - /** Workload management entry point for HS2. 
*/ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator @@ -60,11 +62,21 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida private final RestrictedConfigChecker restrictedConfig; private final QueryAllocationManager allocationManager; private final String yarnQueue; + // Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool + // sessions, so the pool itself could internally track the sessions it gave out, since + // calling close on an unopened session is probably harmless. + private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions = + new IdentityHashMap<>(); + private final int amRegistryTimeoutMs; + + /** Sessions given out (i.e. between get... and return... calls), separated by Hive pool. */ private final ReentrantReadWriteLock poolsLock = new ReentrantReadWriteLock(); - private final List<TezSessionPoolSession> openSessions = new LinkedList<>(); private final Map<String, PoolState> pools = new HashMap<>(); - private final int amRegistryTimeoutMs; + // Used to make sure that waiting getSessions don't block update. + private int internalPoolsVersion; + private UserPoolMapping userPoolMapping; + private SessionTriggerProvider sessionTriggerProvider; private TriggerActionHandler triggerActionHandler; private TriggerValidatorRunnable triggerValidatorRunnable; @@ -73,8 +85,31 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // Add stuff here as WM is implemented. 
private final Object lock = new Object(); private final List<WmTezSession> sessions = new ArrayList<>(); + private final Semaphore sessionsClaimed; + + private final String fullName; + private final double finalFraction; + private double finalFractionRemaining; + private final int queryParallelism; private final List<Trigger> triggers = new ArrayList<>(); + public PoolState(String fullName, int queryParallelism, double fraction) { + this.fullName = fullName; + this.queryParallelism = queryParallelism; + // A fair semaphore to ensure correct queue order. + this.sessionsClaimed = new Semaphore(queryParallelism, true); + this.finalFraction = this.finalFractionRemaining = fraction; + } + + @Override + public String toString() { + return "[" + fullName + ", query parallelism " + queryParallelism + + ", fraction of the cluster " + finalFraction + ", fraction used by child pools " + + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size() + + "]"; + } + + public List<Trigger> getTriggers() { return triggers; } @@ -93,14 +128,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } /** Called once, when HS2 initializes. */ - public static WorkloadManager create(String yarnQueue, HiveConf conf) { + public static WorkloadManager create(String yarnQueue, HiveConf conf, TmpResourcePlan plan) { assert INSTANCE == null; Token<JobTokenIdentifier> amsToken = createAmsToken(); // We could derive the expected number of AMs to pass in. LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, amsToken, -1); QueryAllocationManager qam = new GuaranteedTasksAllocator(conf, amComm); - // TODO: Hardcode one session for now; initial policies should be passed in. 
- return (INSTANCE = new WorkloadManager(yarnQueue, conf, 1, qam, amsToken)); + return (INSTANCE = new WorkloadManager(yarnQueue, conf, qam, amsToken, plan)); } private static Token<JobTokenIdentifier> createAmsToken() { @@ -116,11 +150,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } @VisibleForTesting - WorkloadManager(String yarnQueue, HiveConf conf, int numSessions, - QueryAllocationManager qam, Token<JobTokenIdentifier> amsToken) { + WorkloadManager(String yarnQueue, HiveConf conf, + QueryAllocationManager qam, Token<JobTokenIdentifier> amsToken, TmpResourcePlan plan) { this.yarnQueue = yarnQueue; this.conf = conf; - initializeHivePools(); + int numSessions = initializeHivePools(plan); + LOG.info("Initializing with " + numSessions + " total query parallelism"); + this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar( conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS); sessions = new TezSessionPool<>(conf, numSessions, true); @@ -138,32 +174,149 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida startTriggerValidator(conf); } - private void initializeHivePools() { - // TODO: real implementation + private int initializeHivePools(TmpResourcePlan plan) { poolsLock.writeLock().lock(); try { // FIXME: Add Triggers from metastore to poolstate - pools.put("llap", new PoolState()); + // Note: we assume here that plan has been validated beforehand, so we don't verify + // that fractions or query parallelism add up. + int totalQueryParallelism = 0; + // Use recursion to update parents more conveniently; we don't expect a big tree. + for (TmpHivePool pool : plan.getRootPools()) { + totalQueryParallelism += addHivePool(pool, null); + } + this.userPoolMapping = new UserPoolMapping(plan.getMappings(), pools.keySet()); + internalPoolsVersion = 0; // Initializing for the first time. 
+ return totalQueryParallelism; } finally { poolsLock.writeLock().unlock(); } } + private final static char POOL_SEPARATOR = '/'; + private int addHivePool(TmpHivePool pool, PoolState parent) { + String fullName = pool.getName(); + int totalQueryParallelism = pool.getQueryParallelism(); + double fraction = pool.getResourceFraction(); + if (parent != null) { + fullName = parent.fullName + POOL_SEPARATOR + fullName; + fraction = parent.finalFraction * pool.getResourceFraction(); + parent.finalFractionRemaining -= fraction; + } + PoolState state = new PoolState(fullName, totalQueryParallelism, fraction); + if (pool.getChildren() != null) { + for (TmpHivePool child : pool.getChildren()) { + totalQueryParallelism += addHivePool(child, state); + } + } + LOG.info("Adding Hive pool: " + state); + pools.put(fullName, state); + return totalQueryParallelism; + } + public TezSessionState getSession( TezSessionState session, String userName, HiveConf conf) throws Exception { validateConfig(conf); - String poolName = mapSessionToPoolName(userName); - // TODO: do query parallelism enforcement here based on the policies and pools. - while (true) { - WmTezSession result = checkSessionForReuse(session); - // TODO: when proper AM management is implemented, we should call tryGet... here, because the - // parallelism will be enforced here, and pool would always have a session for us. - result = (result == null ? sessions.getSession() : result); - result.setQueueName(yarnQueue); - result.setPoolName(poolName); - if (!ensureAmIsRegistered(result)) continue; // Try another. - redistributePoolAllocations(poolName, result, null); - return result; + WmTezSession result = checkSessionForReuse(session); + boolean hasAcquired = false; + String poolName = null; + while (!hasAcquired) { // This loop handles concurrent plan updates while we are waiting. 
+ poolName = userPoolMapping.mapSessionToPoolName(userName); + if (poolName == null) { + throw new HiveException("Cannot find any pool mapping for user " + userName); + } + int internalVersion = -1; + Semaphore sessionsClaimed = null; + poolsLock.readLock().lock(); + try { + PoolState pool = pools.get(poolName); + if (pool == null) throw new AssertionError("Pool " + poolName + " not found."); + // No need to take the pool lock, semaphore is final. + sessionsClaimed = pool.sessionsClaimed; + internalVersion = internalPoolsVersion; + } finally { + poolsLock.readLock().unlock(); + } + // One cannot simply reuse the session if there are other queries waiting; to maintain + // fairness, we'll try to take the semaphore instantly, and if that fails we'll return + // this session back to the pool and potentially give the user a new session later. + if (result != null) { + // Handle the special case; the pool may be exactly at capacity w/o queue. In that + // case, we still should be able to reuse. + boolean isFromTheSamePool = false; + String oldPoolName = result.getPoolName(); + if (poolName.equals(oldPoolName)) { + sessionsClaimed.release(); + isFromTheSamePool = true; + } + // Note: we call timed acquire because untimed one ignores fairness. + hasAcquired = sessionsClaimed.tryAcquire(1, TimeUnit.MILLISECONDS); + if (hasAcquired) { + poolsLock.readLock().lock(); + boolean doUnlock = true; + try { + if (internalVersion == internalPoolsVersion) { + if (!isFromTheSamePool) { + // Free up the usage in the old pool. TODO: ideally not under lock; not critical. + redistributePoolAllocations(oldPoolName, null, result, true); + } + doUnlock = false; // Do not unlock; see below. 
+ break; + } + } finally { + if (doUnlock) { + poolsLock.readLock().unlock(); + } + } + hasAcquired = false; + } + // Note: we are short-circuiting session::returnToSessionManager to supply the flag + returnAfterUse(result, !isFromTheSamePool); + result = null; + } + // We don't expect frequent updates, so check every second. + while (!(hasAcquired = (hasAcquired || sessionsClaimed.tryAcquire(1, TimeUnit.SECONDS)))) { + poolsLock.readLock().lock(); + try { + if (internalVersion != internalPoolsVersion) break; + } finally { + poolsLock.readLock().unlock(); + } + } + if (!hasAcquired) continue; + // Keep it simple for now - everything between acquiring the semaphore and adding the session + // to the pool state is done under read lock, blocking pool updates. It's possible to make + // it more granular if needed. The only potentially lengthy operation is waiting for an + // expired session to be restarted in the session pool. + poolsLock.readLock().lock(); + if (internalVersion == internalPoolsVersion) break; + poolsLock.readLock().unlock(); + hasAcquired = false; + } + // We are holding the lock from the end of the loop. + try { + assert hasAcquired; + while (true) { + // TODO: ideally, we'd need to implement tryGet and deal with the valid wait from a session + // restarting somehow, as opposed to the invalid case of a session missing from the + // pool due to some bug. Keep a "restarting" counter in the pool? + boolean isFromTheSamePool = false; + if (result == null) { + result = sessions.getSession(); + } else { + // If we are just reusing the session from the same pool, do not adjust allocations. + isFromTheSamePool = poolName.equals(result.getPoolName()); + } + result.setQueueName(yarnQueue); + result.setPoolName(poolName); + if (!ensureAmIsRegistered(result)) continue; // Try another. 
+ if (!isFromTheSamePool) { + redistributePoolAllocations(poolName, result, null, false); + } + return result; + } + } finally { + poolsLock.readLock().unlock(); } } @@ -181,12 +334,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } private void redistributePoolAllocations( - String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove) { + String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove, + boolean releaseParallelism) { List<WmTezSession> sessionsToUpdate = null; double totalAlloc = 0; assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName()); assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName()); poolsLock.readLock().lock(); + boolean hasRemoveFailed = false; try { PoolState pool = pools.get(poolName); synchronized (pool.lock) { @@ -195,12 +350,21 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida pool.sessions.add(sessionToAdd); } if (sessionToRemove != null) { + // TODO: this assumes that the update process will take the write lock, and make + // everything right w.r.t. semaphores, pool names and other stuff, since we might + // be releasing a different semaphore from the one we acquired if it's across + // the update. If the magic in the update is weak, this may become more involved. 
if (!pool.sessions.remove(sessionToRemove)) { LOG.error("Session " + sessionToRemove + " could not be removed from the pool"); + if (releaseParallelism) { + hasRemoveFailed = true; + } + } else if (releaseParallelism) { + pool.sessionsClaimed.release(); } sessionToRemove.setClusterFraction(0); } - totalAlloc = updatePoolAllocations(pool.sessions); + totalAlloc = updatePoolAllocations(pool.sessions, pool.finalFractionRemaining); sessionsToUpdate = new ArrayList<>(pool.sessions); } } finally { @@ -208,6 +372,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate); updateSessionsTriggers(); + if (hasRemoveFailed) { + throw new AssertionError("Cannot remove the session from the pool and release " + + "the query slot; HS2 may fail to accept queries"); + } } private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception { @@ -232,18 +400,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida return null; } - private double updatePoolAllocations(List<WmTezSession> sessions) { + private double updatePoolAllocations(List<WmTezSession> sessions, double totalFraction) { // TODO: real implementation involving in-the-pool policy interface, etc. - double allocation = 1.0 / sessions.size(); + double allocation = totalFraction / sessions.size(); for (WmTezSession session : sessions) { session.setClusterFraction(allocation); } - return 1.0; - } - - private String mapSessionToPoolName(String userName) { - // TODO: real implementation, probably calling into another class initialized with policies. 
- return "llap"; + return totalFraction; } private void validateConfig(HiveConf conf) throws HiveException { @@ -272,7 +435,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida public void stop() throws Exception { List<TezSessionPoolSession> sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList<TezSessionPoolSession>(openSessions); + sessionsToClose = new ArrayList<>(openSessions.keySet()); } for (TezSessionPoolSession sessionState : sessionsToClose) { @@ -302,10 +465,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida @Override public void returnAfterUse(TezSessionPoolSession session) throws Exception { + returnAfterUse(session, true); + } + + private void returnAfterUse( + TezSessionPoolSession session, boolean releaseParallelism) throws Exception { boolean isInterrupted = Thread.interrupted(); try { WmTezSession wmSession = ensureOwnedSession(session); - redistributePoolAllocations(wmSession.getPoolName(), null, wmSession); + redistributePoolAllocations(wmSession.getPoolName(), null, wmSession, releaseParallelism); sessions.returnSession((WmTezSession) session); } finally { // Reset the interrupt status. @@ -315,7 +483,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } } - /** Closes a running (expired) pool session and reopens it. 
*/ @Override public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception { @@ -334,7 +501,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida @Override public void registerOpenSession(TezSessionPoolSession session) { synchronized (openSessions) { - openSessions.add(session); + openSessions.put(session, null); } updateSessionsTriggers(); } @@ -382,7 +549,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida if (!ensureAmIsRegistered(newSession)) { throw new Exception("Session is not usable after reopen"); } - redistributePoolAllocations(oldSession.getPoolName(), newSession, oldSession); + // Do not release the parallelism - we are just replacing the session in the same pool. + redistributePoolAllocations(oldSession.getPoolName(), newSession, oldSession, false); return newSession; } @@ -392,7 +560,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // We never want to lose pool sessions. Replace it instead; al trigger duck redistribution. WmTezSession wmSession = ensureOwnedSession(session); closeAndReopenPoolSession(wmSession); - redistributePoolAllocations(wmSession.getPoolName(), null, wmSession); + redistributePoolAllocations(wmSession.getPoolName(), null, wmSession, true); + } + + @VisibleForTesting + int getNumSessions() { + return sessions.getInitialSize(); } @Override @@ -427,4 +600,78 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } return counterNames; } + + + // TODO: temporary until real WM schema is created. 
+ public static class TmpHivePool { + private final String name; + private final List<TmpHivePool> children; + private final int queryParallelism; + private final double resourceFraction; + + public TmpHivePool(String name, + List<TmpHivePool> children, int queryParallelism, double resourceFraction) { + this.name = name; + this.children = children; + this.queryParallelism = queryParallelism; + this.resourceFraction = resourceFraction; + } + + public String getName() { + return name; + } + public List<TmpHivePool> getChildren() { + return children; + } + public int getQueryParallelism() { + return queryParallelism; + } + public double getResourceFraction() { + return resourceFraction; + } + } + + public static enum TmpUserMappingType { + USER, DEFAULT + } + + public static class TmpUserMapping { + private final TmpUserMappingType type; + private final String name; + private final String poolName; + private final int priority; + public TmpUserMapping(TmpUserMappingType type, String name, String poolName, int priority) { + this.type = type; + this.name = name; + this.poolName = poolName; + this.priority = priority; + } + public TmpUserMappingType getType() { + return type; + } + public String getName() { + return name; + } + public String getPoolName() { + return poolName; + } + public int getPriority() { + return priority; + } + } + + public static class TmpResourcePlan { + private final List<TmpHivePool> rootPools; + private final List<TmpUserMapping> mappings; + public TmpResourcePlan(List<TmpHivePool> rootPools, List<TmpUserMapping> mappings) { + this.rootPools = rootPools; + this.mappings = mappings; + } + public List<TmpHivePool> getRootPools() { + return rootPools; + } + public List<TmpUserMapping> getMappings() { + return mappings; + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ---------------------------------------------------------------------- diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 17c62cf..258a865 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -18,26 +18,67 @@ package org.apache.hadoop.hive.ql.exec.tez; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.anyBoolean; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.junit.Assert.*; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.*; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; import java.io.IOException; +import java.lang.Thread.State; import java.util.List; - +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType; import org.apache.tez.dag.api.TezConfiguration; import org.junit.Test; public class TestWorkloadManager { + private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class); + + private final class GetSessionRunnable implements Runnable { + private final AtomicReference<WmTezSession> session; + private final WorkloadManager wm; 
+ private final AtomicReference<Throwable> error; + private final HiveConf conf; + private final CountDownLatch cdl; + private final String userName; + + private GetSessionRunnable(AtomicReference<WmTezSession> session, WorkloadManager wm, + AtomicReference<Throwable> error, HiveConf conf, CountDownLatch cdl, String userName) { + this.session = session; + this.wm = wm; + this.error = error; + this.conf = conf; + this.cdl = cdl; + this.userName = userName; + } + + @Override + public void run() { + WmTezSession old = session.get(); + session.set(null); + if (cdl != null) { + cdl.countDown(); + } + try { + session.set((WmTezSession) wm.getSession(old, userName, conf)); + } catch (Throwable e) { + error.compareAndSet(null, e); + } + } + } + public static class MockQam implements QueryAllocationManager { boolean isCalled = false; @@ -63,8 +104,19 @@ public class TestWorkloadManager { public static class WorkloadManagerForTest extends WorkloadManager { public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions, - QueryAllocationManager qam) { - super(yarnQueue, conf, numSessions, qam, null); + QueryAllocationManager qam) { + super(yarnQueue, conf, qam, null, createDummyPlan(numSessions)); + } + + public WorkloadManagerForTest(String yarnQueue, HiveConf conf, + QueryAllocationManager qam, TmpResourcePlan plan) { + super(yarnQueue, conf, qam, null, plan); + } + + private static TmpResourcePlan createDummyPlan(int numSessions) { + return new TmpResourcePlan( + Lists.newArrayList(new TmpHivePool("llap", null, numSessions, 1.0f)), + Lists.newArrayList(new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 0))); } @Override @@ -164,11 +216,186 @@ public class TestWorkloadManager { assertEquals(1.0, session2.getClusterFraction(), EPSILON); assertEquals(0.0, session.getClusterFraction(), EPSILON); qam.assertWasCalled(); + } - // Now destroy the returned session (which is technically not valid) and confirm correctness. 
- session.destroy(); - assertEquals(1.0, session2.getClusterFraction(), EPSILON); - //qam.assertWasNotCalled(); + private static TmpUserMapping create(String user, String pool) { + return new TmpUserMapping(TmpUserMappingType.USER, user, pool, 0); + } + + @Test(timeout = 10000) + public void testClusterFractions() throws Exception { + HiveConf conf = createConf(); + MockQam qam = new MockQam(); + List<TmpHivePool> l2 = Lists.newArrayList( + new TmpHivePool("p1", null, 1, 0.5f), new TmpHivePool("p2", null, 2, 0.3f)); + TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList( + new TmpHivePool("r1", l2, 1, 0.6f), new TmpHivePool("r2", null, 1, 0.4f)), + Lists.newArrayList(create("p1", "r1/p1"), create("p2", "r1/p2"), create("r1", "r1"), + create("r2", "r2"))); + WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); + wm.start(); + assertEquals(5, wm.getNumSessions()); + // Get all the 5 sessions; validate cluster fractions. + WmTezSession session05of06 = (WmTezSession) wm.getSession(null, "p1", conf); + assertEquals(0.3, session05of06.getClusterFraction(), EPSILON); + WmTezSession session03of06 = (WmTezSession) wm.getSession(null, "p2", conf); + assertEquals(0.18, session03of06.getClusterFraction(), EPSILON); + WmTezSession session03of06_2 = (WmTezSession) wm.getSession(null, "p2", conf); + assertEquals(0.09, session03of06.getClusterFraction(), EPSILON); + assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON); + WmTezSession session02of06 = (WmTezSession) wm.getSession(null, "r1", conf); + assertEquals(0.12, session02of06.getClusterFraction(), EPSILON); + WmTezSession session04 = (WmTezSession) wm.getSession(null, "r2", conf); + assertEquals(0.4, session04.getClusterFraction(), EPSILON); + session05of06.returnToSessionManager(); + session03of06.returnToSessionManager(); + session03of06_2.returnToSessionManager(); + session02of06.returnToSessionManager(); + session04.returnToSessionManager(); + } + + @Test(timeout=10000) + public 
void testQueueing() throws Exception { + final HiveConf conf = createConf(); + MockQam qam = new MockQam(); + TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList( + new TmpHivePool("A", null, 2, 0.5f), new TmpHivePool("B", null, 2, 0.5f)), + Lists.newArrayList(create("A", "A"), create("B", "B"))); + final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); + wm.start(); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, "A", conf), + sessionA2 = (WmTezSession) wm.getSession(null, "A", conf), + sessionB1 = (WmTezSession) wm.getSession(null, "B", conf); + final AtomicReference<WmTezSession> sessionA3 = new AtomicReference<>(), + sessionA4 = new AtomicReference<>(); + final AtomicReference<Throwable> error = new AtomicReference<>(); + final CountDownLatch cdl = new CountDownLatch(1); + + Thread t1 = new Thread(new GetSessionRunnable(sessionA3, wm, error, conf, cdl, "A")), + t2 = new Thread(new GetSessionRunnable(sessionA4, wm, error, conf, null, "A")); + waitForThreadToBlockOnQueue(cdl, t1); + t2.start(); + assertNull(sessionA3.get()); + assertNull(sessionA4.get()); + checkError(error); + // While threads are blocked on A, we should still be able to get and return a B session. + WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, "B", conf); + sessionB1.returnToSessionManager(); + sessionB2.returnToSessionManager(); + assertNull(sessionA3.get()); + assertNull(sessionA4.get()); + checkError(error); + // Now release a single session from A. 
+ sessionA1.returnToSessionManager(); + t1.join(); + checkError(error); + assertNotNull(sessionA3.get()); + assertNull(sessionA4.get()); + sessionA3.get().returnToSessionManager(); + t2.join(); + checkError(error); + assertNotNull(sessionA4.get()); + sessionA4.get().returnToSessionManager(); + sessionA2.returnToSessionManager(); + } + + @Test(timeout=10000) + public void testReuseWithQueueing() throws Exception { + final HiveConf conf = createConf(); + MockQam qam = new MockQam(); + final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); + wm.start(); + WmTezSession session1 = (WmTezSession) wm.getSession(null, null, conf); + // First, try to reuse from the same pool - should "just work". + WmTezSession session1a = (WmTezSession) wm.getSession(session1, null, conf); + assertSame(session1, session1a); + assertEquals(1.0, session1.getClusterFraction(), EPSILON); + // Should still be able to get the 2nd session. + WmTezSession session2 = (WmTezSession) wm.getSession(null, null, conf); + + // Now try to reuse with no other sessions remaining. Should still work. + WmTezSession session2a = (WmTezSession) wm.getSession(session2, null, conf); + assertSame(session2, session2a); + assertEquals(0.5, session1.getClusterFraction(), EPSILON); + assertEquals(0.5, session2.getClusterFraction(), EPSILON); + + // Finally try to reuse with something in the queue. Due to fairness this won't work. + final AtomicReference<WmTezSession> session3 = new AtomicReference<>(), + // We will try to reuse this, but session3 is queued before us. 
+ session4 = new AtomicReference<>(session2); + final AtomicReference<Throwable> error = new AtomicReference<>(); + CountDownLatch cdl = new CountDownLatch(1), cdl2 = new CountDownLatch(1); + Thread t1 = new Thread(new GetSessionRunnable(session3, wm, error, conf, cdl, null), "t1"), + t2 = new Thread(new GetSessionRunnable(session4, wm, error, conf, cdl2, null), "t2"); + waitForThreadToBlockOnQueue(cdl, t1); + assertNull(session3.get()); + checkError(error); + t2.start(); + cdl2.await(); + assertNull(session4.get()); + + // We have released the session by trying to reuse it and going back into queue, s3 can start. + t1.join(); + checkError(error); + assertNotNull(session3.get()); + assertEquals(0.5, session3.get().getClusterFraction(), EPSILON); + + // Now release another session; the thread that gave up on reuse can proceed. + session1.returnToSessionManager(); + t2.join(); + checkError(error); + assertNotNull(session4.get()); + assertNotSame(session2, session4.get()); + assertEquals(0.5, session4.get().getClusterFraction(), EPSILON); + session3.get().returnToSessionManager(); + session4.get().returnToSessionManager(); + } + + private void waitForThreadToBlockOnQueue(CountDownLatch cdl, Thread t1) throws InterruptedException { + t1.start(); + cdl.await(); + // Wait for t1 to block, just to be sure. Not ideal... 
+ State s; + do { + s = t1.getState(); + } while (s != State.TIMED_WAITING && s != State.BLOCKED && s != State.WAITING); + } + + + @Test(timeout=10000) + public void testReuseWithDifferentPool() throws Exception { + final HiveConf conf = createConf(); + MockQam qam = new MockQam(); + TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList( + new TmpHivePool("A", null, 2, 0.6f), new TmpHivePool("B", null, 1, 0.4f)), + Lists.newArrayList(create("A", "A"), create("B", "B"))); + final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); + wm.start(); + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, "A", conf), + sessionA2 = (WmTezSession) wm.getSession(null, "A", conf); + assertEquals("A", sessionA1.getPoolName()); + assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON); + assertEquals("A", sessionA2.getPoolName()); + assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON); + WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, "B", conf); + assertSame(sessionA1, sessionB1); + assertEquals("B", sessionB1.getPoolName()); + assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); + assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A. + // Make sure that we can still get a session from A. 
+ WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, "A", conf); + assertEquals("A", sessionA3.getPoolName()); + assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON); + assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON); + sessionA3.returnToSessionManager(); + sessionB1.returnToSessionManager(); + sessionA2.returnToSessionManager(); + } + + private void checkError(final AtomicReference<Throwable> error) throws Exception { + Throwable t = error.get(); + if (t == null) return; + throw new Exception(t); } private HiveConf createConf() { http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index b121a06..2c4fe7f 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -18,6 +18,13 @@ package org.apache.hive.service.server; +import com.google.common.collect.Lists; + +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType; + import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; @@ -179,7 +186,9 @@ public class HiveServer2 extends CompositeService { // Initialize workload management. 
String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); if (wmQueue != null && !wmQueue.isEmpty()) { - wm = WorkloadManager.create(wmQueue, hiveConf); + wm = WorkloadManager.create(wmQueue, hiveConf, new TmpResourcePlan( + Lists.newArrayList(new TmpHivePool("llap", null, 1, 1.0f)), + Lists.newArrayList(new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 0)))); } else { wm = null; }