[NO ISSUE][CLUS] Complete NC Registration When Response is Received - user model changes: no - storage format changes: no - interface changes: no
Details: - Mark NC registration as completed when the registration response is received from CC. - Send NC startup tasks request to CC after all previous tasks complete. Change-Id: I4ff41f86a11b52cae894fe40ffa0353f2fb52138 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2525 Reviewed-by: Michael Blow <mb...@apache.org> Tested-by: Murtadha Hubail <mhub...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/042353dc Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/042353dc Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/042353dc Branch: refs/heads/release-0.9.4-pre-rc Commit: 042353dc8d4fa971bef86438776a40d1bac2e900 Parents: d71cb24 Author: Murtadha Hubail <mhub...@apache.org> Authored: Mon Mar 26 09:09:27 2018 +0300 Committer: Murtadha Hubail <mhub...@apache.org> Committed: Mon Mar 26 09:20:13 2018 -0700 ---------------------------------------------------------------------- .../hyracks/bootstrap/NCApplication.java | 1 - .../apache/hyracks/api/util/ExceptionUtils.java | 12 ++++ .../control/nc/NodeControllerService.java | 12 ++-- .../nc/work/EnsureAllCcTasksCompleted.java | 72 +++++++++----------- 4 files changed, 51 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/042353dc/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 57d080e..494198b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ 
-211,7 +211,6 @@ public class NCApplication extends BaseNCApplication { ? getCurrentSystemState() : SystemState.HEALTHY; RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(), currentStatus, systemState); - ncs.notifyRegistrationCompleted(ccId); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/042353dc/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java index 444b08f..9302f46 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java @@ -107,4 +107,16 @@ public class ExceptionUtils { } return first; } + + /** + * Returns a throwable containing {@code thread} stacktrace + * + * @param thread + * @return The throwable with {@code thread} stacktrace + */ + public static Throwable fromThreadStack(Thread thread) { + final Throwable stackThrowable = new Throwable(thread.getName() + " Stack trace"); + stackThrowable.setStackTrace(thread.getStackTrace()); + return stackThrowable; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/042353dc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java 
index 6d54843..a74a1ab 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -441,6 +441,8 @@ public class NodeControllerService implements IControllerService { ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod()); ccTimers.put(ccId, ccTimer); } + ccc.notifyRegistrationCompleted(); + LOGGER.info("Registering with Cluster Controller {} completed", ccc); return ccId; } @@ -661,6 +663,10 @@ public class NodeControllerService implements IControllerService { return messagingNetManager; } + public void notifyTasksCompleted(CcId ccId) throws Exception { + application.onRegisterNode(ccId); + } + private static INCApplication getApplication(NCConfig config) throws ClassNotFoundException, IllegalAccessException, InstantiationException { if (config.getAppClass() != null) { @@ -723,10 +729,4 @@ public class NodeControllerService implements IControllerService { public INCApplication getApplication() { return application; } - - public void notifyRegistrationCompleted(CcId ccId) { - CcConnection ccc = getCcConnection(ccId); - ccc.notifyRegistrationCompleted(); - LOGGER.info("Registering with Cluster Controller {} complete", ccc); - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/042353dc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java index 
156a5c9..5964c04 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java @@ -18,84 +18,78 @@ */ package org.apache.hyracks.control.nc.work; -import java.util.Arrays; import java.util.Deque; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.Task; import org.apache.hyracks.util.ExitUtil; import org.apache.hyracks.util.Span; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -@SuppressWarnings({ "squid:S1181", "squid:S1166" }) +@SuppressWarnings("squid:S1181") public class EnsureAllCcTasksCompleted implements Runnable { private static final Logger LOGGER = LogManager.getLogger(); private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(2); private final NodeControllerService ncs; private final CcId ccId; - private final Deque<Task> abortedTasks; - private final Span span; + private final Deque<Task> runningTasks; - public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, Deque<Task> abortedTasks) { + public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, Deque<Task> runningTasks) { this.ncs = ncs; this.ccId = ccId; - this.abortedTasks = abortedTasks; - span = Span.start(2, TimeUnit.MINUTES); + this.runningTasks = runningTasks; } @Override public void run() { try { - LOGGER.log(Level.INFO, "Ensuring all tasks of {} have completed", ccId); - while (!span.elapsed()) { - removeAborted(); - if (abortedTasks.isEmpty()) { + LOGGER.info("Ensuring all tasks of CC {} have completed", ccId); + final Span maxWaitTime = Span.start(2, 
TimeUnit.MINUTES); + while (!maxWaitTime.elapsed()) { + removeCompleted(); + if (runningTasks.isEmpty()) { break; } - LOGGER.log(Level.INFO, "{} tasks are still running", abortedTasks.size()); - Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // Check once a second + LOGGER.info("{} tasks are still running", runningTasks.size()); + TimeUnit.SECONDS.sleep(1); // Check once a second } - if (abortedTasks.isEmpty()) { - LOGGER.log(Level.INFO, "All tasks of {} have completed, Completing registration", ccId); - // all tasks has completed - ncs.getApplication().onRegisterNode(ccId); + if (runningTasks.isEmpty()) { + LOGGER.info("All tasks of CC {} have completed", ccId); + ncs.notifyTasksCompleted(ccId); } else { - LOGGER.log(Level.ERROR, - "Failed to abort all previous tasks associated with CC {} after {}ms. Giving up", ccId, - TIMEOUT); - LOGGER.log(Level.ERROR, "{} tasks failed to complete within timeout", abortedTasks.size()); - abortedTasks.forEach(task -> { - List<Thread> pendingThreads = task.getPendingThreads(); - LOGGER.log(Level.ERROR, "task {} was stuck. Stuck thread count = {}", task.getTaskAttemptId(), - pendingThreads.size()); - pendingThreads.forEach(thread -> { - LOGGER.log(Level.ERROR, "Stuck thread trace: {}", Arrays.toString(thread.getStackTrace())); - }); - }); + LOGGER.error("{} tasks associated with CC {} failed to complete after {}ms. 
Giving up", + runningTasks.size(), ccId, TIMEOUT); + logPendingTasks(); ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS); } } catch (Throwable th) { - try { - LOGGER.log(Level.ERROR, "Failed to abort all previous tasks associated with CC {}", ccId, th); - } catch (Throwable ignore) { - // Ignore logging errors - } + LOGGER.error("Failed to abort all previous tasks associated with CC {}", ccId, th); ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS); } } - private void removeAborted() { - int numTasks = abortedTasks.size(); + private void removeCompleted() { + final int numTasks = runningTasks.size(); for (int i = 0; i < numTasks; i++) { - Task task = abortedTasks.poll(); + Task task = runningTasks.poll(); if (!task.isCompleted()) { - abortedTasks.add(task); + runningTasks.add(task); + } + } + } + + private void logPendingTasks() { + for (Task task : runningTasks) { + final List<Thread> pendingThreads = task.getPendingThreads(); + LOGGER.error("task {} was stuck. Stuck thread count = {}", task.getTaskAttemptId(), pendingThreads.size()); + for (Thread thread : pendingThreads) { + LOGGER.error("Stuck thread trace", ExceptionUtils.fromThreadStack(thread)); } } }