sankarh commented on a change in pull request #2065:
URL: https://github.com/apache/hive/pull/2065#discussion_r620083627
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -3749,6 +3749,17 @@ private static void
populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
new TimeValidator(TimeUnit.SECONDS),
"The timeout for AM registry registration, after which (on attempting
to use the\n" +
"session), we kill it and try to get another one."),
+ HIVE_SERVER2_WM_DELAYED_MOVE("hive.server2.wm.delayed.move", false,
+ "Determines behavior of the wm move trigger when destination pool is
full.\n" +
+ "If true, the query will run in source pool as long as possible if
destination pool is full;\n" +
+ "if false, the query will be killed if destination pool is full."),
+
HIVE_SERVER2_WM_DELAYED_MOVE_TIMEOUT("hive.server2.wm.delayed.move.timeout",
"600",
+ new TimeValidator(TimeUnit.SECONDS),
+ "The amount of time a delayed move is allowed to run in the source
pool,\n" +
+ "when a delayed move session times out, the session is moved to the
destination pool.\n"),
+
HIVE_SERVER2_WM_DELAYED_MOVE_VALIDATOR_INTERVAL("hive.server2.wm.delayed.move.validator.interval",
"10",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Interval for checking for expired delayed moves and retries. Value of
0 indicates no checks."),
Review comment:
Does "0" mean no timeout check, or no support for delayed move itself? I
think, in any case, this creates confusion. We shouldn't allow 0; this
config should be > 0.
hive.server2.wm.delayed.move.timeout=0 can be used for the no-timeout case.
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -3749,6 +3749,17 @@ private static void
populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
new TimeValidator(TimeUnit.SECONDS),
"The timeout for AM registry registration, after which (on attempting
to use the\n" +
"session), we kill it and try to get another one."),
+ HIVE_SERVER2_WM_DELAYED_MOVE("hive.server2.wm.delayed.move", false,
+ "Determines behavior of the wm move trigger when destination pool is
full.\n" +
+ "If true, the query will run in source pool as long as possible if
destination pool is full;\n" +
+ "if false, the query will be killed if destination pool is full."),
+
HIVE_SERVER2_WM_DELAYED_MOVE_TIMEOUT("hive.server2.wm.delayed.move.timeout",
"600",
+ new TimeValidator(TimeUnit.SECONDS),
+ "The amount of time a delayed move is allowed to run in the source
pool,\n" +
+ "when a delayed move session times out, the session is moved to the
destination pool.\n"),
Review comment:
If the value 0 has a special meaning such as "doesn't expire", then we need
to capture it here.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -790,45 +842,72 @@ private void dumpPoolState(PoolState ps, List<String>
set) {
}
}
- private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
- final WmThreadSyncWork syncWork,
- final HashSet<String> poolsToRedistribute,
- final Map<WmTezSession, GetRequest> toReuse,
- final Map<WmTezSession, WmEvent> recordMoveEvents) {
+ private static enum MoveSessionResult {
+ OK, // Normal case - the session was moved.
+ KILLED, // Killed because destination pool was full and delayed move is
false.
+ CONVERTED_TO_DELAYED_MOVE, // the move session was added to the pool's
delayed moves as the dest. pool was full
+ // and delayed move is true.
+ ERROR
+ }
+
+ private MoveSessionResult handleMoveSessionOnMasterThread(final MoveSession
moveSession,
+ final WmThreadSyncWork syncWork,
+ final HashSet<String> poolsToRedistribute,
+ final Map<WmTezSession, GetRequest> toReuse,
+ final Map<WmTezSession, WmEvent> recordMoveEvents,
+ final boolean convertToDelayedMove) {
String destPoolName = moveSession.destPool;
- LOG.info("Handling move session event: {}", moveSession);
+ LOG.info("Handling move session event: {}, Convert to Delayed Move: {}",
moveSession, convertToDelayedMove);
if (validMove(moveSession.srcSession, destPoolName)) {
+ String srcPoolName = moveSession.srcSession.getPoolName();
+ PoolState srcPool = pools.get(srcPoolName);
+ boolean capacityAvailableInDest = capacityAvailable(destPoolName);
+ // If delayed move is set to true and if destination pool doesn't have
enough capacity, don't kill the query.
+ // Let the query run in source pool. Add the session to the source
pool's delayed move sessions.
+ if (convertToDelayedMove && !capacityAvailableInDest) {
+ srcPool.delayedMoveSessions.add(moveSession);
+ moveSession.srcSession.setDelayedMove(true);
Review comment:
What happens when we deactivate the WLM resource plan while there are
sessions in the delayed move list?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -1248,6 +1334,42 @@ private void processPoolChangesOnMasterThread(
}
}
+ private void processDelayedMovesForPool(final String poolName, final
HashSet<String> poolsToRedistribute, final Map<WmTezSession, WmEvent>
recordMoveEvents,
+ WmThreadSyncWork syncWork, IdentityHashMap<WmTezSession, GetRequest>
toReuse) {
+ long currentTime = System.currentTimeMillis();
+ PoolState pool = pools.get(poolName);
+ int movedCount = 0;
+ int queueSize = pool.queue.size();
+ int remainingCapacity = pool.queryParallelism -
pool.getTotalActiveSessions();
+ int delayedMovesToProcess = (queueSize > remainingCapacity) ? (queueSize -
remainingCapacity) : 0;
+ Iterator<MoveSession> iter = pool.delayedMoveSessions.iterator();
+ while (iter.hasNext()) {
+ MoveSession moveSession = iter.next();
+ MoveSessionResult result;
+ //Discard the delayed move if invalid
+ if (!validDelayedMove(moveSession, pool, poolName)) {
+ iter.remove();
+ continue;
+ }
+ // Process the delayed move if
Review comment:
nit: Add a blank line before the comments.
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -3749,6 +3749,17 @@ private static void
populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
new TimeValidator(TimeUnit.SECONDS),
"The timeout for AM registry registration, after which (on attempting
to use the\n" +
"session), we kill it and try to get another one."),
+ HIVE_SERVER2_WM_DELAYED_MOVE("hive.server2.wm.delayed.move", false,
+ "Determines behavior of the wm move trigger when destination pool is
full.\n" +
+ "If true, the query will run in source pool as long as possible if
destination pool is full;\n" +
+ "if false, the query will be killed if destination pool is full."),
+
HIVE_SERVER2_WM_DELAYED_MOVE_TIMEOUT("hive.server2.wm.delayed.move.timeout",
"600",
Review comment:
The default can be a round figure of 1 hour. Since the value is in seconds, you can set it to 3600.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -790,45 +842,72 @@ private void dumpPoolState(PoolState ps, List<String>
set) {
}
}
- private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
- final WmThreadSyncWork syncWork,
- final HashSet<String> poolsToRedistribute,
- final Map<WmTezSession, GetRequest> toReuse,
- final Map<WmTezSession, WmEvent> recordMoveEvents) {
+ private static enum MoveSessionResult {
+ OK, // Normal case - the session was moved.
+ KILLED, // Killed because destination pool was full and delayed move is
false.
+ CONVERTED_TO_DELAYED_MOVE, // the move session was added to the pool's
delayed moves as the dest. pool was full
+ // and delayed move is true.
+ ERROR
+ }
+
+ private MoveSessionResult handleMoveSessionOnMasterThread(final MoveSession
moveSession,
+ final WmThreadSyncWork syncWork,
+ final HashSet<String> poolsToRedistribute,
+ final Map<WmTezSession, GetRequest> toReuse,
+ final Map<WmTezSession, WmEvent> recordMoveEvents,
+ final boolean convertToDelayedMove) {
String destPoolName = moveSession.destPool;
- LOG.info("Handling move session event: {}", moveSession);
+ LOG.info("Handling move session event: {}, Convert to Delayed Move: {}",
moveSession, convertToDelayedMove);
if (validMove(moveSession.srcSession, destPoolName)) {
+ String srcPoolName = moveSession.srcSession.getPoolName();
+ PoolState srcPool = pools.get(srcPoolName);
+ boolean capacityAvailableInDest = capacityAvailable(destPoolName);
+ // If delayed move is set to true and if destination pool doesn't have
enough capacity, don't kill the query.
+ // Let the query run in source pool. Add the session to the source
pool's delayed move sessions.
+ if (convertToDelayedMove && !capacityAvailableInDest) {
+ srcPool.delayedMoveSessions.add(moveSession);
+ moveSession.srcSession.setDelayedMove(true);
+ LOG.info("Converting Move: {} to a delayed move. Since destination
pool {} is full, running in source pool {}"
+ + "as long as possible.", moveSession, destPoolName, srcPoolName);
Review comment:
Add a space before "as", or else the pool name looks incorrect.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -421,6 +446,22 @@ private void runWmThread() {
}
}
+ private void runDelayedMoveThread() {
+ while (true) {
Review comment:
Add an info log here with delayedMoveValidationIntervalMs.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -641,7 +682,11 @@ private void processCurrentEvents(EventState e,
WmThreadSyncWork syncWork) throw
// as possible
Map<WmTezSession, WmEvent> recordMoveEvents = new HashMap<>();
for (MoveSession moveSession : e.moveSessions) {
- handleMoveSessionOnMasterThread(moveSession, syncWork,
poolsToRedistribute, e.toReuse, recordMoveEvents);
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
Review comment:
Instead of "if-else", we can directly pass the value of
HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE) as last
argument.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -1248,6 +1334,42 @@ private void processPoolChangesOnMasterThread(
}
}
+ private void processDelayedMovesForPool(final String poolName, final
HashSet<String> poolsToRedistribute, final Map<WmTezSession, WmEvent>
recordMoveEvents,
+ WmThreadSyncWork syncWork, IdentityHashMap<WmTezSession, GetRequest>
toReuse) {
+ long currentTime = System.currentTimeMillis();
+ PoolState pool = pools.get(poolName);
+ int movedCount = 0;
+ int queueSize = pool.queue.size();
+ int remainingCapacity = pool.queryParallelism -
pool.getTotalActiveSessions();
+ int delayedMovesToProcess = (queueSize > remainingCapacity) ? (queueSize -
remainingCapacity) : 0;
+ Iterator<MoveSession> iter = pool.delayedMoveSessions.iterator();
+ while (iter.hasNext()) {
+ MoveSession moveSession = iter.next();
+ MoveSessionResult result;
+ //Discard the delayed move if invalid
+ if (!validDelayedMove(moveSession, pool, poolName)) {
+ iter.remove();
+ continue;
+ }
+ // Process the delayed move if
+ // 1. The delayed move has timed out or
+ // 2. The destination pool has freed up or
+ // 3. If the source pool has incoming requests and we need to free up
capacity in the source pool
+ // to accommodate these requests.
+ if (((currentTime - moveSession.startTime) >= delayedMoveTimeOutMs) ||
(capacityAvailable(moveSession.destPool))
+ || (movedCount < delayedMovesToProcess)) {
+ LOG.info("Processing delayed move {} for pool {}", moveSession,
poolName);
+ result = handleMoveSessionOnMasterThread(moveSession, syncWork,
poolsToRedistribute, toReuse, recordMoveEvents,
+ false);
+ iter.remove();
+ if (result == MoveSessionResult.OK) {
Review comment:
I think movedCount should be incremented for both moved and killed
sessions.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -1248,6 +1334,42 @@ private void processPoolChangesOnMasterThread(
}
}
+ private void processDelayedMovesForPool(final String poolName, final
HashSet<String> poolsToRedistribute, final Map<WmTezSession, WmEvent>
recordMoveEvents,
+ WmThreadSyncWork syncWork, IdentityHashMap<WmTezSession, GetRequest>
toReuse) {
+ long currentTime = System.currentTimeMillis();
+ PoolState pool = pools.get(poolName);
+ int movedCount = 0;
+ int queueSize = pool.queue.size();
+ int remainingCapacity = pool.queryParallelism -
pool.getTotalActiveSessions();
Review comment:
Will queuing a query to a given pool wake up the master thread? If not, then
we have an issue where a query might wait, in the worst case, 10 secs for the
delayed move to be processed.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -1248,6 +1334,42 @@ private void processPoolChangesOnMasterThread(
}
}
+ private void processDelayedMovesForPool(final String poolName, final
HashSet<String> poolsToRedistribute, final Map<WmTezSession, WmEvent>
recordMoveEvents,
+ WmThreadSyncWork syncWork, IdentityHashMap<WmTezSession, GetRequest>
toReuse) {
+ long currentTime = System.currentTimeMillis();
+ PoolState pool = pools.get(poolName);
+ int movedCount = 0;
+ int queueSize = pool.queue.size();
+ int remainingCapacity = pool.queryParallelism -
pool.getTotalActiveSessions();
+ int delayedMovesToProcess = (queueSize > remainingCapacity) ? (queueSize -
remainingCapacity) : 0;
+ Iterator<MoveSession> iter = pool.delayedMoveSessions.iterator();
+ while (iter.hasNext()) {
+ MoveSession moveSession = iter.next();
+ MoveSessionResult result;
+ //Discard the delayed move if invalid
+ if (!validDelayedMove(moveSession, pool, poolName)) {
+ iter.remove();
+ continue;
+ }
+ // Process the delayed move if
+ // 1. The delayed move has timed out or
+ // 2. The destination pool has freed up or
+ // 3. If the source pool has incoming requests and we need to free up
capacity in the source pool
+ // to accommodate these requests.
+ if (((currentTime - moveSession.startTime) >= delayedMoveTimeOutMs) ||
(capacityAvailable(moveSession.destPool))
Review comment:
We can check (movedCount < delayedMovesToProcess) first, then
capacityAvailable, and the timeout last, as that is the likely order in which
the conditions hold and it can save a few CPU cycles.
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
##########
@@ -47,8 +47,10 @@ public void applyAction(final Map<WmTezSession, Trigger>
queriesViolated) {
break;
case MOVE_TO_POOL:
String destPoolName = entry.getValue().getAction().getPoolName();
- Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession,
destPoolName);
- moveFutures.put(wmTezSession, moveFuture);
+ if (!wmTezSession.isDelayedMove()) {
+ Future<Boolean> moveFuture =
wm.applyMoveSessionAsync(wmTezSession, destPoolName);
Review comment:
How do we ensure that, when we move the current session to the dest pool, we
first check for any delayed moves queued up for that dest pool before we
process this one?
I think we should perform Step-9 before Step-8 to give higher priority to
delayed moves.
##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -3749,6 +3749,17 @@ private static void
populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
new TimeValidator(TimeUnit.SECONDS),
"The timeout for AM registry registration, after which (on attempting
to use the\n" +
"session), we kill it and try to get another one."),
+ HIVE_SERVER2_WM_DELAYED_MOVE("hive.server2.wm.delayed.move", false,
+ "Determines behavior of the wm move trigger when destination pool is
full.\n" +
+ "If true, the query will run in source pool as long as possible if
destination pool is full;\n" +
+ "if false, the query will be killed if destination pool is full."),
+
HIVE_SERVER2_WM_DELAYED_MOVE_TIMEOUT("hive.server2.wm.delayed.move.timeout",
"600",
+ new TimeValidator(TimeUnit.SECONDS),
+ "The amount of time a delayed move is allowed to run in the source
pool,\n" +
+ "when a delayed move session times out, the session is moved to the
destination pool.\n"),
+
HIVE_SERVER2_WM_DELAYED_MOVE_VALIDATOR_INTERVAL("hive.server2.wm.delayed.move.validator.interval",
"10",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Interval for checking for expired delayed moves and retries. Value of
0 indicates no checks."),
Review comment:
I think retries should always happen at a regular interval, but the timeout
should be enabled only if hive.server2.wm.delayed.move.timeout>0.
##########
File path:
ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
##########
@@ -1110,6 +1110,94 @@ public void testMoveSessionsMultiPool() throws Exception
{
assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1));
}
+ @Test(timeout=10000)
+ public void testDelayedMoveSessions() throws Exception {
+ final HiveConf conf = createConfForDelayedMove();
+ MockQam qam = new MockQam();
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(),
Lists.newArrayList(
+ pool("A", 2, 0.6f), pool("B", 1, 0.4f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
+ final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam,
plan);
+ wm.start();
+
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null,
mappingInput("A"), conf);
+
+ // [A: 1, B: 0]
+ Map<String, SessionTriggerProvider> allSessionProviders =
wm.getAllSessionTriggerProviders();
+ assertEquals(1, allSessionProviders.get("A").getSessions().size());
+ assertEquals(0, allSessionProviders.get("B").getSessions().size());
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA1));
+
assertFalse(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.6f, sessionA1.getClusterFraction(), EPSILON);
+ assertEquals("A", sessionA1.getPoolName());
+
+ // If dest pool has capacity, move immediately
+ // [A: 0, B: 1]
+ Future<Boolean> future = wm.applyMoveSessionAsync(sessionA1, "B");
+ assertNotNull(future.get());
+ assertTrue(future.get());
+ wm.addTestEvent().get();
+ allSessionProviders = wm.getAllSessionTriggerProviders();
+ assertEquals(0, allSessionProviders.get("A").getSessions().size());
+ assertEquals(1, allSessionProviders.get("B").getSessions().size());
+
assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1));
+ assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+ assertEquals("B", sessionA1.getPoolName());
+
+ WmTezSession sessionA2 = (WmTezSession) wm.getSession(null,
mappingInput("A"), conf);
+ // [A: 1, B: 1]
+ allSessionProviders = wm.getAllSessionTriggerProviders();
+ assertEquals(1, allSessionProviders.get("A").getSessions().size());
+ assertEquals(1, allSessionProviders.get("B").getSessions().size());
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2));
+ assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON);
+ assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+ assertEquals("A", sessionA2.getPoolName());
+ assertEquals("B", sessionA1.getPoolName());
+
+ // Dest pool is maxed out. Keep running in source pool
+ // [A: 1, B: 1]
+ future = wm.applyMoveSessionAsync(sessionA2, "B");
+ assertNotNull(future.get());
+ assertFalse(future.get());
+ wm.addTestEvent().get();
+ allSessionProviders = wm.getAllSessionTriggerProviders();
+ assertEquals(1, allSessionProviders.get("A").getSessions().size());
+ assertEquals(1, allSessionProviders.get("B").getSessions().size());
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2));
+ assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON);
+ assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+ assertEquals("A", sessionA2.getPoolName());
+ assertEquals("B", sessionA1.getPoolName());
+
+ // A has queued requests. The new requests should get accepted. The
delayed move should be killed
+ WmTezSession sessionA3 = (WmTezSession) wm.getSession(null,
mappingInput("A"), conf);
+ WmTezSession sessionA4 = (WmTezSession) wm.getSession(null,
mappingInput("A"), conf);
+
+ while(sessionA2.isOpen()) {
+ Thread.sleep(100);
+ }
+ assertNull(sessionA2.getPoolName());
+ assertEquals("Destination pool B is full. Killing query.",
sessionA2.getReasonForKill());
+
+ // [A: 2, B: 1]
+ allSessionProviders = wm.getAllSessionTriggerProviders();
+ assertEquals(2, allSessionProviders.get("A").getSessions().size());
+ assertEquals(1, allSessionProviders.get("B").getSessions().size());
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA3));
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA4));
+ assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
+ assertEquals(0.3f, sessionA4.getClusterFraction(), EPSILON);
+ assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+ assertEquals("A", sessionA3.getPoolName());
+ assertEquals("A", sessionA4.getPoolName());
+ assertEquals("B", sessionA1.getPoolName());
Review comment:
Can we add a test to verify the timeout/retry for capacity_available cases?
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
##########
@@ -47,8 +47,10 @@ public void applyAction(final Map<WmTezSession, Trigger>
queriesViolated) {
break;
case MOVE_TO_POOL:
String destPoolName = entry.getValue().getAction().getPoolName();
- Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession,
destPoolName);
- moveFutures.put(wmTezSession, moveFuture);
+ if (!wmTezSession.isDelayedMove()) {
Review comment:
Will it add overhead in the trigger validator thread if a bunch of queries
are scheduled for delayed move? I mean, it generates trigger events
continuously even though we no-op them here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]