sankarh commented on a change in pull request #2065:
URL: https://github.com/apache/hive/pull/2065#discussion_r597613584



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -363,6 +352,21 @@ public MoveSession(final WmTezSession srcSession, final 
String destPool) {
     public String toString() {
       return srcSession.getSessionId() + " moving from " + 
srcSession.getPoolName() + " to " + destPool;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o==this) return true;

Review comment:
       nit: 
   1. Need space before and after binary operators. 
   2. Even single statement block should be put under {}.
   3. Space after keywords such as "if"

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -657,7 +661,37 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     }
     e.toReuse.clear();
 
-    // 9. Resolve all the kill query requests in flight. Nothing below can 
affect them.
+    // 9. If delayed move is set to true, for pools which have queued 
requests, process the "delayed moves" now.
+    // Incoming requests have more claim upon the pool than the delayed moves
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+      for (String poolName : poolsToRedistribute) {
+        PoolState pool = pools.get(poolName);
+        if (pool == null)
+          return;
+        int queueSize = pool.queue.size();
+        int remainingCapacity = pool.queryParallelism - 
pool.getTotalActiveSessions();
+        int delayedMovesToProcess = queueSize > remainingCapacity ? queueSize 
- remainingCapacity : 0;

Review comment:
       nit: Use () to clearly mark the boundary. 
   int delayedMovesToProcess = (queueSize > remainingCapacity) ? (queueSize - 
remainingCapacity) : 0;
   
   if ((delayedMovesToProcess > 0) && (pool.delayedMoveSessions.size() > 0)) {

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -794,17 +828,17 @@ private void handleMoveSessionOnMasterThread(final 
MoveSession moveSession,
     final WmThreadSyncWork syncWork,
     final HashSet<String> poolsToRedistribute,
     final Map<WmTezSession, GetRequest> toReuse,
-    final Map<WmTezSession, WmEvent> recordMoveEvents) {
+    final Map<WmTezSession, WmEvent> recordMoveEvents, final boolean 
moveImmediately) {
     String destPoolName = moveSession.destPool;
-    LOG.info("Handling move session event: {}", moveSession);
+    LOG.info("Handling move session event: {}, move immediately: {}", 
moveSession, moveImmediately);
     if (validMove(moveSession.srcSession, destPoolName)) {
       WmEvent moveEvent = new WmEvent(WmEvent.EventType.MOVE);
-      // remove from src pool
-      RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+      // check if there is capacity in dest pool
+      if (capacityAvailable(destPoolName)) {

Review comment:
       We shouldn't change the sequence of validation here. Modify the flow as 
follows.
   ```
   RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
   if (rr == RemoveSessionResult.OK) {
     if (capacityAvailable(destPoolName)) {
       // Existing code to move the session
     } else {
       if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE) && 
!moveImmediately) {
         // new code to move it to delayed sessions list
       } else {
         // Existing code to kill the query
       }
     }
   } else {
     // Existing code to log error msg.
   }
   ```

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -32,18 +32,7 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;

Review comment:
       Expand the imports instead of using *.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -657,7 +661,37 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     }
     e.toReuse.clear();
 
-    // 9. Resolve all the kill query requests in flight. Nothing below can 
affect them.
+    // 9. If delayed move is set to true, for pools which have queued 
requests, process the "delayed moves" now.
+    // Incoming requests have more claim upon the pool than the delayed moves
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+      for (String poolName : poolsToRedistribute) {
+        PoolState pool = pools.get(poolName);
+        if (pool == null)

Review comment:
       nit: Need {}

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -641,7 +645,7 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     // as possible
     Map<WmTezSession, WmEvent> recordMoveEvents = new HashMap<>();
     for (MoveSession moveSession : e.moveSessions) {
-      handleMoveSessionOnMasterThread(moveSession, syncWork, 
poolsToRedistribute, e.toReuse, recordMoveEvents);
+      handleMoveSessionOnMasterThread(moveSession, syncWork, 
poolsToRedistribute, e.toReuse, recordMoveEvents, false);

Review comment:
       The last argument can be set based on new config instead of passing 
false here and then take true flow if the config =false.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -657,7 +661,37 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     }
     e.toReuse.clear();
 
-    // 9. Resolve all the kill query requests in flight. Nothing below can 
affect them.
+    // 9. If delayed move is set to true, for pools which have queued 
requests, process the "delayed moves" now.
+    // Incoming requests have more claim upon the pool than the delayed moves
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+      for (String poolName : poolsToRedistribute) {
+        PoolState pool = pools.get(poolName);
+        if (pool == null)
+          return;
+        int queueSize = pool.queue.size();
+        int remainingCapacity = pool.queryParallelism - 
pool.getTotalActiveSessions();
+        int delayedMovesToProcess = queueSize > remainingCapacity ? queueSize 
- remainingCapacity : 0;
+        if (delayedMovesToProcess > 0 && pool.delayedMoveSessions.size() > 0) {
+          int i = 0;
+          Iterator<MoveSession> itr = pool.delayedMoveSessions.iterator();
+          while (i < delayedMovesToProcess && itr.hasNext()) {
+            MoveSession moveSession = itr.next();
+            itr.remove();
+            WmTezSession srcSession = moveSession.srcSession;
+            if (pool.sessions.contains(srcSession)) {
+              LOG.info("Processing delayed move {} for pool {} in wm main 
thread as the pool has queued requests",
+                  moveSession, poolName);
+              i++;
+              handleMoveSessionOnMasterThread(moveSession, syncWork, 
poolsToRedistribute, e.toReuse, recordMoveEvents,
+                  true);
+            } else

Review comment:
       Shall remove else block. It is redundant.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -816,18 +850,34 @@ private void handleMoveSessionOnMasterThread(final 
MoveSession moveSession,
             LOG.error("Failed to move session: {}. Session is not added to 
destination.", moveSession);
           }
         } else {
-          WmTezSession session = moveSession.srcSession;
-          KillQueryContext killQueryContext = new KillQueryContext(session, 
"Destination pool " + destPoolName +
-            " is full. Killing query.");
-          resetAndQueueKill(syncWork.toKillQuery, killQueryContext, toReuse);
+        LOG.error("Failed to move session: {}. Session is not removed from its 
pool.", moveSession);

Review comment:
       nit: Misaligned statement. Add 2 spaces.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -657,7 +661,37 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     }
     e.toReuse.clear();
 
-    // 9. Resolve all the kill query requests in flight. Nothing below can 
affect them.
+    // 9. If delayed move is set to true, for pools which have queued 
requests, process the "delayed moves" now.
+    // Incoming requests have more claim upon the pool than the delayed moves
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+      for (String poolName : poolsToRedistribute) {
+        PoolState pool = pools.get(poolName);
+        if (pool == null)
+          return;
+        int queueSize = pool.queue.size();
+        int remainingCapacity = pool.queryParallelism - 
pool.getTotalActiveSessions();
+        int delayedMovesToProcess = queueSize > remainingCapacity ? queueSize 
- remainingCapacity : 0;
+        if (delayedMovesToProcess > 0 && pool.delayedMoveSessions.size() > 0) {
+          int i = 0;
+          Iterator<MoveSession> itr = pool.delayedMoveSessions.iterator();
+          while (i < delayedMovesToProcess && itr.hasNext()) {

Review comment:
       i < delayedMovesToProcess can be moved inside the loop to avoid 
redundant checks if i is not incremented.
   
   i++;
   handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, 
e.toReuse, recordMoveEvents,
                     true);
   if (i >= delayedMovesToProcess) {
     break;
   }

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -794,17 +828,17 @@ private void handleMoveSessionOnMasterThread(final 
MoveSession moveSession,
     final WmThreadSyncWork syncWork,
     final HashSet<String> poolsToRedistribute,
     final Map<WmTezSession, GetRequest> toReuse,
-    final Map<WmTezSession, WmEvent> recordMoveEvents) {
+    final Map<WmTezSession, WmEvent> recordMoveEvents, final boolean 
moveImmediately) {

Review comment:
       To keep it uniform with the new config, can we rename the variable as 
"delayedMove" instead of "moveImmediately" and use it accordingly?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
##########
@@ -657,7 +661,37 @@ private void processCurrentEvents(EventState e, 
WmThreadSyncWork syncWork) throw
     }
     e.toReuse.clear();
 
-    // 9. Resolve all the kill query requests in flight. Nothing below can 
affect them.
+    // 9. If delayed move is set to true, for pools which have queued 
requests, process the "delayed moves" now.
+    // Incoming requests have more claim upon the pool than the delayed moves
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+      for (String poolName : poolsToRedistribute) {
+        PoolState pool = pools.get(poolName);
+        if (pool == null)
+          return;
+        int queueSize = pool.queue.size();
+        int remainingCapacity = pool.queryParallelism - 
pool.getTotalActiveSessions();
+        int delayedMovesToProcess = queueSize > remainingCapacity ? queueSize 
- remainingCapacity : 0;
+        if (delayedMovesToProcess > 0 && pool.delayedMoveSessions.size() > 0) {
+          int i = 0;

Review comment:
       nit: Shall use the name "movedCount" instead of "i".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to