Repository: hive
Updated Branches:
  refs/heads/master 73ea9f595 -> 4ccea29bd
HIVE-17905 : propagate background LLAP cluster changes to WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4ccea29b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4ccea29b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4ccea29b Branch: refs/heads/master Commit: 4ccea29bdc9f1f0af3611e3abba1e8af5b898ef8 Parents: 73ea9f5 Author: sergey <ser...@apache.org> Authored: Wed Nov 29 16:09:50 2017 -0800 Committer: sergey <ser...@apache.org> Committed: Wed Nov 29 16:09:50 2017 -0800 ---------------------------------------------------------------------- .../ql/exec/tez/GuaranteedTasksAllocator.java | 25 ++++++---- .../ql/exec/tez/QueryAllocationManager.java | 9 ++++ .../hive/ql/exec/tez/WorkloadManager.java | 7 +-- .../hive/ql/exec/tez/TestWorkloadManager.java | 52 +++++++++++++++++--- 4 files changed, 73 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java index d6d6f07..5f6ab05 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java @@ -17,18 +17,15 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo; - -import org.apache.hadoop.hive.ql.exec.tez.LlapPluginEndpointClient.UpdateRequestContext; - import com.google.common.annotations.VisibleForTesting; import java.util.List; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.ExecuteRequestCallback; import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryRequestProto; import org.apache.hadoop.hive.llap.plugin.rpc.LlapPluginProtocolProtos.UpdateQueryResponseProto; +import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo; +import org.apache.hadoop.hive.ql.exec.tez.LlapPluginEndpointClient.UpdateRequestContext; import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +40,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager { private final LlapClusterStateForCompile clusterState; private final Thread clusterStateUpdateThread; private final LlapPluginEndpointClient amCommunicator; + private Runnable clusterChangedCallback; public GuaranteedTasksAllocator( Configuration conf, LlapPluginEndpointClient amCommunicator) { @@ -50,10 +48,16 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager { this.clusterState = new LlapClusterStateForCompile(conf, CLUSTER_INFO_UPDATE_INTERVAL_MS); this.amCommunicator = amCommunicator; this.clusterStateUpdateThread = new Thread(new Runnable() { + private int lastExecutorCount = -1; @Override public void run() { while (true) { - getExecutorCount(true); // Trigger an update if needed. + int executorCount = getExecutorCount(true); // Trigger an update if needed. + + if (executorCount != lastExecutorCount && lastExecutorCount >= 0) { + clusterChangedCallback.run(); + } + lastExecutorCount = executorCount; try { Thread.sleep(CLUSTER_INFO_UPDATE_INTERVAL_MS / 2); } catch (InterruptedException e) { @@ -78,10 +82,6 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager { clusterStateUpdateThread.interrupt(); // Don't wait for the thread. 
} - public void initClusterInfo() { - clusterState.initClusterInfo(); - } - @VisibleForTesting protected int getExecutorCount(boolean allowUpdate) { if (allowUpdate && !clusterState.initClusterInfo()) { @@ -191,4 +191,9 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager { endpointVersion = version; } } + + @Override + public void setClusterChangedCallback(Runnable clusterChangedCallback) { + this.clusterChangedCallback = clusterChangedCallback; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java index acacfd0..f1da17b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java @@ -19,6 +19,10 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.util.List; +/** + * Represents the mapping from logical resource allocations to queries from WM, to actual physical + * allocations performed using some implementation of a scheduler. + */ interface QueryAllocationManager { void start(); void stop(); @@ -30,4 +34,9 @@ interface QueryAllocationManager { * @param sessions Sessions to update based on their allocation fraction. */ void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions); + + /** + * Sets a callback to be invoked on cluster changes relevant to resource allocation. 
+ */ + void setClusterChangedCallback(Runnable clusterChangedCallback); } http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 388a4f4..25a8ff2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -190,6 +190,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida this.conf = conf; this.totalQueryParallelism = determineQueryParallelism(plan); this.initRpFuture = this.updateResourcePlanAsync(plan); + this.allocationManager = qam; + this.allocationManager.setClusterChangedCallback(() -> notifyOfClusterStateChange()); + this.amComm = amComm; if (this.amComm != null) { this.amComm.init(conf); @@ -201,7 +204,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida tezAmPool = new TezSessionPool<>(conf, totalQueryParallelism, true, oldSession -> createSession(oldSession == null ? null : oldSession.getConf())); restrictedConfig = new RestrictedConfigChecker(conf); - allocationManager = qam; // Only creates the expiration tracker if expiration is configured. 
expirationTracker = SessionExpirationTracker.create(conf, this); @@ -1294,8 +1296,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } } - // TODO: use this - public void nofityOfClusterStateChange() { + public void notifyOfClusterStateChange() { currentLock.lock(); try { current.hasClusterStateChanged = true; http://git-wip-us.apache.org/repos/asf/hive/blob/4ccea29b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 4cb9172..bceb31d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -109,10 +109,14 @@ public class TestWorkloadManager { isCalled = true; } - void assertWasCalled() { + void assertWasCalledAndReset() { assertTrue(isCalled); isCalled = false; } + + @Override + public void setClusterChangedCallback(Runnable clusterChangedCallback) { + } } public static WMResourcePlan plan() { @@ -157,6 +161,16 @@ public class TestWorkloadManager { super(null, yarnQueue, conf, qam, plan); } + @Override + public void notifyOfClusterStateChange() { + super.notifyOfClusterStateChange(); + try { + ensureWm(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + private static WMFullResourcePlan createDummyPlan(int numSessions) { WMFullResourcePlan plan = new WMFullResourcePlan(new WMResourcePlan("rp"), Lists.newArrayList(pool("llap", numSessions, 1.0f))); @@ -257,13 +271,13 @@ public class TestWorkloadManager { WmTezSession session = (WmTezSession) wm.getSession( null, new MappingInput("user", null), conf); assertEquals(1.0, session.getClusterFraction(), EPSILON); - qam.assertWasCalled(); + qam.assertWasCalledAndReset(); 
WmTezSession session2 = (WmTezSession) session.reopen(conf, null); assertNotSame(session, session2); wm.addTestEvent().get(); assertEquals(session2.toString(), 1.0, session2.getClusterFraction(), EPSILON); assertEquals(0.0, session.getClusterFraction(), EPSILON); - qam.assertWasCalled(); + qam.assertWasCalledAndReset(); } @Test(timeout = 10000) @@ -275,23 +289,23 @@ public class TestWorkloadManager { wm.start(); WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); assertEquals(1.0, session.getClusterFraction(), EPSILON); - qam.assertWasCalled(); + qam.assertWasCalledAndReset(); WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); assertEquals(0.5, session.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); - qam.assertWasCalled(); + qam.assertWasCalledAndReset(); assertNotSame(session, session2); session.destroy(); // Destroy before returning to the pool. assertEquals(1.0, session2.getClusterFraction(), EPSILON); assertEquals(0.0, session.getClusterFraction(), EPSILON); - qam.assertWasCalled(); + qam.assertWasCalledAndReset(); // We never lose pool session, so we should still be able to get. 
session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf); session.returnToSessionManager(); assertEquals(1.0, session2.getClusterFraction(), EPSILON); assertEquals(0.0, session.getClusterFraction(), EPSILON); - qam.assertWasCalled(); + qam.assertWasCalledAndReset(); } @Test(timeout = 10000) @@ -404,6 +418,30 @@ public class TestWorkloadManager { } @Test(timeout=10000) + public void testClusterChange() throws Exception { + final HiveConf conf = createConf(); + MockQam qam = new MockQam(); + WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(pool("A", 2, 1f))); + plan.getPlan().setDefaultPoolPath("A"); + final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); + wm.start(); + WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf), + session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf); + assertEquals(0.5, session1.getClusterFraction(), EPSILON); + assertEquals(0.5, session2.getClusterFraction(), EPSILON); + qam.assertWasCalledAndReset(); + + // If cluster info changes, qam should be called with the same fractions. + wm.notifyOfClusterStateChange(); + assertEquals(0.5, session1.getClusterFraction(), EPSILON); + assertEquals(0.5, session2.getClusterFraction(), EPSILON); + qam.assertWasCalledAndReset(); + + session1.returnToSessionManager(); + session2.returnToSessionManager(); + } + + @Test(timeout=10000) public void testReuseWithQueueing() throws Exception { final HiveConf conf = createConf(); MockQam qam = new MockQam();