[NO ISSUE][CLUS] Complete NC Registration When Response is Received

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Mark NC registration as completed when the registration
  response is received from CC.
- Send NC startup tasks request to CC after all previous
  tasks complete.

Change-Id: I4ff41f86a11b52cae894fe40ffa0353f2fb52138
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2525
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Murtadha Hubail <mhub...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/042353dc
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/042353dc
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/042353dc

Branch: refs/heads/release-0.9.4-pre-rc
Commit: 042353dc8d4fa971bef86438776a40d1bac2e900
Parents: d71cb24
Author: Murtadha Hubail <mhub...@apache.org>
Authored: Mon Mar 26 09:09:27 2018 +0300
Committer: Murtadha Hubail <mhub...@apache.org>
Committed: Mon Mar 26 09:20:13 2018 -0700

----------------------------------------------------------------------
 .../hyracks/bootstrap/NCApplication.java        |  1 -
 .../apache/hyracks/api/util/ExceptionUtils.java | 12 ++++
 .../control/nc/NodeControllerService.java       | 12 ++--
 .../nc/work/EnsureAllCcTasksCompleted.java      | 72 +++++++++-----------
 4 files changed, 51 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/042353dc/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 57d080e..494198b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -211,7 +211,6 @@ public class NCApplication extends BaseNCApplication {
                 ? getCurrentSystemState() : SystemState.HEALTHY;
         RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) 
ncServiceCtx.getControllerService(),
                 currentStatus, systemState);
-        ncs.notifyRegistrationCompleted(ccId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/042353dc/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
index 444b08f..9302f46 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
@@ -107,4 +107,16 @@ public class ExceptionUtils {
         }
         return first;
     }
+
+    /**
+     * Returns a throwable containing {@code thread} stacktrace
+     *
+     * @param thread
+     * @return The throwable with {@code thread} stacktrace
+     */
+    public static Throwable fromThreadStack(Thread thread) {
+        final Throwable stackThrowable = new Throwable(thread.getName() + " 
Stack trace");
+        stackThrowable.setStackTrace(thread.getStackTrace());
+        return stackThrowable;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/042353dc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 6d54843..a74a1ab 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -441,6 +441,8 @@ public class NodeControllerService implements 
IControllerService {
             ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, 
nodeParameters.getProfileDumpPeriod());
             ccTimers.put(ccId, ccTimer);
         }
+        ccc.notifyRegistrationCompleted();
+        LOGGER.info("Registering with Cluster Controller {} completed", ccc);
         return ccId;
     }
 
@@ -661,6 +663,10 @@ public class NodeControllerService implements 
IControllerService {
         return messagingNetManager;
     }
 
+    public void notifyTasksCompleted(CcId ccId) throws Exception {
+        application.onRegisterNode(ccId);
+    }
+
     private static INCApplication getApplication(NCConfig config)
             throws ClassNotFoundException, IllegalAccessException, 
InstantiationException {
         if (config.getAppClass() != null) {
@@ -723,10 +729,4 @@ public class NodeControllerService implements 
IControllerService {
     public INCApplication getApplication() {
         return application;
     }
-
-    public void notifyRegistrationCompleted(CcId ccId) {
-        CcConnection ccc = getCcConnection(ccId);
-        ccc.notifyRegistrationCompleted();
-        LOGGER.info("Registering with Cluster Controller {} complete", ccc);
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/042353dc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
index 156a5c9..5964c04 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -18,84 +18,78 @@
  */
 package org.apache.hyracks.control.nc.work;
 
-import java.util.Arrays;
 import java.util.Deque;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.Task;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.hyracks.util.Span;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-@SuppressWarnings({ "squid:S1181", "squid:S1166" })
+@SuppressWarnings("squid:S1181")
 public class EnsureAllCcTasksCompleted implements Runnable {
 
     private static final Logger LOGGER = LogManager.getLogger();
     private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(2);
     private final NodeControllerService ncs;
     private final CcId ccId;
-    private final Deque<Task> abortedTasks;
-    private final Span span;
+    private final Deque<Task> runningTasks;
 
-    public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, 
Deque<Task> abortedTasks) {
+    public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, 
Deque<Task> runningTasks) {
         this.ncs = ncs;
         this.ccId = ccId;
-        this.abortedTasks = abortedTasks;
-        span = Span.start(2, TimeUnit.MINUTES);
+        this.runningTasks = runningTasks;
     }
 
     @Override
     public void run() {
         try {
-            LOGGER.log(Level.INFO, "Ensuring all tasks of {} have completed", 
ccId);
-            while (!span.elapsed()) {
-                removeAborted();
-                if (abortedTasks.isEmpty()) {
+            LOGGER.info("Ensuring all tasks of CC {} have completed", ccId);
+            final Span maxWaitTime = Span.start(2, TimeUnit.MINUTES);
+            while (!maxWaitTime.elapsed()) {
+                removeCompleted();
+                if (runningTasks.isEmpty()) {
                     break;
                 }
-                LOGGER.log(Level.INFO, "{} tasks are still running", 
abortedTasks.size());
-                Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // Check once a 
second
+                LOGGER.info("{} tasks are still running", runningTasks.size());
+                TimeUnit.SECONDS.sleep(1); // Check once a second
             }
-            if (abortedTasks.isEmpty()) {
-                LOGGER.log(Level.INFO, "All tasks of {} have completed, 
Completing registration", ccId);
-                // all tasks has completed
-                ncs.getApplication().onRegisterNode(ccId);
+            if (runningTasks.isEmpty()) {
+                LOGGER.info("All tasks of CC {} have completed", ccId);
+                ncs.notifyTasksCompleted(ccId);
             } else {
-                LOGGER.log(Level.ERROR,
-                        "Failed to abort all previous tasks associated with CC 
{} after {}ms. Giving up", ccId,
-                        TIMEOUT);
-                LOGGER.log(Level.ERROR, "{} tasks failed to complete within 
timeout", abortedTasks.size());
-                abortedTasks.forEach(task -> {
-                    List<Thread> pendingThreads = task.getPendingThreads();
-                    LOGGER.log(Level.ERROR, "task {} was stuck. Stuck thread 
count = {}", task.getTaskAttemptId(),
-                            pendingThreads.size());
-                    pendingThreads.forEach(thread -> {
-                        LOGGER.log(Level.ERROR, "Stuck thread trace: {}", 
Arrays.toString(thread.getStackTrace()));
-                    });
-                });
+                LOGGER.error("{} tasks associated with CC {} failed to 
complete after {}ms. Giving up",
+                        runningTasks.size(), ccId, TIMEOUT);
+                logPendingTasks();
                 ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
             }
         } catch (Throwable th) {
-            try {
-                LOGGER.log(Level.ERROR, "Failed to abort all previous tasks 
associated with CC {}", ccId, th);
-            } catch (Throwable ignore) {
-                // Ignore logging errors
-            }
+            LOGGER.error("Failed to abort all previous tasks associated with 
CC {}", ccId, th);
             ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
         }
     }
 
-    private void removeAborted() {
-        int numTasks = abortedTasks.size();
+    private void removeCompleted() {
+        final int numTasks = runningTasks.size();
         for (int i = 0; i < numTasks; i++) {
-            Task task = abortedTasks.poll();
+            Task task = runningTasks.poll();
             if (!task.isCompleted()) {
-                abortedTasks.add(task);
+                runningTasks.add(task);
+            }
+        }
+    }
+
+    private void logPendingTasks() {
+        for (Task task : runningTasks) {
+            final List<Thread> pendingThreads = task.getPendingThreads();
+            LOGGER.error("task {} was stuck. Stuck thread count = {}", 
task.getTaskAttemptId(), pendingThreads.size());
+            for (Thread thread : pendingThreads) {
+                LOGGER.error("Stuck thread trace", 
ExceptionUtils.fromThreadStack(thread));
             }
         }
     }

Reply via email to