abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2507

Change subject: [NO ISSUE][RT] Ensure all NC tasks are aborted before joining
......................................................................

[NO ISSUE][RT] Ensure all NC tasks are aborted before joining

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Add getApplication() to NodeControllerService

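details:
- This change ensures that all previous tasks of a CC on an NC
  have completed before registration with that CC completes:
  the tasks are aborted, and the application's onRegisterNode()
  callback is deferred until they have all finished (the NC halts
  if they do not finish within two minutes).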

Change-Id: I0517e5a390d50e8703ffdbecbb84467c22edda85
---
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
5 files changed, 109 insertions(+), 11 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/07/2507/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0756210..1c74259 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -362,8 +362,7 @@
             };
             ClusterControllerRemoteProxy ccProxy = new 
ClusterControllerRemoteProxy(
                     ipc.getHandle(ccAddress, 
ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
-            CcConnection ccc = new CcConnection(ccProxy);
-            return registerNode(ccc, ccAddress);
+            return registerNode(new CcConnection(ccProxy), ccAddress);
         }
     }
 
@@ -414,7 +413,6 @@
 
     private CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) 
throws Exception {
         LOGGER.info("Registering with Cluster Controller {}", ccc);
-
         int registrationId = nextRegistrationId.incrementAndGet();
         pendingRegistrations.put(registrationId, ccc);
         CcId ccId = ccc.registerNode(nodeRegistration, registrationId);
@@ -424,10 +422,8 @@
         if (distributedState != null) {
             getDistributedState().put(ccId, distributedState);
         }
-        application.onRegisterNode(ccId);
         IClusterController ccs = ccc.getClusterControllerService();
         NodeParameters nodeParameters = ccc.getNodeParameters();
-
         // Start heartbeat generator.
         if (!heartbeatThreads.containsKey(ccId)) {
             Thread heartbeatThread = new Thread(
@@ -444,7 +440,6 @@
             ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, 
nodeParameters.getProfileDumpPeriod());
             ccTimers.put(ccId, ccTimer);
         }
-
         LOGGER.info("Registering with Cluster Controller {} complete", ccc);
         return ccId;
     }
@@ -725,4 +720,8 @@
             }
         }
     }
+
+    public INCApplication getApplication() { // Exposes the NC application; EnsureAllCcTasksCompleted uses it to call onRegisterNode(ccId)
+        return application;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 9d99968..fc9d2d6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -112,6 +112,8 @@
 
     private final IStatsCollector statsCollector;
 
+    private volatile boolean completed = false;
+
     public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, 
String displayName,
             ExecutorService executor, NodeControllerService ncs,
             List<List<PartitionChannel>> inputChannelsFromConnectors) {
@@ -255,8 +257,7 @@
         if (aborted) {
             return false;
         }
-        pendingThreads.add(t);
-        return true;
+        return pendingThreads.add(t);
     }
 
     private synchronized void removePendingThread(Thread t) {
@@ -345,6 +346,7 @@
         } finally {
             close();
             removePendingThread(ct);
+            completed = true;
         }
         if (!exceptions.isEmpty()) {
             if (LOGGER.isWarnEnabled()) {
@@ -460,4 +462,8 @@
     public IStatsCollector getStatsCollector() {
         return statsCollector;
     }
+
+    public boolean isCompleted() { // True once the finally block has run close() and removePendingThread(ct). NOTE(review): stays false if either of those throws, or (TODO confirm) if the task exits before reaching that try/finally, e.g. after addPendingThread() refuses an aborted task
+        return completed;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 2bcf414..c6696fd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -18,7 +18,9 @@
  */
 package org.apache.hyracks.control.nc.work;
 
+import java.util.ArrayDeque;
 import java.util.Collection;
+import java.util.Deque;
 
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
@@ -50,13 +52,14 @@
         if (dpm == null) {
             LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + 
ncs.getId());
         }
+        Deque<Task> abortedTasks = new ArrayDeque<>();
         Collection<Joblet> joblets = ncs.getJobletMap().values();
         // TODO(mblow): should we have one jobletmap per cc?
         joblets.stream().filter(joblet -> 
joblet.getJobId().getCcId().equals(ccId)).forEach(joblet -> {
-            Collection<Task> tasks = joblet.getTaskMap().values();
-            for (Task task : tasks) {
+            joblet.getTaskMap().values().forEach(task -> {
                 task.abort();
-            }
+                abortedTasks.add(task);
+            });
             final JobId jobId = joblet.getJobId();
             if (dpm != null) {
                 dpm.abortReader(jobId);
@@ -64,5 +67,6 @@
             }
             ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, 
JobStatus.FAILURE));
         });
+        ncs.getExecutor().submit(new EnsureAllCcTasksCompleted(ncs, ccId, 
abortedTasks));
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
new file mode 100644
index 0000000..bbf4ee5
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Deque;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.Task;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.Span;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@SuppressWarnings("squid:S1181")
+public class EnsureAllCcTasksCompleted implements Runnable { // Polls once a second, for up to 2 minutes, until every given task reports isCompleted(), then calls onRegisterNode(ccId); otherwise halts the NC
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(2); // NOTE(review): only used in the timeout log message; the actual wait is set separately by Span.start(2, TimeUnit.MINUTES) in the constructor
+    private final NodeControllerService ncs;
+    private final CcId ccId;
+    private final Deque<Task> abortedTasks; // NOTE(review): mutated by removeAborted() from run(); assumes the caller hands it off and no longer touches it
+    private final Span span;
+
+    public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, 
Deque<Task> abortedTasks) {
+        this.ncs = ncs;
+        this.ccId = ccId;
+        this.abortedTasks = abortedTasks;
+        span = Span.start(2, TimeUnit.MINUTES); // NOTE(review): must be kept in sync with TIMEOUT by hand
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (!span.elapsed()) {
+                removeAborted();
+                if (abortedTasks.isEmpty()) {
+                    break;
+                }
+                Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // Check once a 
second
+            }
+            if (abortedTasks.isEmpty()) {
+                // all tasks have completed; only now notify the application of registration with this CC
+                ncs.getApplication().onRegisterNode(ccId);
+            } else {
+                LOGGER.log(Level.ERROR,
+                        "Failed to abort all previous tasks associated with CC 
{} after {}ms. Giving up", ccId,
+                        TIMEOUT);
+                ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+            }
+        } catch (Throwable th) { // NOTE(review): also catches InterruptedException from Thread.sleep and any failure inside onRegisterNode; all are logged as an abort failure before halting
+            try {
+                LOGGER.log(Level.ERROR, "Failed to abort all previous tasks 
associated with CC {}", ccId, th);
+            } catch (Throwable ignore) {
+                // Ignore logging errors
+            }
+            ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+        }
+    }
+
+    private void removeAborted() { // Drops tasks whose isCompleted() is true and re-queues the rest in their original order. NOTE(review): despite the name, it removes *completed* tasks
+        int numTasks = abortedTasks.size();
+        for (int i = 0; i < numTasks; i++) {
+            Task task = abortedTasks.poll();
+            if (!task.isCompleted()) {
+                abortedTasks.add(task);
+            }
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index db5cd13..1a17012 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -33,6 +33,7 @@
     public static final int EC_ABNORMAL_TERMINATION = 1;
     public static final int EC_FAILED_TO_STARTUP = 2;
     public static final int EC_FAILED_TO_RECOVER = 3;
+    public static final int NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4;
     public static final int EC_UNHANDLED_EXCEPTION = 11;
     public static final int EC_IMMEDIATE_HALT = 33;
     public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2507
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0517e5a390d50e8703ffdbecbb84467c22edda85
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: release-0.9.4-pre-rc
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to