abdullah alamoudi has submitted this change and it was merged. Change subject: [NO ISSUE][RT] Ensure all NC tasks are aborted before joining
[NO ISSUE][RT] Ensure all NC tasks are aborted before joining - user model changes: no - storage format changes: no - interface changes: yes - Add getApplication() and notifyRegistrationCompleted(CcId) to NodeControllerService details: - This change ensures that all previous tasks of a CC on an NC have completed before the NC's registration with that CC completes. Change-Id: I0517e5a390d50e8703ffdbecbb84467c22edda85 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2507 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java 8 files changed, 150 insertions(+), 27 deletions(-) Approvals: Anon. E. 
Moose #1000171: Jenkins: Verified; ; Verified Michael Blow: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 494198b..57d080e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -211,6 +211,7 @@ ? getCurrentSystemState() : SystemState.HEALTHY; RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(), currentStatus, systemState); + ncs.notifyRegistrationCompleted(ccId); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java index e1a36cd..b44a6bb 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java @@ -103,14 +103,13 @@ if (nodeRegistry.containsKey(nodeId)) { LOGGER.warn("Node with name " + nodeId + " has already registered; failing the node then re-registering."); failNode(nodeId); - } else { - try { - // TODO(mblow): it seems we should close IPC handles when we're done with them (like here) - IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress()); - ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), null); - } catch (IPCException e) { - throw HyracksDataException.create(e); - } + } + try { + // TODO(mblow): it seems we should close IPC handles 
when we're done with them (like here) + IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress()); + ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), null); + } catch (IPCException e) { + throw HyracksDataException.create(e); } LOGGER.warn("adding node to registry"); nodeRegistry.put(nodeId, ncState); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 98f5c70..6d54843 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -363,8 +363,7 @@ }; ClusterControllerRemoteProxy ccProxy = new ClusterControllerRemoteProxy( ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener)); - CcConnection ccc = new CcConnection(ccProxy); - return registerNode(ccc, ccAddress); + return registerNode(new CcConnection(ccProxy), ccAddress); } } @@ -415,7 +414,6 @@ public CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception { LOGGER.info("Registering with Cluster Controller {}", ccc); - int registrationId = nextRegistrationId.incrementAndGet(); pendingRegistrations.put(registrationId, ccc); CcId ccId = ccc.registerNode(nodeRegistration, registrationId); @@ -425,10 +423,8 @@ if (distributedState != null) { getDistributedState().put(ccId, distributedState); } - application.onRegisterNode(ccId); IClusterController ccs = ccc.getClusterControllerService(); NodeParameters nodeParameters = ccc.getNodeParameters(); - // Start heartbeat generator. 
if (!heartbeatThreads.containsKey(ccId)) { Thread heartbeatThread = new Thread( @@ -445,8 +441,6 @@ ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod()); ccTimers.put(ccId, ccTimer); } - ccc.notifyRegistrationCompleted(); - LOGGER.info("Registering with Cluster Controller {} complete", ccc); return ccId; } @@ -708,7 +702,6 @@ } private class TraceCurrentTimeTask extends TimerTask { - private ITracer tracer; private long traceCategory; @@ -726,4 +719,14 @@ } } } + + public INCApplication getApplication() { + return application; + } + + public void notifyRegistrationCompleted(CcId ccId) { + CcConnection ccc = getCcConnection(ccId); + ccc.notifyRegistrationCompleted(); + LOGGER.info("Registering with Cluster Controller {} complete", ccc); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 9d99968..d7b930c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; import java.util.Hashtable; import java.util.LinkedHashSet; @@ -111,6 +112,8 @@ private final Set<JobFlag> jobFlags; private final IStatsCollector statsCollector; + + private volatile boolean completed = false; public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName, ExecutorService executor, NodeControllerService ncs, @@ -255,8 +258,11 @@ if (aborted) { return false; } - pendingThreads.add(t); - return true; + return pendingThreads.add(t); + } + + public synchronized List<Thread> getPendingThreads() { + return new 
ArrayList<>(pendingThreads); } private synchronized void removePendingThread(Thread t) { @@ -300,8 +306,6 @@ executorService.execute(() -> { try { Thread thread = Thread.currentThread(); - // Calls synchronized addPendingThread(..) to make sure that in the abort() method, - // the thread is not escaped from interruption. if (!addPendingThread(thread)) { return; } @@ -345,6 +349,7 @@ } finally { close(); removePendingThread(ct); + completed = true; } if (!exceptions.isEmpty()) { if (LOGGER.isWarnEnabled()) { @@ -460,4 +465,8 @@ public IStatsCollector getStatsCollector() { return statsCollector; } + + public boolean isCompleted() { + return completed; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java index 2bcf414..c6696fd 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java @@ -18,7 +18,9 @@ */ package org.apache.hyracks.control.nc.work; +import java.util.ArrayDeque; import java.util.Collection; +import java.util.Deque; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; @@ -50,13 +52,14 @@ if (dpm == null) { LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId()); } + Deque<Task> abortedTasks = new ArrayDeque<>(); Collection<Joblet> joblets = ncs.getJobletMap().values(); // TODO(mblow): should we have one jobletmap per cc? 
joblets.stream().filter(joblet -> joblet.getJobId().getCcId().equals(ccId)).forEach(joblet -> { - Collection<Task> tasks = joblet.getTaskMap().values(); - for (Task task : tasks) { + joblet.getTaskMap().values().forEach(task -> { task.abort(); - } + abortedTasks.add(task); + }); final JobId jobId = joblet.getJobId(); if (dpm != null) { dpm.abortReader(jobId); @@ -64,5 +67,6 @@ } ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE)); }); + ncs.getExecutor().submit(new EnsureAllCcTasksCompleted(ncs, ccId, abortedTasks)); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java new file mode 100644 index 0000000..156a5c9 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.nc.work; + +import java.util.Arrays; +import java.util.Deque; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.control.nc.Task; +import org.apache.hyracks.util.ExitUtil; +import org.apache.hyracks.util.Span; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +@SuppressWarnings({ "squid:S1181", "squid:S1166" }) +public class EnsureAllCcTasksCompleted implements Runnable { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(2); + private final NodeControllerService ncs; + private final CcId ccId; + private final Deque<Task> abortedTasks; + private final Span span; + + public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, Deque<Task> abortedTasks) { + this.ncs = ncs; + this.ccId = ccId; + this.abortedTasks = abortedTasks; + span = Span.start(2, TimeUnit.MINUTES); + } + + @Override + public void run() { + try { + LOGGER.log(Level.INFO, "Ensuring all tasks of {} have completed", ccId); + while (!span.elapsed()) { + removeAborted(); + if (abortedTasks.isEmpty()) { + break; + } + LOGGER.log(Level.INFO, "{} tasks are still running", abortedTasks.size()); + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // Check once a second + } + if (abortedTasks.isEmpty()) { + LOGGER.log(Level.INFO, "All tasks of {} have completed, Completing registration", ccId); + // all tasks has completed + ncs.getApplication().onRegisterNode(ccId); + } else { + LOGGER.log(Level.ERROR, + "Failed to abort all previous tasks associated with CC {} after {}ms. 
Giving up", ccId, + TIMEOUT); + LOGGER.log(Level.ERROR, "{} tasks failed to complete within timeout", abortedTasks.size()); + abortedTasks.forEach(task -> { + List<Thread> pendingThreads = task.getPendingThreads(); + LOGGER.log(Level.ERROR, "task {} was stuck. Stuck thread count = {}", task.getTaskAttemptId(), + pendingThreads.size()); + pendingThreads.forEach(thread -> { + LOGGER.log(Level.ERROR, "Stuck thread trace: {}", Arrays.toString(thread.getStackTrace())); + }); + }); + ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS); + } + } catch (Throwable th) { + try { + LOGGER.log(Level.ERROR, "Failed to abort all previous tasks associated with CC {}", ccId, th); + } catch (Throwable ignore) { + // Ignore logging errors + } + ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS); + } + } + + private void removeAborted() { + int numTasks = abortedTasks.size(); + for (int i = 0; i < numTasks; i++) { + Task task = abortedTasks.poll(); + if (!task.isCompleted()) { + abortedTasks.add(task); + } + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java index db5cd13..1a17012 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java @@ -33,6 +33,7 @@ public static final int EC_ABNORMAL_TERMINATION = 1; public static final int EC_FAILED_TO_STARTUP = 2; public static final int EC_FAILED_TO_RECOVER = 3; + public static final int NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4; public static final int EC_UNHANDLED_EXCEPTION = 11; public static final int EC_IMMEDIATE_HALT = 33; public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44; diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java index 95db604..d8d6bb1 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java @@ -34,15 +34,19 @@ } public boolean elapsed() { - return remaining(TimeUnit.NANOSECONDS) > spanNanos; + return elapsed(TimeUnit.NANOSECONDS) > spanNanos; } - public long remaining(TimeUnit unit) { + public long elapsed(TimeUnit unit) { return unit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS); } public void sleep(long sleep, TimeUnit unit) throws InterruptedException { - TimeUnit.NANOSECONDS.sleep(Math.min(remaining(TimeUnit.NANOSECONDS), unit.toNanos(sleep))); + TimeUnit.NANOSECONDS.sleep(Math.min(elapsed(TimeUnit.NANOSECONDS), unit.toNanos(sleep))); + } + + public long remaining(TimeUnit unit) { + return unit.convert(Long.max(spanNanos - elapsed(TimeUnit.NANOSECONDS), 0L), TimeUnit.NANOSECONDS); } public void loopUntilExhausted(ThrowingAction action) throws Exception { @@ -52,7 +56,7 @@ public void loopUntilExhausted(ThrowingAction action, long delay, TimeUnit delayUnit) throws Exception { while (!elapsed()) { action.run(); - if (remaining(delayUnit) < delay) { + if (elapsed(delayUnit) < delay) { break; } delayUnit.sleep(delay); -- To view, visit https://asterix-gerrit.ics.uci.edu/2507 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I0517e5a390d50e8703ffdbecbb84467c22edda85 Gerrit-PatchSet: 6 Gerrit-Project: asterixdb Gerrit-Branch: release-0.9.4-pre-rc Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
