Repository: hive
Updated Branches:
  refs/heads/master 52e1ba15d -> 0538e5102


HIVE-17473 : implement workload management pools (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0538e510
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0538e510
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0538e510

Branch: refs/heads/master
Commit: 0538e510232c7c685621c2608f5aa729f2afc2eb
Parents: 52e1ba1
Author: sergey <ser...@apache.org>
Authored: Thu Oct 19 17:45:12 2017 -0700
Committer: sergey <ser...@apache.org>
Committed: Thu Oct 19 17:45:12 2017 -0700

----------------------------------------------------------------------
 .../hive/jdbc/TestTriggersWorkloadManager.java  |   2 +-
 .../hadoop/hive/ql/exec/tez/TezSessionPool.java |   8 +-
 .../hive/ql/exec/tez/UserPoolMapping.java       |  92 ++++++
 .../hive/ql/exec/tez/WorkloadManager.java       | 329 ++++++++++++++++---
 .../hive/ql/exec/tez/TestWorkloadManager.java   | 259 ++++++++++++++-
 .../apache/hive/service/server/HiveServer2.java |  11 +-
 6 files changed, 641 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
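
For orientation, this commit wires pools through the temporary plan classes added to WorkloadManager (TmpResourcePlan, TmpHivePool, TmpUserMapping) until the real metastore-backed schema exists. Below is a minimal sketch of how they compose, based only on the constructors and the WorkloadManager.create signature in the diff; the pool names, user name, fractions and parallelism values are made up for illustration and are not part of the commit.

    import com.google.common.collect.Lists;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
    import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
    import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
    import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
    import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;

    public class ResourcePlanSketch {
      public static TmpResourcePlan buildPlan() {
        // Two root pools: "etl" gets 60% of the cluster and 2 concurrent queries,
        // "bi" gets 40% and 1 concurrent query (hypothetical values).
        TmpHivePool etl = new TmpHivePool("etl", null, 2, 0.6f);
        TmpHivePool bi = new TmpHivePool("bi", null, 1, 0.4f);
        // Route user "alice" to "etl"; everyone else falls through to the DEFAULT mapping.
        return new TmpResourcePlan(
            Lists.newArrayList(etl, bi),
            Lists.newArrayList(
                new TmpUserMapping(TmpUserMappingType.USER, "alice", "etl", 0),
                new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "bi", 0)));
      }

      public static WorkloadManager init(String yarnQueue, HiveConf conf) {
        // HiveServer2 does the equivalent of this when the Tez interactive queue is set
        // (see the HiveServer2.java hunk at the end of this commit).
        return WorkloadManager.create(yarnQueue, conf, buildPlan());
      }
    }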


http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index 8485ba6..8d7693d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -73,7 +73,7 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag
   @Override
   protected void setupTriggers(final List<Trigger> triggers) throws Exception {
     WorkloadManager wm = WorkloadManager.getInstance();
-    WorkloadManager.PoolState poolState = spy(new WorkloadManager.PoolState());
+    WorkloadManager.PoolState poolState = spy(new WorkloadManager.PoolState("llap", 1, 1f));
     when(poolState.getTriggers()).thenReturn(triggers);
     wm.getPools().put("llap", poolState);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 4f2997b..b67c933 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -50,6 +50,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
       new ConcurrentLinkedQueue<SessionType>();
 
   private final HiveConf initConf;
+  private int initialSize;
 
  // TODO: eventually, this will need to support resize. That would probably require replacement
   //       with a RW lock, a semaphore and linked list.
@@ -71,6 +72,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
   }
 
   void startInitialSessions() throws Exception {
+    initialSize = initialSessions.size();
     if (initialSessions.isEmpty()) return;
     if (amRegistry != null) {
       amRegistry.start();
@@ -240,4 +242,8 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
       bySessionId.remove(sessionId);
     }
   }
-}
\ No newline at end of file
+
+  int getInitialSize() {
+    return initialSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
new file mode 100644
index 0000000..81d6b85
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
+
+class UserPoolMapping {
+  private final static class Mapping {
+    public Mapping(String poolName, int priority) {
+      this.fullPoolName = poolName;
+      this.priority = priority;
+    }
+    int priority;
+    String fullPoolName;
+    @Override
+    public String toString() {
+      return "[" + fullPoolName + ", priority=" + priority + "]";
+    }
+  }
+
+  private final Map<String, Mapping> userMappings = new HashMap<>();
+  private final String defaultPoolName;
+  // TODO: add other types as needed
+
+  public UserPoolMapping(List<TmpUserMapping> mappings, Set<String> poolNames) {
+    String defaultPoolName = null;
+    for (TmpUserMapping mapping : mappings) {
+      switch (mapping.getType()) {
+      case USER: {
+          String poolName = getValidPoolName(poolNames, mapping);
+          Mapping val = new Mapping(poolName, mapping.getPriority());
+          Mapping oldValue = userMappings.put(mapping.getName(), val);
+          if (oldValue != null) {
+            throw new AssertionError("Duplicate mapping for user " + mapping.getName() + "; "
+                + oldValue + " and " + val);
+          }
+        break;
+      }
+      case DEFAULT: {
+        String poolName = getValidPoolName(poolNames, mapping);
+        if (defaultPoolName != null) {
+          throw new AssertionError("Duplicate default mapping; "
+              + defaultPoolName + " and " + poolName);
+        }
+        defaultPoolName = poolName;
+        break;
+      }
+      default: throw new AssertionError("Unknown type " + mapping.getType());
+      }
+    }
+    this.defaultPoolName = defaultPoolName;
+  }
+
+  public String mapSessionToPoolName(String userName) {
+    // For now, we only have user rules, so this is very simple.
+    // In future we'd also look up groups (each group the user is in, initially; we may do it
+    // the opposite way if the user is a member of many groups but there are not many rules),
+    // whatever user supplies in connection string to HS2, etc.
+    // If multiple rules match, we'd need to get the highest priority one.
+    Mapping userMapping = userMappings.get(userName);
+    if (userMapping != null) return userMapping.fullPoolName;
+    return defaultPoolName;
+  }
+
+  private static String getValidPoolName(Set<String> poolNames, TmpUserMapping mapping) {
+    String poolName = mapping.getPoolName();
+    // Should we really be validating here? The plan should be validated before applying.
+    if (!poolNames.contains(mapping.getPoolName())) {
+      throw new AssertionError("Invalid pool in the mapping " + poolName);
+    }
+    return poolName;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 54c7f74..d725e90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -44,8 +48,6 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 
 /** Workload management entry point for HS2. */
 public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator
@@ -60,11 +62,21 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   private final RestrictedConfigChecker restrictedConfig;
   private final QueryAllocationManager allocationManager;
   private final String yarnQueue;
+  // Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
+  //       sessions, so the pool itself could internally track the sessions it gave out, since
+  //       calling close on an unopened session is probably harmless.
+  private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions =
+      new IdentityHashMap<>();
+  private final int amRegistryTimeoutMs;
+
+
  /** Sessions given out (i.e. between get... and return... calls), separated by Hive pool. */
  private final ReentrantReadWriteLock poolsLock = new ReentrantReadWriteLock();
-  private final List<TezSessionPoolSession> openSessions = new LinkedList<>();
   private final Map<String, PoolState> pools = new HashMap<>();
-  private final int amRegistryTimeoutMs;
+  // Used to make sure that waiting getSessions don't block update.
+  private int internalPoolsVersion;
+  private UserPoolMapping userPoolMapping;
+
   private SessionTriggerProvider sessionTriggerProvider;
   private TriggerActionHandler triggerActionHandler;
   private TriggerValidatorRunnable triggerValidatorRunnable;
@@ -73,8 +85,31 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // Add stuff here as WM is implemented.
     private final Object lock = new Object();
     private final List<WmTezSession> sessions = new ArrayList<>();
+    private final Semaphore sessionsClaimed;
+
+    private final String fullName;
+    private final double finalFraction;
+    private double finalFractionRemaining;
+    private final int queryParallelism;
     private final List<Trigger> triggers = new ArrayList<>();
 
+    public PoolState(String fullName, int queryParallelism, double fraction) {
+      this.fullName = fullName;
+      this.queryParallelism = queryParallelism;
+      // A fair semaphore to ensure correct queue order.
+      this.sessionsClaimed = new Semaphore(queryParallelism, true);
+      this.finalFraction = this.finalFractionRemaining = fraction;
+    }
+
+    @Override
+    public String toString() {
+      return "[" + fullName + ", query parallelism " + queryParallelism
+          + ", fraction of the cluster " + finalFraction + ", fraction used by child pools "
+          + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size()
+          + "]";
+    }
+
+
     public List<Trigger> getTriggers() {
       return triggers;
     }
@@ -93,14 +128,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   }
 
   /** Called once, when HS2 initializes. */
-  public static WorkloadManager create(String yarnQueue, HiveConf conf) {
-  public static WorkloadManager create(String yarnQueue, HiveConf conf) {
+  public static WorkloadManager create(String yarnQueue, HiveConf conf, TmpResourcePlan plan) {
     assert INSTANCE == null;
     Token<JobTokenIdentifier> amsToken = createAmsToken();
     // We could derive the expected number of AMs to pass in.
     LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, amsToken, -1);
     QueryAllocationManager qam = new GuaranteedTasksAllocator(conf, amComm);
-    // TODO: Hardcode one session for now; initial policies should be passed in.
-    return (INSTANCE = new WorkloadManager(yarnQueue, conf, 1, qam, amsToken));
+    return (INSTANCE = new WorkloadManager(yarnQueue, conf, qam, amsToken, plan));
   }
 
   private static Token<JobTokenIdentifier> createAmsToken() {
@@ -116,11 +150,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   }
 
   @VisibleForTesting
-  WorkloadManager(String yarnQueue, HiveConf conf, int numSessions,
-      QueryAllocationManager qam, Token<JobTokenIdentifier> amsToken) {
+  WorkloadManager(String yarnQueue, HiveConf conf,
+      QueryAllocationManager qam, Token<JobTokenIdentifier> amsToken, TmpResourcePlan plan) {
     this.yarnQueue = yarnQueue;
     this.conf = conf;
-    initializeHivePools();
+    int numSessions = initializeHivePools(plan);
+    LOG.info("Initializing with " + numSessions + " total query parallelism");
+
     this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar(
         conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS);
     sessions = new TezSessionPool<>(conf, numSessions, true);
@@ -138,32 +174,149 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     startTriggerValidator(conf);
   }
 
-  private void initializeHivePools() {
-    // TODO: real implementation
+  private int initializeHivePools(TmpResourcePlan plan) {
     poolsLock.writeLock().lock();
     try {
       // FIXME: Add Triggers from metastore to poolstate
-      pools.put("llap", new PoolState());
+      // Note: we assume here that plan has been validated beforehand, so we don't verify
+      //       that fractions or query parallelism add up.
+      int totalQueryParallelism = 0;
+      // Use recursion to update parents more conveniently; we don't expect a big tree.
+      for (TmpHivePool pool : plan.getRootPools()) {
+        totalQueryParallelism += addHivePool(pool, null);
+      }
+      this.userPoolMapping = new UserPoolMapping(plan.getMappings(), pools.keySet());
+      internalPoolsVersion = 0; // Initializing for the first time.
+      return totalQueryParallelism;
     } finally {
       poolsLock.writeLock().unlock();
     }
   }
 
+  private final static char POOL_SEPARATOR = '/';
+  private int addHivePool(TmpHivePool pool, PoolState parent) {
+    String fullName = pool.getName();
+    int totalQueryParallelism = pool.getQueryParallelism();
+    double fraction = pool.getResourceFraction();
+    if (parent != null) {
+      fullName = parent.fullName + POOL_SEPARATOR + fullName;
+      fraction = parent.finalFraction * pool.getResourceFraction();
+      parent.finalFractionRemaining -= fraction;
+    }
+    PoolState state = new PoolState(fullName, totalQueryParallelism, fraction);
+    if (pool.getChildren() != null) {
+      for (TmpHivePool child : pool.getChildren()) {
+        totalQueryParallelism += addHivePool(child, state);
+      }
+    }
+    LOG.info("Adding Hive pool: " + state);
+    pools.put(fullName, state);
+    return totalQueryParallelism;
+  }
+
   public TezSessionState getSession(
       TezSessionState session, String userName, HiveConf conf) throws Exception {
     validateConfig(conf);
-    String poolName = mapSessionToPoolName(userName);
-    // TODO: do query parallelism enforcement here based on the policies and pools.
-    while (true) {
-      WmTezSession result = checkSessionForReuse(session);
-      // TODO: when proper AM management is implemented, we should call tryGet... here, because the
-      //       parallelism will be enforced here, and pool would always have a session for us.
-      result = (result == null ? sessions.getSession() : result);
-      result.setQueueName(yarnQueue);
-      result.setPoolName(poolName);
-      if (!ensureAmIsRegistered(result)) continue; // Try another.
-      redistributePoolAllocations(poolName, result, null);
-      return result;
+    WmTezSession result = checkSessionForReuse(session);
+    boolean hasAcquired = false;
+    String poolName = null;
+    while (!hasAcquired) { // This loop handles concurrent plan updates while we are waiting.
+      poolName = userPoolMapping.mapSessionToPoolName(userName);
+      if (poolName == null) {
+        throw new HiveException("Cannot find any pool mapping for user " + userName);
+      }
+      int internalVersion = -1;
+      Semaphore sessionsClaimed = null;
+      poolsLock.readLock().lock();
+      try {
+        PoolState pool = pools.get(poolName);
+        if (pool == null) throw new AssertionError("Pool " + poolName + " not found.");
+        // No need to take the pool lock, semaphore is final.
+        sessionsClaimed = pool.sessionsClaimed;
+        internalVersion = internalPoolsVersion;
+      } finally {
+        poolsLock.readLock().unlock();
+      }
+      // One cannot simply reuse the session if there are other queries waiting; to maintain
+      // fairness, we'll try to take the semaphore instantly, and if that fails we'll return
+      // this session back to the pool and potentially give the user a new session later.
+      if (result != null) {
+        // Handle the special case; the pool may be exactly at capacity w/o queue. In that
+        // case, we still should be able to reuse.
+        boolean isFromTheSamePool = false;
+        String oldPoolName = result.getPoolName();
+        if (poolName.equals(oldPoolName)) {
+          sessionsClaimed.release();
+          isFromTheSamePool = true;
+        }
+        // Note: we call timed acquire because untimed one ignores fairness.
+        hasAcquired = sessionsClaimed.tryAcquire(1, TimeUnit.MILLISECONDS);
+        if (hasAcquired) {
+          poolsLock.readLock().lock();
+          boolean doUnlock = true;
+          try {
+            if (internalVersion == internalPoolsVersion) {
+              if (!isFromTheSamePool) {
+                // Free up the usage in the old pool. TODO: ideally not under lock; not critical.
+                redistributePoolAllocations(oldPoolName, null, result, true);
+              }
+              doUnlock = false; // Do not unlock; see below.
+              break;
+            }
+          } finally {
+            if (doUnlock) {
+              poolsLock.readLock().unlock();
+            }
+          }
+          hasAcquired = false;
+        }
+        // Note: we are short-circuiting session::returnToSessionManager to supply the flag
+        returnAfterUse(result, !isFromTheSamePool);
+        result = null;
+      }
+      // We don't expect frequent updates, so check every second.
+      while (!(hasAcquired = (hasAcquired || sessionsClaimed.tryAcquire(1, TimeUnit.SECONDS)))) {
+        poolsLock.readLock().lock();
+        try {
+          if (internalVersion != internalPoolsVersion) break;
+        } finally {
+          poolsLock.readLock().unlock();
+        }
+      }
+      if (!hasAcquired) continue;
+      // Keep it simple for now - everything between acquiring the semaphore and adding the session
+      // to the pool state is done under read lock, blocking pool updates. It's possible to make
+      // it more granular if needed. The only potentially lengthy operation is waiting for an
+      // expired session to be restarted in the session pool.
+      poolsLock.readLock().lock();
+      if (internalVersion == internalPoolsVersion) break;
+      poolsLock.readLock().unlock();
+      hasAcquired = false;
+    }
+    // We are holding the lock from the end of the loop.
+    try {
+      assert hasAcquired;
+      while (true) {
+        // TODO: ideally, we'd need to implement tryGet and deal with the valid wait from a session
+        //       restarting somehow, as opposed to the invalid case of a session missing from the
+        //       pool due to some bug. Keep a "restarting" counter in the pool?
+        boolean isFromTheSamePool = false;
+        if (result == null) {
+          result = sessions.getSession();
+        } else {
+          // If we are just reusing the session from the same pool, do not adjust allocations.
+          isFromTheSamePool = poolName.equals(result.getPoolName());
+        }
+        result.setQueueName(yarnQueue);
+        result.setPoolName(poolName);
+        if (!ensureAmIsRegistered(result)) continue; // Try another.
+        if (!isFromTheSamePool) {
+          redistributePoolAllocations(poolName, result, null, false);
+        }
+        return result;
+      }
+    } finally {
+      poolsLock.readLock().unlock();
     }
   }
 
@@ -181,12 +334,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   }
 
   private void redistributePoolAllocations(
-      String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove) {
+      String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove,
+      boolean releaseParallelism) {
     List<WmTezSession> sessionsToUpdate = null;
     double totalAlloc = 0;
     assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName());
    assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName());
     poolsLock.readLock().lock();
+    boolean hasRemoveFailed = false;
     try {
       PoolState pool = pools.get(poolName);
       synchronized (pool.lock) {
@@ -195,12 +350,21 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
           pool.sessions.add(sessionToAdd);
         }
         if (sessionToRemove != null) {
+          // TODO: this assumes that the update process will take the write lock, and make
+          //       everything right w.r.t. semaphores, pool names and other stuff, since we might
+          //       be releasing a different semaphore from the one we acquired if it's across
+          //       the update. If the magic in the update is weak, this may become more involved.
           if (!pool.sessions.remove(sessionToRemove)) {
            LOG.error("Session " + sessionToRemove + " could not be removed from the pool");
+            if (releaseParallelism) {
+              hasRemoveFailed = true;
+            }
+          } else if (releaseParallelism) {
+            pool.sessionsClaimed.release();
           }
           sessionToRemove.setClusterFraction(0);
         }
-        totalAlloc = updatePoolAllocations(pool.sessions);
+        totalAlloc = updatePoolAllocations(pool.sessions, pool.finalFractionRemaining);
         sessionsToUpdate = new ArrayList<>(pool.sessions);
       }
     } finally {
@@ -208,6 +372,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
     allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate);
     updateSessionsTriggers();
+    if (hasRemoveFailed) {
+      throw new AssertionError("Cannot remove the session from the pool and release "
+          + "the query slot; HS2 may fail to accept queries");
+    }
   }
 
   private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception {
@@ -232,18 +400,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return null;
   }
 
-  private double updatePoolAllocations(List<WmTezSession> sessions) {
+  private double updatePoolAllocations(List<WmTezSession> sessions, double totalFraction) {
     // TODO: real implementation involving in-the-pool policy interface, etc.
-    double allocation = 1.0 / sessions.size();
+    double allocation = totalFraction / sessions.size();
     for (WmTezSession session : sessions) {
       session.setClusterFraction(allocation);
     }
-    return 1.0;
-  }
-
-  private String mapSessionToPoolName(String userName) {
-    // TODO: real implementation, probably calling into another class initialized with policies.
-    return "llap";
+    return totalFraction;
   }
 
   private void validateConfig(HiveConf conf) throws HiveException {
@@ -272,7 +435,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   public void stop() throws Exception {
     List<TezSessionPoolSession> sessionsToClose = null;
     synchronized (openSessions) {
-      sessionsToClose = new ArrayList<TezSessionPoolSession>(openSessions);
+      sessionsToClose = new ArrayList<>(openSessions.keySet());
     }
 
     for (TezSessionPoolSession sessionState : sessionsToClose) {
@@ -302,10 +465,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
   @Override
   public void returnAfterUse(TezSessionPoolSession session) throws Exception {
+    returnAfterUse(session, true);
+  }
+
+  private void returnAfterUse(
+      TezSessionPoolSession session, boolean releaseParallelism) throws Exception {
     boolean isInterrupted = Thread.interrupted();
     try {
       WmTezSession wmSession = ensureOwnedSession(session);
-      redistributePoolAllocations(wmSession.getPoolName(), null, wmSession);
+      redistributePoolAllocations(wmSession.getPoolName(), null, wmSession, releaseParallelism);
       sessions.returnSession((WmTezSession) session);
     } finally {
       // Reset the interrupt status.
@@ -315,7 +483,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
-
   /** Closes a running (expired) pool session and reopens it. */
   @Override
   public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception {
@@ -334,7 +501,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   @Override
   public void registerOpenSession(TezSessionPoolSession session) {
     synchronized (openSessions) {
-      openSessions.add(session);
+      openSessions.put(session, null);
     }
     updateSessionsTriggers();
   }
@@ -382,7 +549,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     if (!ensureAmIsRegistered(newSession)) {
       throw new Exception("Session is not usable after reopen");
     }
-    redistributePoolAllocations(oldSession.getPoolName(), newSession, oldSession);
+    // Do not release the parallelism - we are just replacing the session in the same pool.
+    redistributePoolAllocations(oldSession.getPoolName(), newSession, oldSession, false);
     return newSession;
   }
 
@@ -392,7 +560,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // We never want to lose pool sessions. Replace it instead; also trigger pool redistribution.
     WmTezSession wmSession = ensureOwnedSession(session);
     closeAndReopenPoolSession(wmSession);
-    redistributePoolAllocations(wmSession.getPoolName(), null, wmSession);
+    redistributePoolAllocations(wmSession.getPoolName(), null, wmSession, true);
+  }
+
+  @VisibleForTesting
+  int getNumSessions() {
+    return sessions.getInitialSize();
   }
 
   @Override
@@ -427,4 +600,78 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
     return counterNames;
   }
+
+
+  // TODO: temporary until real WM schema is created.
+  public static class TmpHivePool {
+    private final String name;
+    private final List<TmpHivePool> children;
+    private final int queryParallelism;
+    private final double resourceFraction;
+
+    public TmpHivePool(String name,
+        List<TmpHivePool> children, int queryParallelism, double resourceFraction) {
+      this.name = name;
+      this.children = children;
+      this.queryParallelism = queryParallelism;
+      this.resourceFraction = resourceFraction;
+    }
+
+    public String getName() {
+      return name;
+    }
+    public List<TmpHivePool> getChildren() {
+      return children;
+    }
+    public int getQueryParallelism() {
+      return queryParallelism;
+    }
+    public double getResourceFraction() {
+      return resourceFraction;
+    }
+  }
+
+  public static enum TmpUserMappingType {
+    USER, DEFAULT
+  }
+
+  public static class TmpUserMapping {
+    private final TmpUserMappingType type;
+    private final String name;
+    private final String poolName;
+    private final int priority;
+    public TmpUserMapping(TmpUserMappingType type, String name, String poolName, int priority) {
+      this.type = type;
+      this.name = name;
+      this.poolName = poolName;
+      this.priority = priority;
+    }
+    public TmpUserMappingType getType() {
+      return type;
+    }
+    public String getName() {
+      return name;
+    }
+    public String getPoolName() {
+      return poolName;
+    }
+    public int getPriority() {
+      return priority;
+    }
+  }
+
+  public static class TmpResourcePlan {
+    private final List<TmpHivePool> rootPools;
+    private final List<TmpUserMapping> mappings;
+    public TmpResourcePlan(List<TmpHivePool> rootPools, List<TmpUserMapping> mappings) {
+      this.rootPools = rootPools;
+      this.mappings = mappings;
+    }
+    public List<TmpHivePool> getRootPools() {
+      return rootPools;
+    }
+    public List<TmpUserMapping> getMappings() {
+      return mappings;
+    }
+  }
 }
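
To make the fraction math above concrete: addHivePool multiplies a child pool's relative fraction by the parent's absolute fraction (finalFraction) and subtracts the result from the parent's finalFractionRemaining, and updatePoolAllocations then splits only that remaining capacity evenly among a pool's active sessions. With the plan used in testClusterFractions below (r1 = 0.6 with children p1 = 0.5 and p2 = 0.3, plus a root r2 = 0.4), p1 resolves to 0.6 * 0.5 = 0.3 for its single session, p2 resolves to 0.6 * 0.3 = 0.18, or 0.09 per session once two are active, r1 itself keeps 0.6 - 0.3 - 0.18 = 0.12 for its own session, and r2 holds 0.4.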

http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 17c62cf..258a865 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -18,26 +18,67 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.*;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.lang.Thread.State;
 import java.util.List;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.Test;
 
 public class TestWorkloadManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class);
+
+  private final class GetSessionRunnable implements Runnable {
+    private final AtomicReference<WmTezSession> session;
+    private final WorkloadManager wm;
+    private final AtomicReference<Throwable> error;
+    private final HiveConf conf;
+    private final CountDownLatch cdl;
+    private final String userName;
+
+    private GetSessionRunnable(AtomicReference<WmTezSession> session, WorkloadManager wm,
+        AtomicReference<Throwable> error, HiveConf conf, CountDownLatch cdl, String userName) {
+      this.session = session;
+      this.wm = wm;
+      this.error = error;
+      this.conf = conf;
+      this.cdl = cdl;
+      this.userName = userName;
+    }
+
+    @Override
+    public void run() {
+      WmTezSession old = session.get();
+      session.set(null);
+      if (cdl != null) {
+        cdl.countDown();
+      }
+      try {
+       session.set((WmTezSession) wm.getSession(old, userName, conf));
+      } catch (Throwable e) {
+        error.compareAndSet(null, e);
+      }
+    }
+  }
+
   public static class MockQam implements QueryAllocationManager {
     boolean isCalled = false;
 
@@ -63,8 +104,19 @@ public class TestWorkloadManager {
   public static class WorkloadManagerForTest extends WorkloadManager {
 
     public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions,
-      QueryAllocationManager qam) {
-      super(yarnQueue, conf, numSessions, qam, null);
+        QueryAllocationManager qam) {
+      super(yarnQueue, conf, qam, null, createDummyPlan(numSessions));
+    }
+
+    public WorkloadManagerForTest(String yarnQueue, HiveConf conf,
+        QueryAllocationManager qam, TmpResourcePlan plan) {
+      super(yarnQueue, conf, qam, null, plan);
+    }
+
+    private static TmpResourcePlan createDummyPlan(int numSessions) {
+      return new TmpResourcePlan(
+          Lists.newArrayList(new TmpHivePool("llap", null, numSessions, 1.0f)),
+          Lists.newArrayList(new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 0)));
     }
 
     @Override
@@ -164,11 +216,186 @@ public class TestWorkloadManager {
     assertEquals(1.0, session2.getClusterFraction(), EPSILON);
     assertEquals(0.0, session.getClusterFraction(), EPSILON);
     qam.assertWasCalled();
+  }
 
-    // Now destroy the returned session (which is technically not valid) and confirm correctness.
-    session.destroy();
-    assertEquals(1.0, session2.getClusterFraction(), EPSILON);
-    //qam.assertWasNotCalled();
+  private static TmpUserMapping create(String user, String pool) {
+    return new TmpUserMapping(TmpUserMappingType.USER, user, pool, 0);
+  }
+
+  @Test(timeout = 10000)
+  public void testClusterFractions() throws Exception {
+    HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    List<TmpHivePool> l2 = Lists.newArrayList(
+        new TmpHivePool("p1", null, 1, 0.5f), new TmpHivePool("p2", null, 2, 0.3f));
+    TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("r1", l2, 1, 0.6f), new TmpHivePool("r2", null, 1, 0.4f)),
+        Lists.newArrayList(create("p1", "r1/p1"), create("p2", "r1/p2"), create("r1", "r1"),
+            create("r2", "r2")));
+    WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+    assertEquals(5, wm.getNumSessions());
+    // Get all the 5 sessions; validate cluster fractions.
+    WmTezSession session05of06 = (WmTezSession) wm.getSession(null, "p1", conf);
+    assertEquals(0.3, session05of06.getClusterFraction(), EPSILON);
+    WmTezSession session03of06 = (WmTezSession) wm.getSession(null, "p2", conf);
+    assertEquals(0.18, session03of06.getClusterFraction(), EPSILON);
+    WmTezSession session03of06_2 = (WmTezSession) wm.getSession(null, "p2", conf);
+    assertEquals(0.09, session03of06.getClusterFraction(), EPSILON);
+    assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON);
+    WmTezSession session02of06 = (WmTezSession) wm.getSession(null, "r1", conf);
+    assertEquals(0.12, session02of06.getClusterFraction(), EPSILON);
+    WmTezSession session04 = (WmTezSession) wm.getSession(null, "r2", conf);
+    assertEquals(0.4, session04.getClusterFraction(), EPSILON);
+    session05of06.returnToSessionManager();
+    session03of06.returnToSessionManager();
+    session03of06_2.returnToSessionManager();
+    session02of06.returnToSessionManager();
+    session04.returnToSessionManager();
+  }
+
+  @Test(timeout=10000)
+  public void testQueueing() throws Exception {
+    final HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 2, 0.5f), new TmpHivePool("B", null, 2, 0.5f)),
+        Lists.newArrayList(create("A", "A"), create("B", "B")));
+    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, "A", conf),
+        sessionA2 = (WmTezSession) wm.getSession(null, "A", conf),
+        sessionB1 = (WmTezSession) wm.getSession(null, "B", conf);
+    final AtomicReference<WmTezSession> sessionA3 = new AtomicReference<>(),
+        sessionA4 = new AtomicReference<>();
+    final AtomicReference<Throwable> error = new AtomicReference<>();
+    final CountDownLatch cdl = new CountDownLatch(1);
+
+    Thread t1 = new Thread(new GetSessionRunnable(sessionA3, wm, error, conf, cdl, "A")),
+        t2 = new Thread(new GetSessionRunnable(sessionA4, wm, error, conf, null, "A"));
+    waitForThreadToBlockOnQueue(cdl, t1);
+    t2.start();
+    assertNull(sessionA3.get());
+    assertNull(sessionA4.get());
+    checkError(error);
+    // While threads are blocked on A, we should still be able to get and return a B session.
+    WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, "B", conf);
+    sessionB1.returnToSessionManager();
+    sessionB2.returnToSessionManager();
+    assertNull(sessionA3.get());
+    assertNull(sessionA4.get());
+    checkError(error);
+    // Now release a single session from A.
+    sessionA1.returnToSessionManager();
+    t1.join();
+    checkError(error);
+    assertNotNull(sessionA3.get());
+    assertNull(sessionA4.get());
+    sessionA3.get().returnToSessionManager();
+    t2.join();
+    checkError(error);
+    assertNotNull(sessionA4.get());
+    sessionA4.get().returnToSessionManager();
+    sessionA2.returnToSessionManager();
+  }
+
+  @Test(timeout=10000)
+  public void testReuseWithQueueing() throws Exception {
+    final HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
+    wm.start();
+    WmTezSession session1 = (WmTezSession) wm.getSession(null, null, conf);
+    // First, try to reuse from the same pool - should "just work".
+    WmTezSession session1a = (WmTezSession) wm.getSession(session1, null, conf);
+    assertSame(session1, session1a);
+    assertEquals(1.0, session1.getClusterFraction(), EPSILON);
+    // Should still be able to get the 2nd session.
+    WmTezSession session2 = (WmTezSession) wm.getSession(null, null, conf);
+
+    // Now try to reuse with no other sessions remaining. Should still work.
+    WmTezSession session2a = (WmTezSession) wm.getSession(session2, null, conf);
+    assertSame(session2, session2a);
+    assertEquals(0.5, session1.getClusterFraction(), EPSILON);
+    assertEquals(0.5, session2.getClusterFraction(), EPSILON);
+
+    // Finally try to reuse with something in the queue. Due to fairness this won't work.
+    final AtomicReference<WmTezSession> session3 = new AtomicReference<>(),
+    // We will try to reuse this, but session3 is queued before us.
+        session4 = new AtomicReference<>(session2);
+    final AtomicReference<Throwable> error = new AtomicReference<>();
+    CountDownLatch cdl = new CountDownLatch(1), cdl2 = new CountDownLatch(1);
+    Thread t1 = new Thread(new GetSessionRunnable(session3, wm, error, conf, cdl, null), "t1"),
+        t2 = new Thread(new GetSessionRunnable(session4, wm, error, conf, cdl2, null), "t2");
+    waitForThreadToBlockOnQueue(cdl, t1);
+    assertNull(session3.get());
+    checkError(error);
+    t2.start();
+    cdl2.await();
+    assertNull(session4.get());
+
+    // We have released the session by trying to reuse it and going back into queue, s3 can start.
+    t1.join();
+    checkError(error);
+    assertNotNull(session3.get());
+    assertEquals(0.5, session3.get().getClusterFraction(), EPSILON);
+
+    // Now release another session; the thread that gave up on reuse can proceed.
+    session1.returnToSessionManager();
+    t2.join();
+    checkError(error);
+    assertNotNull(session4.get());
+    assertNotSame(session2, session4.get());
+    assertEquals(0.5, session4.get().getClusterFraction(), EPSILON);
+    session3.get().returnToSessionManager();
+    session4.get().returnToSessionManager();
+  }
+
+  private void waitForThreadToBlockOnQueue(CountDownLatch cdl, Thread t1) throws InterruptedException {
+    t1.start();
+    cdl.await();
+    // Wait for t1 to block, just be sure. Not ideal...
+    State s;
+    do {
+      s = t1.getState();
+    } while (s != State.TIMED_WAITING && s != State.BLOCKED && s != State.WAITING);
+  }
+
+
+  @Test(timeout=10000)
+  public void testReuseWithDifferentPool() throws Exception {
+    final HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 2, 0.6f), new TmpHivePool("B", null, 1, 0.4f)),
+        Lists.newArrayList(create("A", "A"), create("B", "B")));
+    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, "A", conf),
+        sessionA2 = (WmTezSession) wm.getSession(null, "A", conf);
+    assertEquals("A", sessionA1.getPoolName());
+    assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("A", sessionA2.getPoolName());
+    assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON);
+    WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, "B", conf);
+    assertSame(sessionA1, sessionB1);
+    assertEquals("B", sessionB1.getPoolName());
+    assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
+    assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A.
+    // Make sure that we can still get a session from A.
+    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, "A", conf);
+    assertEquals("A", sessionA3.getPoolName());
+    assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
+    assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
+    sessionA3.returnToSessionManager();
+    sessionB1.returnToSessionManager();
+    sessionA2.returnToSessionManager();
+  }
+
+  private void checkError(final AtomicReference<Throwable> error) throws Exception {
+    Throwable t = error.get();
+    if (t == null) return;
+    throw new Exception(t);
   }
 
   private HiveConf createConf() {

http://git-wip-us.apache.org/repos/asf/hive/blob/0538e510/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index b121a06..2c4fe7f 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -18,6 +18,13 @@
 
 package org.apache.hive.service.server;
 
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;
+
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -179,7 +186,9 @@ public class HiveServer2 extends CompositeService {
     // Initialize workload management.
     String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE);
     if (wmQueue != null && !wmQueue.isEmpty()) {
-      wm = WorkloadManager.create(wmQueue, hiveConf);
+      wm = WorkloadManager.create(wmQueue, hiveConf, new TmpResourcePlan(
+          Lists.newArrayList(new TmpHivePool("llap", null, 1, 1.0f)),
+          Lists.newArrayList(new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 0))));
     } else {
       wm = null;
     }
