Repository: asterixdb Updated Branches: refs/heads/master 877a36de4 -> 278df951a
[NO ISSUE][CLUS] Unify Code Path of Requesting Startup Tasks - user model changes: no - storage format changes: no - interface changes: no Details: - Remove special case of requesting startup tasks on NC startup complete. - Handle case where CC might fail before sending startup tasks to NC. Change-Id: Ieb89d6f293b0e958c3f141afc6f1db372cee7c91 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2401 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/278df951 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/278df951 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/278df951 Branch: refs/heads/master Commit: 278df951aef99abbb07ab053f1a3cc78ca582aed Parents: 877a36d Author: Murtadha Hubail <mhub...@apache.org> Authored: Mon Feb 19 07:12:57 2018 +0300 Committer: Murtadha Hubail <mhub...@apache.org> Committed: Wed Feb 21 02:25:20 2018 -0800 ---------------------------------------------------------------------- .../app/nc/task/UpdateNodeStatusTask.java | 46 +++++++++++++++++++ .../app/replication/NcLifecycleCoordinator.java | 2 + .../hyracks/bootstrap/NCApplication.java | 48 ++++++++++---------- .../control/nc/NodeControllerService.java | 12 ++++- 4 files changed, 82 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/278df951/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java new file mode 100644 index 0000000..a1e11c2 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.nc.task; + +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.hyracks.api.client.NodeStatus; +import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class UpdateNodeStatusTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + private final NodeStatus status; + + public UpdateNodeStatusTask(NodeStatus status) { + this.status = status; + } + + @Override + public void perform(CcId ccId, IControllerService cs) { + NodeControllerService ncs = (NodeControllerService) cs; + ncs.setNodeStatus(status); + } + + @Override + public String toString() { + return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }"; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/278df951/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java index 25e768d..844851a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java @@ -34,6 +34,7 @@ import org.apache.asterix.app.nc.task.MetadataBootstrapTask; import org.apache.asterix.app.nc.task.ReportLocalCountersTask; import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; import org.apache.asterix.app.nc.task.StartReplicationServiceTask; +import org.apache.asterix.app.nc.task.UpdateNodeStatusTask; import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage; import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage; import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; @@ -145,6 +146,7 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { return buildActiveNCRegTasks(isMetadataNode); } final List<INCLifecycleTask> tasks = new ArrayList<>(); + tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE)); if (state == SystemState.CORRUPTED) { //need to perform local recovery for node partitions LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId)) http://git-wip-us.apache.org/repos/asf/asterixdb/blob/278df951/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index b0382f7..3a8a92f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -73,7 +73,6 @@ public class NCApplication extends BaseNCApplication { private INcApplicationContext runtimeContext; private String nodeId; private boolean stopInitiated; - private boolean startupCompleted; protected WebManager webManager; @Override @@ -190,33 +189,16 @@ public class NCApplication extends BaseNCApplication { // configure servlets after joining the cluster, so we can create HyracksClientConnection configureServers(); webManager.start(); - - // Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag - final NodeProperties nodeProperties = runtimeContext.getNodeProperties(); - IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); - SystemState state = recoveryMgr.getSystemState(); - if (state == SystemState.PERMANENT_DATA_LOSS - && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) { - state = SystemState.BOOTSTRAPPING; - } - // Request registration tasks from CC (we only do this from our primary CC, in the case of multiple CCs) - final NodeControllerService ncControllerService = (NodeControllerService) ncServiceCtx.getControllerService(); - RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryCcId(), ncControllerService, - NodeStatus.BOOTING, state); - startupCompleted = true; } @Override - public void onRegisterNode(CcId ccId) throws Exception { - if (startupCompleted) { - /* - * If the node completed its startup before, then this is a re-registration with - * the primary (or supplemental) CC and therefore the system state should be HEALTHY and the node status - * is ACTIVE - */ - RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(), - NodeStatus.ACTIVE, SystemState.HEALTHY); - } + public synchronized void onRegisterNode(CcId ccId) throws Exception { + final NodeControllerService ncs = (NodeControllerService) ncServiceCtx.getControllerService(); + final NodeStatus currentStatus = ncs.getNodeStatus(); + final SystemState systemState = isPendingStartupTasks(currentStatus, ncs.getPrimaryCcId(), ccId) + ? getCurrentSystemState() : SystemState.HEALTHY; + RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(), + currentStatus, systemState); } @Override @@ -259,4 +241,20 @@ public class NCApplication extends BaseNCApplication { return devices.get(ioDeviceIndex); }; } + + private boolean isPendingStartupTasks(NodeStatus nodeStatus, CcId primaryCc, CcId registeredCc) { + return nodeStatus == NodeStatus.BOOTING && (primaryCc == null || primaryCc.equals(registeredCc)); + } + + private SystemState getCurrentSystemState() { + final NodeProperties nodeProperties = runtimeContext.getNodeProperties(); + IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); + SystemState state = recoveryMgr.getSystemState(); + // Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag + if (state == SystemState.PERMANENT_DATA_LOSS + && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) { + state = SystemState.BOOTSTRAPPING; + } + return state; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/278df951/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 0f40b60..0ccef1d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -48,6 +48,7 @@ import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.api.application.INCApplication; import org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; @@ -62,6 +63,7 @@ import org.apache.hyracks.api.job.JobParameterByteStore; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.control.common.base.IClusterController; import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.control.common.context.ServerContext; @@ -95,7 +97,6 @@ import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory; import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters; import org.apache.hyracks.util.ExitUtil; -import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.util.PidHelper; import org.apache.hyracks.util.trace.ITracer; import org.apache.hyracks.util.trace.Tracer; @@ -183,6 +184,7 @@ public class NodeControllerService implements IControllerService { private final ConfigManager configManager; private final Map<CcId, AtomicLong> maxJobIds = new ConcurrentHashMap<>(); + private NodeStatus status = NodeStatus.BOOTING; static { ExitUtil.init(); @@ -627,6 +629,14 @@ public class NodeControllerService implements IControllerService { return workQueue; } + public synchronized NodeStatus getNodeStatus() { + return status; + } + + public synchronized void setNodeStatus(NodeStatus status) { + this.status = status; + } + private class HeartbeatTask implements Runnable { private final Semaphore delayBlock = new Semaphore(0); private final IClusterController cc;