Repository: asterixdb
Updated Branches:
  refs/heads/master 877a36de4 -> 278df951a


[NO ISSUE][CLUS] Unify Code Path of Requesting Startup Tasks

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Remove the special case of requesting startup tasks when
  NC startup completes.
- Handle the case where the CC might fail before sending
  startup tasks to the NC.

Change-Id: Ieb89d6f293b0e958c3f141afc6f1db372cee7c91
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2401
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/278df951
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/278df951
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/278df951

Branch: refs/heads/master
Commit: 278df951aef99abbb07ab053f1a3cc78ca582aed
Parents: 877a36d
Author: Murtadha Hubail <mhub...@apache.org>
Authored: Mon Feb 19 07:12:57 2018 +0300
Committer: Murtadha Hubail <mhub...@apache.org>
Committed: Wed Feb 21 02:25:20 2018 -0800

----------------------------------------------------------------------
 .../app/nc/task/UpdateNodeStatusTask.java       | 46 +++++++++++++++++++
 .../app/replication/NcLifecycleCoordinator.java |  2 +
 .../hyracks/bootstrap/NCApplication.java        | 48 ++++++++++----------
 .../control/nc/NodeControllerService.java       | 12 ++++-
 4 files changed, 82 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/278df951/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
new file mode 100644
index 0000000..a1e11c2
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import java.util.Objects;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Lifecycle task that, when performed, sets the given {@link NodeStatus} on the node controller.
+ */
+public class UpdateNodeStatusTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    private final NodeStatus status;
+
+    /**
+     * @param status the status to set on the node controller when this task is performed; must not be null
+     * @throws NullPointerException if {@code status} is null
+     */
+    public UpdateNodeStatusTask(NodeStatus status) {
+        // fail fast here rather than later publishing a null status on the node controller
+        this.status = Objects.requireNonNull(status, "status");
+    }
+
+    /**
+     * Sets this task's status on the node controller.
+     *
+     * @param ccId the cluster controller that issued the task (not used by this task)
+     * @param cs the controller service; expected to be a {@link NodeControllerService}
+     */
+    @Override
+    public void perform(CcId ccId, IControllerService cs) {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        ncs.setNodeStatus(status);
+    }
+
+    @Override
+    public String toString() {
+        // include the target status so logged task lists show which status was applied
+        return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"status\" : \"" + status + "\" }";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/278df951/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 25e768d..844851a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -34,6 +34,7 @@ import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
 import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
 import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
 import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
+import org.apache.asterix.app.nc.task.UpdateNodeStatusTask;
 import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
 import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
 import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
@@ -145,6 +146,7 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
             return buildActiveNCRegTasks(isMetadataNode);
         }
         final List<INCLifecycleTask> tasks = new ArrayList<>();
+        tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE));
         if (state == SystemState.CORRUPTED) {
             //need to perform local recovery for node partitions
             LocalRecoveryTask rt = new 
LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/278df951/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index b0382f7..3a8a92f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -73,7 +73,6 @@ public class NCApplication extends BaseNCApplication {
     private INcApplicationContext runtimeContext;
     private String nodeId;
     private boolean stopInitiated;
-    private boolean startupCompleted;
     protected WebManager webManager;
 
     @Override
@@ -190,33 +189,16 @@ public class NCApplication extends BaseNCApplication {
         // configure servlets after joining the cluster, so we can create 
HyracksClientConnection
         configureServers();
         webManager.start();
-
-        // Since we don't pass initial run flag in 
AsterixHyracksIntegrationUtil, we use the virtualNC flag
-        final NodeProperties nodeProperties = 
runtimeContext.getNodeProperties();
-        IRecoveryManager recoveryMgr = 
runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        SystemState state = recoveryMgr.getSystemState();
-        if (state == SystemState.PERMANENT_DATA_LOSS
-                && (nodeProperties.isInitialRun() || 
nodeProperties.isVirtualNc())) {
-            state = SystemState.BOOTSTRAPPING;
-        }
-        // Request registration tasks from CC (we only do this from our 
primary CC, in the case of multiple CCs)
-        final NodeControllerService ncControllerService = 
(NodeControllerService) ncServiceCtx.getControllerService();
-        
RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryCcId(), 
ncControllerService,
-                NodeStatus.BOOTING, state);
-        startupCompleted = true;
     }
 
     @Override
-    public void onRegisterNode(CcId ccId) throws Exception {
-        if (startupCompleted) {
-            /*
-             * If the node completed its startup before, then this is a 
re-registration with
-             * the primary (or supplemental) CC and therefore the system state 
should be HEALTHY and the node status
-             * is ACTIVE
-             */
-            RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) 
ncServiceCtx.getControllerService(),
-                    NodeStatus.ACTIVE, SystemState.HEALTHY);
-        }
+    public synchronized void onRegisterNode(CcId ccId) throws Exception {
+        final NodeControllerService ncs = (NodeControllerService) ncServiceCtx.getControllerService();
+        final NodeStatus currentStatus = ncs.getNodeStatus();
+        // Report the locally recovered system state only when startup tasks are still pending, i.e. the
+        // node is BOOTING and is registering with the primary CC (or no primary CC is known yet);
+        // in every other case the node reports itself as HEALTHY.
+        final SystemState systemState = isPendingStartupTasks(currentStatus, ncs.getPrimaryCcId(), ccId)
+                ? getCurrentSystemState() : SystemState.HEALTHY;
+        // request registration tasks from the CC this node just registered with
+        RegistrationTasksRequestMessage.send(ccId, ncs, currentStatus, systemState);
     }
 
     @Override
@@ -259,4 +241,20 @@ public class NCApplication extends BaseNCApplication {
             return devices.get(ioDeviceIndex);
         };
     }
+
+    /**
+     * Decides whether a registration with {@code registeredCc} should be treated as still needing
+     * startup tasks: true only when the node is BOOTING and either no primary CC is known or
+     * {@code registeredCc} is the primary CC.
+     */
+    private boolean isPendingStartupTasks(NodeStatus nodeStatus, CcId primaryCc, CcId registeredCc) {
+        if (nodeStatus != NodeStatus.BOOTING) {
+            return false;
+        }
+        return primaryCc == null || primaryCc.equals(registeredCc);
+    }
+
+    /**
+     * Returns the system state reported by the recovery manager, except that PERMANENT_DATA_LOSS is
+     * reported as BOOTSTRAPPING when this is an initial run or a virtual NC.
+     */
+    private SystemState getCurrentSystemState() {
+        final NodeProperties props = runtimeContext.getNodeProperties();
+        final SystemState recovered =
+                runtimeContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
+        if (recovered != SystemState.PERMANENT_DATA_LOSS) {
+            return recovered;
+        }
+        // AsterixHyracksIntegrationUtil does not pass the initial run flag, so the virtual NC flag
+        // is accepted as an equivalent signal
+        final boolean freshNode = props.isInitialRun() || props.isVirtualNc();
+        return freshNode ? SystemState.BOOTSTRAPPING : recovered;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/278df951/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0f40b60..0ccef1d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -48,6 +48,7 @@ import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.api.application.INCApplication;
 import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
@@ -62,6 +63,7 @@ import org.apache.hyracks.api.job.JobParameterByteStore;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.context.ServerContext;
@@ -95,7 +97,6 @@ import org.apache.hyracks.ipc.impl.IPCSystem;
 import 
org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 import org.apache.hyracks.util.ExitUtil;
-import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.util.PidHelper;
 import org.apache.hyracks.util.trace.ITracer;
 import org.apache.hyracks.util.trace.Tracer;
@@ -183,6 +184,7 @@ public class NodeControllerService implements 
IControllerService {
     private final ConfigManager configManager;
 
     private final Map<CcId, AtomicLong> maxJobIds = new ConcurrentHashMap<>();
+    private NodeStatus status = NodeStatus.BOOTING;
 
     static {
         ExitUtil.init();
@@ -627,6 +629,14 @@ public class NodeControllerService implements 
IControllerService {
         return workQueue;
     }
 
+    /**
+     * @return the current status of this node
+     */
+    public synchronized NodeStatus getNodeStatus() {
+        return this.status;
+    }
+
+    /**
+     * Replaces the current status of this node.
+     *
+     * @param newStatus the status to record
+     */
+    public synchronized void setNodeStatus(NodeStatus newStatus) {
+        status = newStatus;
+    }
+
     private class HeartbeatTask implements Runnable {
         private final Semaphore delayBlock = new Semaphore(0);
         private final IClusterController cc;

Reply via email to