This is an automated email from the ASF dual-hosted git repository.

rohit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cloudstack.git

commit 8ef131745a5ef0e5e6ddc7e498f3a0208f1bfb71
Merge: 1708838 30175d6
Author: Rohit Yadav <rohit.ya...@shapeblue.com>
AuthorDate: Thu Mar 15 16:46:50 2018 +0530

    Merge branch '4.11'

 agent/conf/agent.properties                        |   11 +
 agent/src/main/java/com/cloud/agent/Agent.java     |  119 +-
 .../src/main/java/com/cloud/agent/AgentShell.java  |   58 +-
 .../src/main/java/com/cloud/agent/IAgentShell.java |   45 +-
 .../test/java/com/cloud/agent/AgentShellTest.java  |    8 +-
 .../com/cloud/agent/api/to/VirtualMachineTO.java   |   10 +
 .../cloudstack/config/ApiServiceConfiguration.java |    4 +-
 client/pom.xml                                     |    5 +
 .../java/com/cloud/agent/api/ReadyCommand.java     |   31 +-
 .../java/com/cloud/agent/api/StartupCommand.java   |    9 +
 .../cloudstack/agent/lb/SetupMSListAnswer.java}    |   46 +-
 .../cloudstack/agent/lb/SetupMSListCommand.java}   |   46 +-
 engine/orchestration/pom.xml                       |    5 +
 .../com/cloud/agent/manager/AgentManagerImpl.java  |   25 +-
 .../storage/endpoint/DefaultEndPointSelector.java  |    4 +-
 framework/agent-lb/pom.xml                         |   32 +
 .../cloudstack/agent/lb/IndirectAgentLB.java       |   53 +
 .../agent/lb/IndirectAgentLBAlgorithm.java         |   45 +
 framework/pom.xml                                  |    3 +-
 .../kvm/resource/LibvirtComputingResource.java     |   28 +
 .../hypervisor/kvm/resource/LibvirtVMDef.java      |   27 +
 .../kvm/resource/LibvirtComputingResourceTest.java |   39 +
 .../cloud/agent/manager/MockStorageManager.java    |    6 +
 .../agent/manager/MockStorageManagerImpl.java      |   91 +-
 .../cloud/agent/manager/SimulatorManagerImpl.java  |   17 +-
 .../cloud/resource/SimulatorStorageProcessor.java  |    2 +-
 .../network/lb/ElasticLoadBalancerManagerImpl.java |    2 +-
 server/pom.xml                                     |    5 +
 .../configuration/ConfigurationManagerImpl.java    |    5 +
 .../consoleproxy/ConsoleProxyManagerImpl.java      |    6 +-
 .../main/java/com/cloud/hypervisor/KVMGuru.java    |   50 +
 .../kvm/discoverer/LibvirtServerDiscoverer.java    |    6 +-
 .../network/element/ConfigDriveNetworkElement.java |   13 +-
 .../router/VirtualNetworkApplianceManagerImpl.java |    4 +-
 .../com/cloud/server/ConfigurationServerImpl.java  |    4 +-
 .../core/spring-server-core-managers-context.xml   |    2 +
 .../agent/lb/IndirectAgentLBServiceImpl.java       |  231 +++
 .../IndirectAgentLBRoundRobinAlgorithm.java        |   59 +
 .../algorithm/IndirectAgentLBShuffleAlgorithm.java |   44 +
 .../algorithm/IndirectAgentLBStaticAlgorithm.java  |   50 +-
 .../src/test/resources/createNetworkOffering.xml   |    1 +
 server/src/test/resources/testContext.xml          |    7 +-
 server/test/com/cloud/hypervisor/KVMGuruTest.java  |   99 +
 .../agent/lb/IndirectAgentLBServiceImplTest.java   |  208 ++
 .../IndirectAgentLBRoundRobinAlgorithmTest.java    |   76 +
 .../IndirectAgentLBShuffleAlgorithmTest.java       |   60 +
 .../IndirectAgentLBStaticAlgorithmTest.java        |   49 +
 .../SecondaryStorageManagerImpl.java               |    9 +-
 systemvm/debian/opt/cloud/bin/checkrouter.sh       |    7 +
 .../opt/cloud/templates/check_heartbeat.sh.templ   |   78 +-
 .../plugins/nuagevsp/libVSD/__init__.py            |   22 +-
 test/integration/plugins/nuagevsp/libVSD/client.py |  135 ++
 .../integration/plugins/nuagevsp/libVSD/helpers.py |  602 ++++++
 test/integration/plugins/nuagevsp/nuageTestCase.py |  319 ++-
 .../plugins/nuagevsp/nuage_test_data.py            | 2177 ++++----------------
 .../plugins/nuagevsp/test_nuage_configdrive.py     |  173 +-
 .../plugins/nuagevsp/test_nuage_extra_dhcp.py      |   26 +-
 .../plugins/nuagevsp/test_nuage_internal_dns.py    |  400 +---
 .../nuagevsp/test_nuage_network_migration.py       |   24 -
 .../plugins/nuagevsp/test_nuage_password_reset.py  |   11 +-
 .../test_nuage_public_sharednetwork_userdata.py    |   13 +-
 .../plugins/nuagevsp/test_nuage_source_nat.py      |   19 +-
 .../plugins/nuagevsp/test_nuage_static_nat.py      |   19 +-
 .../plugins/nuagevsp/test_nuage_vpc_internal_lb.py |  250 +--
 .../nuagevsp/test_nuage_vsp_domain_template.py     |    4 +
 .../nuagevsp/test_nuage_vsp_mngd_subnets.py        |  157 +-
 test/integration/smoke/test_service_offerings.py   |  169 +-
 tools/marvin/marvin/config/test_data.py            |  876 ++------
 tools/marvin/setup.py                              |    2 +-
 .../src/main/java/com/cloud/utils/StringUtils.java |    8 +-
 .../test/java/com/cloud/utils/StringUtilsTest.java |   11 +-
 71 files changed, 3624 insertions(+), 3637 deletions(-)

diff --cc 
engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
index 4b80fb1,0000000..6091131
mode 100644,000000..100644
--- 
a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
+++ 
b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
@@@ -1,1768 -1,0 +1,1791 @@@
 +// Licensed to the Apache Software Foundation (ASF) under one
 +// or more contributor license agreements.  See the NOTICE file
 +// distributed with this work for additional information
 +// regarding copyright ownership.  The ASF licenses this file
 +// to you under the Apache License, Version 2.0 (the
 +// "License"); you may not use this file except in compliance
 +// with the License.  You may obtain a copy of the License at
 +//
 +//   http://www.apache.org/licenses/LICENSE-2.0
 +//
 +// Unless required by applicable law or agreed to in writing,
 +// software distributed under the License is distributed on an
 +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +// KIND, either express or implied.  See the License for the
 +// specific language governing permissions and limitations
 +// under the License.
 +package com.cloud.agent.manager;
 +
 +import java.lang.reflect.Constructor;
 +import java.lang.reflect.InvocationTargetException;
 +import java.nio.channels.ClosedChannelException;
 +import java.util.ArrayList;
++import java.util.Arrays;
 +import java.util.Date;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.LinkedBlockingQueue;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.locks.Lock;
 +import java.util.concurrent.locks.ReentrantLock;
 +
 +import javax.inject.Inject;
 +import javax.naming.ConfigurationException;
 +
++import org.apache.cloudstack.agent.lb.IndirectAgentLB;
 +import org.apache.cloudstack.ca.CAManager;
 +import org.apache.cloudstack.framework.config.ConfigKey;
 +import org.apache.cloudstack.framework.config.Configurable;
 +import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 +import org.apache.cloudstack.framework.jobs.AsyncJob;
 +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
 +import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 +import org.apache.cloudstack.outofbandmanagement.dao.OutOfBandManagementDao;
 +import org.apache.cloudstack.utils.identity.ManagementServerNode;
 +import org.apache.log4j.Logger;
 +import org.slf4j.MDC;
 +
 +import com.cloud.agent.AgentManager;
 +import com.cloud.agent.Listener;
 +import com.cloud.agent.StartupCommandProcessor;
 +import com.cloud.agent.api.AgentControlAnswer;
 +import com.cloud.agent.api.AgentControlCommand;
 +import com.cloud.agent.api.Answer;
 +import com.cloud.agent.api.CheckHealthCommand;
 +import com.cloud.agent.api.Command;
 +import com.cloud.agent.api.PingAnswer;
 +import com.cloud.agent.api.PingCommand;
 +import com.cloud.agent.api.PingRoutingCommand;
 +import com.cloud.agent.api.ReadyAnswer;
 +import com.cloud.agent.api.ReadyCommand;
 +import com.cloud.agent.api.SetHostParamsCommand;
 +import com.cloud.agent.api.ShutdownCommand;
 +import com.cloud.agent.api.StartupAnswer;
 +import com.cloud.agent.api.StartupCommand;
 +import com.cloud.agent.api.StartupProxyCommand;
 +import com.cloud.agent.api.StartupRoutingCommand;
 +import com.cloud.agent.api.StartupSecondaryStorageCommand;
 +import com.cloud.agent.api.StartupStorageCommand;
 +import com.cloud.agent.api.UnsupportedAnswer;
 +import com.cloud.agent.transport.Request;
 +import com.cloud.agent.transport.Response;
 +import com.cloud.alert.AlertManager;
 +import com.cloud.configuration.ManagementServiceConfiguration;
 +import com.cloud.dc.ClusterVO;
 +import com.cloud.dc.DataCenterVO;
 +import com.cloud.dc.HostPodVO;
 +import com.cloud.dc.dao.ClusterDao;
 +import com.cloud.dc.dao.DataCenterDao;
 +import com.cloud.dc.dao.HostPodDao;
 +import com.cloud.exception.AgentUnavailableException;
 +import com.cloud.exception.ConnectionException;
 +import com.cloud.exception.OperationTimedoutException;
 +import com.cloud.exception.UnsupportedVersionException;
 +import com.cloud.ha.HighAvailabilityManager;
 +import com.cloud.host.Host;
 +import com.cloud.host.HostVO;
 +import com.cloud.host.Status;
 +import com.cloud.host.Status.Event;
 +import com.cloud.host.dao.HostDao;
 +import com.cloud.hypervisor.Hypervisor.HypervisorType;
 +import com.cloud.hypervisor.HypervisorGuruManager;
 +import com.cloud.resource.Discoverer;
 +import com.cloud.resource.ResourceManager;
 +import com.cloud.resource.ResourceState;
 +import com.cloud.resource.ServerResource;
 +import com.cloud.utils.Pair;
 +import com.cloud.utils.component.ManagerBase;
 +import com.cloud.utils.concurrency.NamedThreadFactory;
 +import com.cloud.utils.db.DB;
 +import com.cloud.utils.db.EntityManager;
 +import com.cloud.utils.db.QueryBuilder;
 +import com.cloud.utils.db.SearchCriteria.Op;
 +import com.cloud.utils.db.TransactionLegacy;
 +import com.cloud.utils.exception.CloudRuntimeException;
 +import com.cloud.utils.exception.HypervisorVersionChangedException;
 +import com.cloud.utils.exception.NioConnectionException;
 +import com.cloud.utils.exception.TaskExecutionException;
 +import com.cloud.utils.fsm.NoTransitionException;
 +import com.cloud.utils.fsm.StateMachine2;
 +import com.cloud.utils.nio.HandlerFactory;
 +import com.cloud.utils.nio.Link;
 +import com.cloud.utils.nio.NioServer;
 +import com.cloud.utils.nio.Task;
 +import com.cloud.utils.time.InaccurateClock;
++import com.google.common.base.Strings;
 +
 +/**
 + * Implementation of the Agent Manager. This class controls the connection to 
the agents.
 + **/
 +public class AgentManagerImpl extends ManagerBase implements AgentManager, 
HandlerFactory, Configurable {
 +    protected static final Logger s_logger = 
Logger.getLogger(AgentManagerImpl.class);
 +
 +    /**
 +     * _agents is a ConcurrentHashMap, but it is used from within a 
synchronized block. This will be reported by findbugs as 
JLM_JSR166_UTILCONCURRENT_MONITORENTER. Maybe a
 +     * ConcurrentHashMap is not the right thing to use here, but i'm not sure 
so i leave it alone.
 +     */
 +    protected ConcurrentHashMap<Long, AgentAttache> _agents = new 
ConcurrentHashMap<Long, AgentAttache>(10007);
 +    protected List<Pair<Integer, Listener>> _hostMonitors = new 
ArrayList<Pair<Integer, Listener>>(17);
 +    protected List<Pair<Integer, Listener>> _cmdMonitors = new 
ArrayList<Pair<Integer, Listener>>(17);
 +    protected List<Pair<Integer, StartupCommandProcessor>> _creationMonitors 
= new ArrayList<Pair<Integer, StartupCommandProcessor>>(17);
 +    protected List<Long> _loadingAgents = new ArrayList<Long>();
 +    private int _monitorId = 0;
 +    private final Lock _agentStatusLock = new ReentrantLock();
 +
 +    @Inject
 +    protected CAManager caService;
 +    @Inject
 +    protected EntityManager _entityMgr;
 +
 +    protected NioServer _connection;
 +    @Inject
 +    protected HostDao _hostDao = null;
 +    @Inject
 +    protected OutOfBandManagementDao outOfBandManagementDao;
 +    @Inject
 +    protected DataCenterDao _dcDao = null;
 +    @Inject
 +    protected HostPodDao _podDao = null;
 +    @Inject
 +    protected ConfigurationDao _configDao = null;
 +    @Inject
 +    protected ClusterDao _clusterDao = null;
 +
 +    @Inject
 +    protected HighAvailabilityManager _haMgr = null;
 +    @Inject
 +    protected AlertManager _alertMgr = null;
 +
 +    @Inject
 +    protected HypervisorGuruManager _hvGuruMgr;
 +
++    @Inject
++    protected IndirectAgentLB indirectAgentLB;
++
 +    protected int _retry = 2;
 +
 +    protected long _nodeId = -1;
 +
 +    protected ExecutorService _executor;
 +    protected ThreadPoolExecutor _connectExecutor;
 +    protected ScheduledExecutorService _directAgentExecutor;
 +    protected ScheduledExecutorService _cronJobExecutor;
 +    protected ScheduledExecutorService _monitorExecutor;
 +
 +    private int _directAgentThreadCap;
 +
 +    protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = 
Status.getStateMachine();
 +    private final ConcurrentHashMap<Long, Long> _pingMap = new 
ConcurrentHashMap<Long, Long>(10007);
 +
 +    @Inject
 +    ResourceManager _resourceMgr;
 +    @Inject
 +    ManagementServiceConfiguration mgmtServiceConf;
 +
 +    protected final ConfigKey<Integer> Workers = new 
ConfigKey<Integer>("Advanced", Integer.class, "workers", "5",
 +            "Number of worker threads handling remote agent connections.", 
false);
 +    protected final ConfigKey<Integer> Port = new 
ConfigKey<Integer>("Advanced", Integer.class, "port", "8250", "Port to listen 
on for remote agent connections.", false);
 +    protected final ConfigKey<Integer> AlertWait = new 
ConfigKey<Integer>("Advanced", Integer.class, "alert.wait", "1800",
 +            "Seconds to wait before alerting on a disconnected agent", true);
 +    protected final ConfigKey<Integer> DirectAgentLoadSize = new 
ConfigKey<Integer>("Advanced", Integer.class, "direct.agent.load.size", "16",
 +            "The number of direct agents to load each time", false);
 +    protected final ConfigKey<Integer> DirectAgentPoolSize = new 
ConfigKey<Integer>("Advanced", Integer.class, "direct.agent.pool.size", "500",
 +            "Default size for DirectAgentPool", false);
 +    protected final ConfigKey<Float> DirectAgentThreadCap = new 
ConfigKey<Float>("Advanced", Float.class, "direct.agent.thread.cap", "1",
 +            "Percentage (as a value between 0 and 1) of 
direct.agent.pool.size to be used as upper thread cap for a single direct agent 
to process requests", false);
 +    protected final ConfigKey<Boolean> CheckTxnBeforeSending = new 
ConfigKey<Boolean>("Developer", Boolean.class, 
"check.txn.before.sending.agent.commands", "false",
 +            "This parameter allows developers to enable a check to see if a 
transaction wraps commands that are sent to the resource.  This is not to be 
enabled on production systems.", true);
 +
 +    @Override
 +    public boolean configure(final String name, final Map<String, Object> 
params) throws ConfigurationException {
 +
 +        s_logger.info("Ping Timeout is " + mgmtServiceConf.getPingTimeout());
 +
 +        final int threads = DirectAgentLoadSize.value();
 +
 +        _nodeId = ManagementServerNode.getManagementServerId();
 +        s_logger.info("Configuring AgentManagerImpl. management server node 
id(msid): " + _nodeId);
 +
 +        final long lastPing = (System.currentTimeMillis() >> 10) - 
mgmtServiceConf.getTimeout();
 +        _hostDao.markHostsAsDisconnected(_nodeId, lastPing);
 +
 +        registerForHostEvents(new BehindOnPingListener(), true, true, false);
 +
 +        registerForHostEvents(new SetHostParamsListener(), true, true, false);
 +
 +        _executor = new ThreadPoolExecutor(threads, threads, 60l, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory("AgentTaskPool"));
 +
 +        _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, 
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory("AgentConnectTaskPool"));
 +        // allow core threads to time out even when there are no items in the 
queue
 +        _connectExecutor.allowCoreThreadTimeOut(true);
 +
 +        _connection = new NioServer("AgentManager", Port.value(), 
Workers.value() + 10, this, caService);
 +        s_logger.info("Listening on " + Port.value() + " with " + 
Workers.value() + " workers");
 +
 +        // executes all agent commands other than cron and ping
 +        _directAgentExecutor = new 
ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new 
NamedThreadFactory("DirectAgent"));
 +        // executes cron and ping agent commands
 +        _cronJobExecutor = new 
ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new 
NamedThreadFactory("DirectAgentCronJob"));
 +        s_logger.debug("Created DirectAgentAttache pool with size: " + 
DirectAgentPoolSize.value());
 +        _directAgentThreadCap = Math.round(DirectAgentPoolSize.value() * 
DirectAgentThreadCap.value()) + 1; // add 1 to always make the value > 0
 +
 +        _monitorExecutor = new ScheduledThreadPoolExecutor(1, new 
NamedThreadFactory("AgentMonitor"));
 +
 +        return true;
 +    }
 +
 +    @Override
 +    public Task create(final Task.Type type, final Link link, final byte[] 
data) {
 +        return new AgentHandler(type, link, data);
 +    }
 +
 +    @Override
 +    public int registerForHostEvents(final Listener listener, final boolean 
connections, final boolean commands, final boolean priority) {
 +        synchronized (_hostMonitors) {
 +            _monitorId++;
 +            if (connections) {
 +                if (priority) {
 +                    _hostMonitors.add(0, new Pair<Integer, 
Listener>(_monitorId, listener));
 +                } else {
 +                    _hostMonitors.add(new Pair<Integer, Listener>(_monitorId, 
listener));
 +                }
 +            }
 +            if (commands) {
 +                if (priority) {
 +                    _cmdMonitors.add(0, new Pair<Integer, 
Listener>(_monitorId, listener));
 +                } else {
 +                    _cmdMonitors.add(new Pair<Integer, Listener>(_monitorId, 
listener));
 +                }
 +            }
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("Registering listener " + 
listener.getClass().getSimpleName() + " with id " + _monitorId);
 +            }
 +            return _monitorId;
 +        }
 +    }
 +
 +    @Override
 +    public int registerForInitialConnects(final StartupCommandProcessor 
creator, final boolean priority) {
 +        synchronized (_hostMonitors) {
 +            _monitorId++;
 +            if (priority) {
 +                _creationMonitors.add(0, new Pair<Integer, 
StartupCommandProcessor>(_monitorId, creator));
 +            } else {
 +                _creationMonitors.add(new Pair<Integer, 
StartupCommandProcessor>(_monitorId, creator));
 +            }
 +            return _monitorId;
 +        }
 +    }
 +
 +    @Override
 +    public void unregisterForHostEvents(final int id) {
 +        s_logger.debug("Deregistering " + id);
 +        _hostMonitors.remove(id);
 +    }
 +
 +    private AgentControlAnswer handleControlCommand(final AgentAttache 
attache, final AgentControlCommand cmd) {
 +        AgentControlAnswer answer = null;
 +
 +        for (final Pair<Integer, Listener> listener : _cmdMonitors) {
 +            answer = listener.second().processControlCommand(attache.getId(), 
cmd);
 +
 +            if (answer != null) {
 +                return answer;
 +            }
 +        }
 +
 +        s_logger.warn("No handling of agent control command: " + cmd + " sent 
from " + attache.getId());
 +        return new AgentControlAnswer(cmd);
 +    }
 +
 +    public void handleCommands(final AgentAttache attache, final long 
sequence, final Command[] cmds) {
 +        for (final Pair<Integer, Listener> listener : _cmdMonitors) {
 +            final boolean processed = 
listener.second().processCommands(attache.getId(), sequence, cmds);
 +            if (s_logger.isTraceEnabled()) {
 +                s_logger.trace("SeqA " + attache.getId() + "-" + sequence + 
": " + (processed ? "processed" : "not processed") + " by " + 
listener.getClass());
 +            }
 +        }
 +    }
 +
 +    public void notifyAnswersToMonitors(final long agentId, final long seq, 
final Answer[] answers) {
 +        for (final Pair<Integer, Listener> listener : _cmdMonitors) {
 +            listener.second().processAnswers(agentId, seq, answers);
 +        }
 +    }
 +
 +    public AgentAttache findAttache(final long hostId) {
 +        AgentAttache attache = null;
 +        synchronized (_agents) {
 +            attache = _agents.get(hostId);
 +        }
 +        return attache;
 +    }
 +
 +    @Override
 +    public Answer sendTo(final Long dcId, final HypervisorType type, final 
Command cmd) {
 +        final List<ClusterVO> clusters = _clusterDao.listByDcHyType(dcId, 
type.toString());
 +        int retry = 0;
 +        for (final ClusterVO cluster : clusters) {
 +            final List<HostVO> hosts = 
_resourceMgr.listAllUpAndEnabledHosts(Host.Type.Routing, cluster.getId(), null, 
dcId);
 +            for (final HostVO host : hosts) {
 +                retry++;
 +                if (retry > _retry) {
 +                    return null;
 +                }
 +                Answer answer = null;
 +                try {
 +
 +                    final long targetHostId = 
_hvGuruMgr.getGuruProcessedCommandTargetHost(host.getId(), cmd);
 +                    answer = easySend(targetHostId, cmd);
 +                } catch (final Exception e) {
 +                }
 +                if (answer != null) {
 +                    return answer;
 +                }
 +            }
 +        }
 +        return null;
 +    }
 +
 +    @Override
 +    public Answer send(final Long hostId, final Command cmd) throws 
AgentUnavailableException, OperationTimedoutException {
 +        final Commands cmds = new Commands(Command.OnError.Stop);
 +        cmds.addCommand(cmd);
 +        send(hostId, cmds, cmd.getWait());
 +        final Answer[] answers = cmds.getAnswers();
 +        if (answers != null && !(answers[0] instanceof UnsupportedAnswer)) {
 +            return answers[0];
 +        }
 +
 +        if (answers != null && answers[0] instanceof UnsupportedAnswer) {
 +            s_logger.warn("Unsupported Command: " + answers[0].getDetails());
 +            return answers[0];
 +        }
 +
 +        return null;
 +    }
 +
 +    @DB
 +    protected boolean noDbTxn() {
 +        final TransactionLegacy txn = TransactionLegacy.currentTxn();
 +        return !txn.dbTxnStarted();
 +    }
 +
 +    private static void tagCommand(final Command cmd) {
 +        final AsyncJobExecutionContext context = 
AsyncJobExecutionContext.getCurrent();
 +        if (context != null && context.getJob() != null) {
 +            final AsyncJob job = context.getJob();
 +
 +            if (job.getRelated() != null && !job.getRelated().isEmpty()) {
 +                cmd.setContextParam("job", "job-" + job.getRelated() + "/" + 
"job-" + job.getId());
 +            } else {
 +                cmd.setContextParam("job", "job-" + job.getId());
 +            }
 +        }
 +        if (MDC.get("logcontextid") != null && 
!MDC.get("logcontextid").isEmpty()) {
 +            cmd.setContextParam("logid", MDC.get("logcontextid"));
 +        }
 +    }
 +
 +    /**
 +     * @param commands
 +     * @return
 +     */
 +    private Command[] checkForCommandsAndTag(final Commands commands) {
 +        final Command[] cmds = commands.toCommands();
 +
 +        assert cmds.length > 0 : "Ask yourself this about a hundred times.  
Why am I  sending zero length commands?";
 +
 +        setEmptyAnswers(commands, cmds);
 +
 +        for (final Command cmd : cmds) {
 +            tagCommand(cmd);
 +        }
 +        return cmds;
 +    }
 +
 +    /**
 +     * @param commands
 +     * @param cmds
 +     */
 +    private void setEmptyAnswers(final Commands commands, final Command[] 
cmds) {
 +        if (cmds.length == 0) {
 +            commands.setAnswers(new Answer[0]);
 +        }
 +    }
 +
 +    @Override
 +    public Answer[] send(final Long hostId, final Commands commands, int 
timeout) throws AgentUnavailableException, OperationTimedoutException {
 +        assert hostId != null : "Who's not checking the agent id before 
sending?  ... (finger wagging)";
 +        if (hostId == null) {
 +            throw new AgentUnavailableException(-1);
 +        }
 +
 +        if (timeout <= 0) {
 +            timeout = Wait.value();
 +        }
 +
 +        if (CheckTxnBeforeSending.value()) {
 +            if (!noDbTxn()) {
 +                throw new CloudRuntimeException("We do not allow transactions 
to be wrapped around commands sent to be executed on remote agents.  "
 +                        + "We cannot predict how long it takes a command to 
complete.  " + "The transaction may be rolled back because the connection took 
too long.");
 +            }
 +        } else {
 +            assert noDbTxn() : "I know, I know.  Why are we so strict as to 
not allow txn across an agent call?  ...  Why are we so cruel ... Why are we 
such a dictator .... Too bad... Sorry...but NO AGENT COMMANDS WRAPPED WITHIN DB 
TRANSACTIONS!";
 +        }
 +
 +        final Command[] cmds = checkForCommandsAndTag(commands);
 +
 +        //check what agent is returned.
 +        final AgentAttache agent = getAttache(hostId);
 +        if (agent == null || agent.isClosed()) {
 +            throw new AgentUnavailableException("agent not logged into this 
management server", hostId);
 +        }
 +
 +        final Request req = new Request(hostId, agent.getName(), _nodeId, 
cmds, commands.stopOnError(), true);
 +        req.setSequence(agent.getNextSequence());
 +        final Answer[] answers = agent.send(req, timeout);
 +        notifyAnswersToMonitors(hostId, req.getSequence(), answers);
 +        commands.setAnswers(answers);
 +        return answers;
 +    }
 +
 +    protected Status investigate(final AgentAttache agent) {
 +        final Long hostId = agent.getId();
 +        final HostVO host = _hostDao.findById(hostId);
 +        if (host != null && host.getType() != null && 
!host.getType().isVirtual()) {
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("checking if agent (" + hostId + ") is alive");
 +            }
 +            final Answer answer = easySend(hostId, new CheckHealthCommand());
 +            if (answer != null && answer.getResult()) {
 +                final Status status = Status.Up;
 +                if (s_logger.isDebugEnabled()) {
 +                    s_logger.debug("agent (" + hostId + ") responded to 
checkHeathCommand, reporting that agent is " + status);
 +                }
 +                return status;
 +            }
 +            return _haMgr.investigate(hostId);
 +        }
 +        return Status.Alert;
 +    }
 +
 +    protected AgentAttache getAttache(final Long hostId) throws 
AgentUnavailableException {
 +        if (hostId == null) {
 +            return null;
 +        }
 +        final AgentAttache agent = findAttache(hostId);
 +        if (agent == null) {
 +            s_logger.debug("Unable to find agent for " + hostId);
 +            throw new AgentUnavailableException("Unable to find agent ", 
hostId);
 +        }
 +
 +        return agent;
 +    }
 +
 +    @Override
 +    public long send(final Long hostId, final Commands commands, final 
Listener listener) throws AgentUnavailableException {
 +        final AgentAttache agent = getAttache(hostId);
 +        if (agent.isClosed()) {
 +            throw new AgentUnavailableException("Agent " + agent.getId() + " 
is closed", agent.getId());
 +        }
 +
 +        final Command[] cmds = checkForCommandsAndTag(commands);
 +
 +        final Request req = new Request(hostId, agent.getName(), _nodeId, 
cmds, commands.stopOnError(), true);
 +        req.setSequence(agent.getNextSequence());
 +
 +        agent.send(req, listener);
 +        return req.getSequence();
 +    }
 +
 +    public void removeAgent(final AgentAttache attache, final Status 
nextState) {
 +        if (attache == null) {
 +            return;
 +        }
 +        final long hostId = attache.getId();
 +        if (s_logger.isDebugEnabled()) {
 +            s_logger.debug("Remove Agent : " + hostId);
 +        }
 +        AgentAttache removed = null;
 +        boolean conflict = false;
 +        synchronized (_agents) {
 +            removed = _agents.remove(hostId);
 +            if (removed != null && removed != attache) {
 +                conflict = true;
 +                _agents.put(hostId, removed);
 +                removed = attache;
 +            }
 +        }
 +        if (conflict) {
 +            s_logger.debug("Agent for host " + hostId + " is created when it 
is being disconnected");
 +        }
 +        if (removed != null) {
 +            removed.disconnect(nextState);
 +        }
 +
 +        for (final Pair<Integer, Listener> monitor : _hostMonitors) {
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("Sending Disconnect to listener: " + 
monitor.second().getClass().getName());
 +            }
 +            monitor.second().processDisconnect(hostId, nextState);
 +        }
 +    }
 +
 +    @Override
 +    public void notifyMonitorsOfNewlyAddedHost(long hostId) {
 +        for (final Pair<Integer, Listener> monitor : _hostMonitors) {
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("Sending host added to listener: " + 
monitor.second().getClass().getSimpleName());
 +            }
 +
 +            monitor.second().processHostAdded(hostId);
 +        }
 +    }
 +
 +    protected AgentAttache notifyMonitorsOfConnection(final AgentAttache 
attache, final StartupCommand[] cmd, final boolean forRebalance) throws 
ConnectionException {
 +        final long hostId = attache.getId();
 +        final HostVO host = _hostDao.findById(hostId);
 +        for (final Pair<Integer, Listener> monitor : _hostMonitors) {
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("Sending Connect to listener: " + 
monitor.second().getClass().getSimpleName());
 +            }
 +            for (int i = 0; i < cmd.length; i++) {
 +                try {
 +                    monitor.second().processConnect(host, cmd[i], 
forRebalance);
 +                } catch (final Exception e) {
 +                    if (e instanceof ConnectionException) {
 +                        final ConnectionException ce = (ConnectionException)e;
 +                        if (ce.isSetupError()) {
 +                            s_logger.warn("Monitor " + 
monitor.second().getClass().getSimpleName() + " says there is an error in the 
connect process for " + hostId + " due to " + e.getMessage());
 +                            handleDisconnectWithoutInvestigation(attache, 
Event.AgentDisconnected, true, true);
 +                            throw ce;
 +                        } else {
 +                            s_logger.info("Monitor " + 
monitor.second().getClass().getSimpleName() + " says not to continue the 
connect process for " + hostId + " due to " + e.getMessage());
 +                            handleDisconnectWithoutInvestigation(attache, 
Event.ShutdownRequested, true, true);
 +                            return attache;
 +                        }
 +                    } else if (e instanceof 
HypervisorVersionChangedException) {
 +                        handleDisconnectWithoutInvestigation(attache, 
Event.ShutdownRequested, true, true);
 +                        throw new CloudRuntimeException("Unable to connect " 
+ attache.getId(), e);
 +                    } else {
 +                        s_logger.error("Monitor " + 
monitor.second().getClass().getSimpleName() + " says there is an error in the 
connect process for " + hostId + " due to " + e.getMessage(), e);
 +                        handleDisconnectWithoutInvestigation(attache, 
Event.AgentDisconnected, true, true);
 +                        throw new CloudRuntimeException("Unable to connect " 
+ attache.getId(), e);
 +                    }
 +                }
 +            }
 +        }
 +
 +        final Long dcId = host.getDataCenterId();
 +        final ReadyCommand ready = new ReadyCommand(dcId, host.getId());
 +        final Answer answer = easySend(hostId, ready);
 +        if (answer == null || !answer.getResult()) {
 +            // this is tricky part for secondary storage
 +            // make it as disconnected, wait for secondary storage VM to be up
 +            // return the attache instead of null, even it is disconnectede
 +            handleDisconnectWithoutInvestigation(attache, 
Event.AgentDisconnected, true, true);
 +        }
 +
 +        agentStatusTransitTo(host, Event.Ready, _nodeId);
 +        attache.ready();
 +        return attache;
 +    }
 +
 +    @Override
 +    public boolean start() {
 +        startDirectlyConnectedHosts();
 +
 +        if (_connection != null) {
 +            try {
 +                _connection.start();
 +            } catch (final NioConnectionException e) {
 +                s_logger.error("Error when connecting to the NioServer!", e);
 +            }
 +        }
 +
 +        _monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), 
mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), 
TimeUnit.SECONDS);
 +
 +        return true;
 +    }
 +
 +    public void startDirectlyConnectedHosts() {
 +        final List<HostVO> hosts = _resourceMgr.findDirectlyConnectedHosts();
 +        for (final HostVO host : hosts) {
 +            loadDirectlyConnectedHost(host, false);
 +        }
 +    }
 +
 +    private ServerResource loadResourcesWithoutHypervisor(final HostVO host) {
 +        final String resourceName = host.getResource();
 +        ServerResource resource = null;
 +        try {
 +            final Class<?> clazz = Class.forName(resourceName);
 +            final Constructor<?> constructor = clazz.getConstructor();
 +            resource = (ServerResource)constructor.newInstance();
 +        } catch (final ClassNotFoundException e) {
 +            s_logger.warn("Unable to find class " + host.getResource(), e);
 +        } catch (final InstantiationException e) {
 +            s_logger.warn("Unablet to instantiate class " + 
host.getResource(), e);
 +        } catch (final IllegalAccessException e) {
 +            s_logger.warn("Illegal access " + host.getResource(), e);
 +        } catch (final SecurityException e) {
 +            s_logger.warn("Security error on " + host.getResource(), e);
 +        } catch (final NoSuchMethodException e) {
 +            s_logger.warn("NoSuchMethodException error on " + 
host.getResource(), e);
 +        } catch (final IllegalArgumentException e) {
 +            s_logger.warn("IllegalArgumentException error on " + 
host.getResource(), e);
 +        } catch (final InvocationTargetException e) {
 +            s_logger.warn("InvocationTargetException error on " + 
host.getResource(), e);
 +        }
 +
 +        if (resource != null) {
 +            _hostDao.loadDetails(host);
 +
 +            final HashMap<String, Object> params = new HashMap<String, 
Object>(host.getDetails().size() + 5);
 +            params.putAll(host.getDetails());
 +
 +            params.put("guid", host.getGuid());
 +            params.put("zone", Long.toString(host.getDataCenterId()));
 +            if (host.getPodId() != null) {
 +                params.put("pod", Long.toString(host.getPodId()));
 +            }
 +            if (host.getClusterId() != null) {
 +                params.put("cluster", Long.toString(host.getClusterId()));
 +                String guid = null;
 +                final ClusterVO cluster = 
_clusterDao.findById(host.getClusterId());
 +                if (cluster.getGuid() == null) {
 +                    guid = host.getDetail("pool");
 +                } else {
 +                    guid = cluster.getGuid();
 +                }
 +                if (guid != null && !guid.isEmpty()) {
 +                    params.put("pool", guid);
 +                }
 +            }
 +
 +            params.put("ipaddress", host.getPrivateIpAddress());
 +            params.put("secondary.storage.vm", "false");
 +
 +            try {
 +                resource.configure(host.getName(), params);
 +            } catch (final ConfigurationException e) {
 +                s_logger.warn("Unable to configure resource due to " + 
e.getMessage());
 +                return null;
 +            }
 +
 +            if (!resource.start()) {
 +                s_logger.warn("Unable to start the resource");
 +                return null;
 +            }
 +        }
 +        return resource;
 +    }
 +
 +    @Override
 +    public void rescan() {
 +    }
 +
 +    protected boolean loadDirectlyConnectedHost(final HostVO host, final 
boolean forRebalance) {
 +        boolean initialized = false;
 +        ServerResource resource = null;
 +        try {
 +            // load the respective discoverer
 +            final Discoverer discoverer = 
_resourceMgr.getMatchingDiscover(host.getHypervisorType());
 +            if (discoverer == null) {
 +                s_logger.info("Could not to find a Discoverer to load the 
resource: " + host.getId() + " for hypervisor type: " + 
host.getHypervisorType());
 +                resource = loadResourcesWithoutHypervisor(host);
 +            } else {
 +                resource = discoverer.reloadResource(host);
 +            }
 +
 +            if (resource == null) {
 +                s_logger.warn("Unable to load the resource: " + host.getId());
 +                return false;
 +            }
 +
 +            initialized = true;
 +        } finally {
 +            if (!initialized) {
 +                if (host != null) {
 +                    agentStatusTransitTo(host, Event.AgentDisconnected, 
_nodeId);
 +                }
 +            }
 +        }
 +
 +        if (forRebalance) {
 +            tapLoadingAgents(host.getId(), TapAgentsAction.Add);
 +            final Host h = _resourceMgr.createHostAndAgent(host.getId(), 
resource, host.getDetails(), false, null, true);
 +            tapLoadingAgents(host.getId(), TapAgentsAction.Del);
 +
 +            return h == null ? false : true;
 +        } else {
 +            _executor.execute(new SimulateStartTask(host.getId(), resource, 
host.getDetails()));
 +            return true;
 +        }
 +    }
 +
 +    protected AgentAttache createAttacheForDirectConnect(final Host host, 
final ServerResource resource) throws ConnectionException {
 +        s_logger.debug("create DirectAgentAttache for " + host.getId());
 +        final DirectAgentAttache attache = new DirectAgentAttache(this, 
host.getId(), host.getName(), resource, host.isInMaintenanceStates());
 +
 +        AgentAttache old = null;
 +        synchronized (_agents) {
 +            old = _agents.put(host.getId(), attache);
 +        }
 +        if (old != null) {
 +            old.disconnect(Status.Removed);
 +        }
 +
 +        return attache;
 +    }
 +
 +    @Override
 +    public boolean stop() {
 +
 +        if (_connection != null) {
 +            _connection.stop();
 +        }
 +
 +        s_logger.info("Disconnecting agents: " + _agents.size());
 +        synchronized (_agents) {
 +            for (final AgentAttache agent : _agents.values()) {
 +                final HostVO host = _hostDao.findById(agent.getId());
 +                if (host == null) {
 +                    if (s_logger.isDebugEnabled()) {
 +                        s_logger.debug("Cant not find host " + agent.getId());
 +                    }
 +                } else {
 +                    if (!agent.forForward()) {
 +                        agentStatusTransitTo(host, 
Event.ManagementServerDown, _nodeId);
 +                    }
 +                }
 +            }
 +        }
 +
 +        _connectExecutor.shutdownNow();
 +        _monitorExecutor.shutdownNow();
 +        return true;
 +    }
 +
 +    protected boolean handleDisconnectWithoutInvestigation(final AgentAttache 
attache, final Status.Event event, final boolean transitState, final boolean 
removeAgent) {
 +        final long hostId = attache.getId();
 +
 +        s_logger.info("Host " + hostId + " is disconnecting with event " + 
event);
 +        Status nextStatus = null;
 +        final HostVO host = _hostDao.findById(hostId);
 +        if (host == null) {
 +            s_logger.warn("Can't find host with " + hostId);
 +            nextStatus = Status.Removed;
 +        } else {
 +            final Status currentStatus = host.getStatus();
 +            if (currentStatus == Status.Down || currentStatus == Status.Alert 
|| currentStatus == Status.Removed) {
 +                if (s_logger.isDebugEnabled()) {
 +                    s_logger.debug("Host " + hostId + " is already " + 
currentStatus);
 +                }
 +                nextStatus = currentStatus;
 +            } else {
 +                try {
 +                    nextStatus = currentStatus.getNextStatus(event);
 +                } catch (final NoTransitionException e) {
 +                    final String err = "Cannot find next status for " + event 
+ " as current status is " + currentStatus + " for agent " + hostId;
 +                    s_logger.debug(err);
 +                    throw new CloudRuntimeException(err);
 +                }
 +
 +                if (s_logger.isDebugEnabled()) {
 +                    s_logger.debug("The next status of agent " + hostId + "is 
" + nextStatus + ", current status is " + currentStatus);
 +                }
 +            }
 +            caService.purgeHostCertificate(host);
 +        }
 +
 +        if (s_logger.isDebugEnabled()) {
 +            s_logger.debug("Deregistering link for " + hostId + " with state 
" + nextStatus);
 +        }
 +
 +        removeAgent(attache, nextStatus);
 +        // update the DB
 +        if (host != null && transitState) {
 +            disconnectAgent(host, event, _nodeId);
 +        }
 +
 +        return true;
 +    }
 +
 +    protected boolean handleDisconnectWithInvestigation(final AgentAttache 
attache, Status.Event event) {
 +        final long hostId = attache.getId();
 +        HostVO host = _hostDao.findById(hostId);
 +
 +        if (host != null) {
 +            Status nextStatus = null;
 +            try {
 +                nextStatus = host.getStatus().getNextStatus(event);
 +            } catch (final NoTransitionException ne) {
 +                /*
 +                 * Agent may be currently in status of Down, Alert, Removed, 
namely there is no next status for some events. Why this can happen? Ask God 
not me. I hate there was
 +                 * no piece of comment for code handling race condition. God 
knew what race condition the code dealt with!
 +                 */
 +                s_logger.debug("Caught exception while getting agent's next 
status", ne);
 +            }
 +
 +            if (nextStatus == Status.Alert) {
 +                /* OK, we are going to the bad status, let's see what 
happened */
 +                s_logger.info("Investigating why host " + hostId + " has 
disconnected with event " + event);
 +
 +                Status determinedState = investigate(attache);
 +                // if state cannot be determined do nothing and bail out
 +                if (determinedState == null) {
 +                    if ((System.currentTimeMillis() >> 10) - 
host.getLastPinged() > AlertWait.value()) {
 +                        s_logger.warn("Agent " + hostId + " state cannot be 
determined for more than " + AlertWait + "(" + AlertWait.value() + ") seconds, 
will go to Alert state");
 +                        determinedState = Status.Alert;
 +                    } else {
 +                        s_logger.warn("Agent " + hostId + " state cannot be 
determined, do nothing");
 +                        return false;
 +                    }
 +                }
 +
 +                final Status currentStatus = host.getStatus();
 +                s_logger.info("The agent from host " + hostId + " state 
determined is " + determinedState);
 +
 +                if (determinedState == Status.Down) {
 +                    final String message = "Host is down: " + host.getId() + 
"-" + host.getName() + ". Starting HA on the VMs";
 +                    s_logger.error(message);
 +                    if (host.getType() != Host.Type.SecondaryStorage && 
host.getType() != Host.Type.ConsoleProxy) {
 +                        
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, 
host.getDataCenterId(), host.getPodId(), "Host down, " + host.getId(), message);
 +                    }
 +                    event = Status.Event.HostDown;
 +                } else if (determinedState == Status.Up) {
 +                    /* Got ping response from host, bring it back */
 +                    s_logger.info("Agent is determined to be up and running");
 +                    agentStatusTransitTo(host, Status.Event.Ping, _nodeId);
 +                    return false;
 +                } else if (determinedState == Status.Disconnected) {
 +                    s_logger.warn("Agent is disconnected but the host is 
still up: " + host.getId() + "-" + host.getName());
 +                    if (currentStatus == Status.Disconnected) {
 +                        if ((System.currentTimeMillis() >> 10) - 
host.getLastPinged() > AlertWait.value()) {
 +                            s_logger.warn("Host " + host.getId() + " has been 
disconnected past the wait time it should be disconnected.");
 +                            event = Status.Event.WaitedTooLong;
 +                        } else {
 +                            s_logger.debug("Host " + host.getId() + " has 
been determined to be disconnected but it hasn't passed the wait time yet.");
 +                            return false;
 +                        }
 +                    } else if (currentStatus == Status.Up) {
 +                        final DataCenterVO dcVO = 
_dcDao.findById(host.getDataCenterId());
 +                        final HostPodVO podVO = 
_podDao.findById(host.getPodId());
 +                        final String hostDesc = "name: " + host.getName() + " 
(id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + 
podVO.getName();
 +                        if (host.getType() != Host.Type.SecondaryStorage && 
host.getType() != Host.Type.ConsoleProxy) {
 +                            
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, 
host.getDataCenterId(), host.getPodId(), "Host disconnected, " + hostDesc,
 +                                    "If the agent for host [" + hostDesc + "] 
is not restarted within " + AlertWait + " seconds, host will go to Alert 
state");
 +                        }
 +                        event = Status.Event.AgentDisconnected;
 +                    }
 +                } else {
 +                    // if we end up here we are in alert state, send an alert
 +                    final DataCenterVO dcVO = 
_dcDao.findById(host.getDataCenterId());
 +                    final HostPodVO podVO = _podDao.findById(host.getPodId());
 +                    final String podName = podVO != null ? podVO.getName() : 
"NO POD";
 +                    final String hostDesc = "name: " + host.getName() + " 
(id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + 
podName;
 +                    
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, 
host.getDataCenterId(), host.getPodId(), "Host in ALERT state, " + hostDesc,
 +                            "In availability zone " + host.getDataCenterId() 
+ ", host is in alert state: " + host.getId() + "-" + host.getName());
 +                }
 +            } else {
 +                s_logger.debug("The next status of agent " + host.getId() + " 
is not Alert, no need to investigate what happened");
 +            }
 +        }
 +        handleDisconnectWithoutInvestigation(attache, event, true, true);
 +        host = _hostDao.findById(hostId); // Maybe the host magically 
reappeared?
 +        if (host != null && host.getStatus() == Status.Down) {
 +            _haMgr.scheduleRestartForVmsOnHost(host, true);
 +        }
 +        return true;
 +    }
 +
 +    protected class DisconnectTask extends ManagedContextRunnable {
 +        AgentAttache _attache;
 +        Status.Event _event;
 +        boolean _investigate;
 +
 +        DisconnectTask(final AgentAttache attache, final Status.Event event, 
final boolean investigate) {
 +            _attache = attache;
 +            _event = event;
 +            _investigate = investigate;
 +        }
 +
 +        @Override
 +        protected void runInContext() {
 +            try {
 +                if (_investigate == true) {
 +                    handleDisconnectWithInvestigation(_attache, _event);
 +                } else {
 +                    handleDisconnectWithoutInvestigation(_attache, _event, 
true, false);
 +                }
 +            } catch (final Exception e) {
 +                s_logger.error("Exception caught while handling disconnect: 
", e);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public Answer easySend(final Long hostId, final Command cmd) {
 +        try {
 +            final Host h = _hostDao.findById(hostId);
 +            if (h == null || h.getRemoved() != null) {
 +                s_logger.debug("Host with id " + hostId + " doesn't exist");
 +                return null;
 +            }
 +            final Status status = h.getStatus();
 +            if (!status.equals(Status.Up) && 
!status.equals(Status.Connecting)) {
 +                s_logger.debug("Can not send command " + cmd + " due to Host 
" + hostId + " is not up");
 +                return null;
 +            }
 +            final Answer answer = send(hostId, cmd);
 +            if (answer == null) {
 +                s_logger.warn("send returns null answer");
 +                return null;
 +            }
 +
 +            if (s_logger.isDebugEnabled() && answer.getDetails() != null) {
 +                s_logger.debug("Details from executing " + cmd.getClass() + 
": " + answer.getDetails());
 +            }
 +
 +            return answer;
 +
 +        } catch (final AgentUnavailableException e) {
 +            s_logger.warn(e.getMessage());
 +            return null;
 +        } catch (final OperationTimedoutException e) {
 +            s_logger.warn("Operation timed out: " + e.getMessage());
 +            return null;
 +        } catch (final Exception e) {
 +            s_logger.warn("Exception while sending", e);
 +            return null;
 +        }
 +    }
 +
 +    @Override
 +    public Answer[] send(final Long hostId, final Commands cmds) throws 
AgentUnavailableException, OperationTimedoutException {
 +        int wait = 0;
 +        for (final Command cmd : cmds) {
 +            if (cmd.getWait() > wait) {
 +                wait = cmd.getWait();
 +            }
 +        }
 +        return send(hostId, cmds, wait);
 +    }
 +
 +    @Override
 +    public void reconnect(final long hostId) throws AgentUnavailableException 
{
 +        HostVO host = _hostDao.findById(hostId);
 +        if (host == null) {
 +            throw new CloudRuntimeException("Unable to find host: " + hostId);
 +        }
 +
 +        if (host.getRemoved() != null) {
 +            throw new CloudRuntimeException("Host has already been removed: " 
+ hostId);
 +        }
 +
 +        if (host.getStatus() == Status.Disconnected) {
 +            s_logger.debug("Host is already disconnected, no work to be done: 
" + hostId);
 +            return;
 +        }
 +
 +        if (host.getStatus() != Status.Up && host.getStatus() != Status.Alert 
&& host.getStatus() != Status.Rebalancing) {
 +            throw new CloudRuntimeException("Unable to disconnect host 
because it is not in the correct state: host=" + hostId + "; Status=" + 
host.getStatus());
 +        }
 +
 +        AgentAttache attache = findAttache(hostId);
 +        if (attache == null) {
 +            throw new CloudRuntimeException("Unable to disconnect host 
because it is not connected to this server: " + hostId);
 +        }
 +        disconnectWithoutInvestigation(attache, Event.ShutdownRequested);
 +    }
 +
 +    @Override
 +    public void notifyMonitorsOfHostAboutToBeRemoved(long hostId) {
 +        for (final Pair<Integer, Listener> monitor : _hostMonitors) {
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("Sending host about to be removed to listener: 
" + monitor.second().getClass().getSimpleName());
 +            }
 +
 +            monitor.second().processHostAboutToBeRemoved(hostId);
 +        }
 +    }
 +
 +    @Override
 +    public void notifyMonitorsOfRemovedHost(long hostId, long clusterId) {
 +        for (final Pair<Integer, Listener> monitor : _hostMonitors) {
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("Sending host removed to listener: " + 
monitor.second().getClass().getSimpleName());
 +            }
 +
 +            monitor.second().processHostRemoved(hostId, clusterId);
 +        }
 +    }
 +
 +    public boolean executeUserRequest(final long hostId, final Event event) 
throws AgentUnavailableException {
 +        if (event == Event.AgentDisconnected) {
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("Received agent disconnect event for host " + 
hostId);
 +            }
 +            AgentAttache attache = null;
 +            attache = findAttache(hostId);
 +            if (attache != null) {
 +                handleDisconnectWithoutInvestigation(attache, 
Event.AgentDisconnected, true, true);
 +            }
 +            return true;
 +        }
 +        if (event == Event.ShutdownRequested) {
 +            try {
 +                reconnect(hostId);
 +            } catch (CloudRuntimeException e) {
 +                s_logger.debug("Error on shutdown request for hostID: " + 
hostId, e);
 +                return false;
 +            }
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    @Override
 +    public boolean isAgentAttached(final long hostId) {
 +        final AgentAttache agentAttache = findAttache(hostId);
 +        return agentAttache != null;
 +    }
 +
 +    protected AgentAttache createAttacheForConnect(final HostVO host, final 
Link link) throws ConnectionException {
 +        s_logger.debug("create ConnectedAgentAttache for " + host.getId());
 +        final AgentAttache attache = new ConnectedAgentAttache(this, 
host.getId(), host.getName(), link, host.isInMaintenanceStates());
 +        link.attach(attache);
 +
 +        AgentAttache old = null;
 +        synchronized (_agents) {
 +            old = _agents.put(host.getId(), attache);
 +        }
 +        if (old != null) {
 +            old.disconnect(Status.Removed);
 +        }
 +
 +        return attache;
 +    }
 +
 +    private AgentAttache handleConnectedAgent(final Link link, final 
StartupCommand[] startup, final Request request) {
 +        AgentAttache attache = null;
 +        ReadyCommand ready = null;
 +        try {
++            final List<String> agentMSHostList = new ArrayList<>();
++            if (startup != null && startup.length > 0) {
++                final String agentMSHosts = startup[0].getMsHostList();
++                if (!Strings.isNullOrEmpty(agentMSHosts)) {
++                    
agentMSHostList.addAll(Arrays.asList(agentMSHosts.split(",")));
++                }
++            }
++
 +            final HostVO host = 
_resourceMgr.createHostVOForConnectedAgent(startup);
 +            if (host != null) {
 +                ready = new ReadyCommand(host.getDataCenterId(), 
host.getId());
++
++                if 
(!indirectAgentLB.compareManagementServerList(host.getId(), 
host.getDataCenterId(), agentMSHostList)) {
++                    final List<String> newMSList = 
indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), 
null);
++                    ready.setMsHostList(newMSList);
++                    
ready.setLbAlgorithm(indirectAgentLB.getLBAlgorithmName());
++                    
ready.setLbCheckInterval(indirectAgentLB.getLBPreferredHostCheckInterval(host.getClusterId()));
++                    s_logger.debug("Agent's management server host list is 
not up to date, sending list update:" + newMSList);
++                }
++
 +                attache = createAttacheForConnect(host, link);
 +                attache = notifyMonitorsOfConnection(attache, startup, false);
 +            }
 +        } catch (final Exception e) {
-             s_logger.debug("Failed to handle host connection: " + 
e.toString());
++            s_logger.debug("Failed to handle host connection: ", e);
 +            ready = new ReadyCommand(null);
 +            ready.setDetails(e.toString());
 +        } finally {
 +            if (ready == null) {
 +                ready = new ReadyCommand(null);
 +            }
 +        }
 +
 +        try {
 +            if (attache == null) {
 +                final Request readyRequest = new Request(-1, -1, ready, 
false);
 +                link.send(readyRequest.getBytes());
 +            } else {
 +                easySend(attache.getId(), ready);
 +            }
 +        } catch (final Exception e) {
 +            s_logger.debug("Failed to send ready command:" + e.toString());
 +        }
 +        return attache;
 +    }
 +
 +    protected class SimulateStartTask extends ManagedContextRunnable {
 +        ServerResource resource;
 +        Map<String, String> details;
 +        long id;
 +
 +        public SimulateStartTask(final long id, final ServerResource 
resource, final Map<String, String> details) {
 +            this.id = id;
 +            this.resource = resource;
 +            this.details = details;
 +        }
 +
 +        @Override
 +        protected void runInContext() {
 +            try {
 +                if (s_logger.isDebugEnabled()) {
 +                    s_logger.debug("Simulating start for resource " + 
resource.getName() + " id " + id);
 +                }
 +
 +                if (tapLoadingAgents(id, TapAgentsAction.Add)) {
 +                    try {
 +                        final AgentAttache agentattache = findAttache(id);
 +                        if (agentattache == null) {
 +                            s_logger.debug("Creating agent for host " + id);
 +                            _resourceMgr.createHostAndAgent(id, resource, 
details, false, null, false);
 +                            s_logger.debug("Completed creating agent for host 
" + id);
 +                        } else {
 +                            s_logger.debug("Agent already created in another 
thread for host " + id + ", ignore this");
 +                        }
 +                    } finally {
 +                        tapLoadingAgents(id, TapAgentsAction.Del);
 +                    }
 +                } else {
 +                    s_logger.debug("Agent creation already getting processed 
in another thread for host " + id + ", ignore this");
 +                }
 +            } catch (final Exception e) {
 +                s_logger.warn("Unable to simulate start on resource " + id + 
" name " + resource.getName(), e);
 +            }
 +        }
 +    }
 +
 +    protected class HandleAgentConnectTask extends ManagedContextRunnable {
 +        Link _link;
 +        Command[] _cmds;
 +        Request _request;
 +
 +        HandleAgentConnectTask(final Link link, final Command[] cmds, final 
Request request) {
 +            _link = link;
 +            _cmds = cmds;
 +            _request = request;
 +        }
 +
 +        @Override
 +        protected void runInContext() {
 +            _request.logD("Processing the first command ");
 +            final StartupCommand[] startups = new 
StartupCommand[_cmds.length];
 +            for (int i = 0; i < _cmds.length; i++) {
 +                startups[i] = (StartupCommand)_cmds[i];
 +            }
 +
 +            final AgentAttache attache = handleConnectedAgent(_link, 
startups, _request);
 +            if (attache == null) {
 +                s_logger.warn("Unable to create attache for agent: " + 
_request);
 +            }
 +        }
 +    }
 +
 +    protected void connectAgent(final Link link, final Command[] cmds, final 
Request request) {
 +        // send startupanswer to agent in the very beginning, so agent can 
move on without waiting for the answer for an undetermined time, if we put this 
logic into another
 +        // thread pool.
 +        final StartupAnswer[] answers = new StartupAnswer[cmds.length];
 +        Command cmd;
 +        for (int i = 0; i < cmds.length; i++) {
 +            cmd = cmds[i];
 +            if (cmd instanceof StartupRoutingCommand || cmd instanceof 
StartupProxyCommand || cmd instanceof StartupSecondaryStorageCommand ||
 +                    cmd instanceof StartupStorageCommand) {
 +                answers[i] = new StartupAnswer((StartupCommand) cmds[i], 0, 
mgmtServiceConf.getPingInterval());
 +                break;
 +            }
 +        }
 +        Response response = null;
 +        response = new Response(request, answers[0], _nodeId, -1);
 +        try {
 +            link.send(response.toBytes());
 +        } catch (final ClosedChannelException e) {
 +            s_logger.debug("Failed to send startupanswer: " + e.toString());
 +        }
 +        _connectExecutor.execute(new HandleAgentConnectTask(link, cmds, 
request));
 +    }
 +
 +    public class AgentHandler extends Task {
 +        public AgentHandler(final Task.Type type, final Link link, final 
byte[] data) {
 +            super(type, link, data);
 +        }
 +
 +        protected void processRequest(final Link link, final Request request) 
{
 +            final AgentAttache attache = (AgentAttache)link.attachment();
 +            final Command[] cmds = request.getCommands();
 +            Command cmd = cmds[0];
 +            boolean logD = true;
 +
 +            if (attache == null) {
 +                if (!(cmd instanceof StartupCommand)) {
 +                    s_logger.warn("Throwing away a request because it came 
through as the first command on a connect: " + request);
 +                } else {
 +                    // submit the task for execution
 +                    request.logD("Scheduling the first command ");
 +                    connectAgent(link, cmds, request);
 +                }
 +                return;
 +            }
 +
 +            final long hostId = attache.getId();
 +            final String hostName = attache.getName();
 +
 +            if (s_logger.isDebugEnabled()) {
 +                if (cmd instanceof PingRoutingCommand) {
 +                    logD = false;
 +                    s_logger.debug("Ping from " + hostId + "(" + hostName + 
")");
 +                    s_logger.trace("SeqA " + hostId + "-" + 
request.getSequence() + ": Processing " + request);
 +                } else if (cmd instanceof PingCommand) {
 +                    logD = false;
 +                    s_logger.debug("Ping from " + hostId + "(" + hostName + 
")");
 +                    s_logger.trace("SeqA " + hostId + "-" + 
request.getSequence() + ": Processing " + request);
 +                } else {
 +                    s_logger.debug("SeqA " + hostId + "-" + 
request.getSequence() + ": Processing " + request);
 +                }
 +            }
 +
 +            final Answer[] answers = new Answer[cmds.length];
 +            for (int i = 0; i < cmds.length; i++) {
 +                cmd = cmds[i];
 +                Answer answer = null;
 +                try {
 +                    if (cmd instanceof StartupRoutingCommand) {
 +                        final StartupRoutingCommand startup = 
(StartupRoutingCommand) cmd;
 +                        answer = new StartupAnswer(startup, attache.getId(), 
mgmtServiceConf.getPingInterval());
 +                    } else if (cmd instanceof StartupProxyCommand) {
 +                        final StartupProxyCommand startup = 
(StartupProxyCommand) cmd;
 +                        answer = new StartupAnswer(startup, attache.getId(), 
mgmtServiceConf.getPingInterval());
 +                    } else if (cmd instanceof StartupSecondaryStorageCommand) 
{
 +                        final StartupSecondaryStorageCommand startup = 
(StartupSecondaryStorageCommand) cmd;
 +                        answer = new StartupAnswer(startup, attache.getId(), 
mgmtServiceConf.getPingInterval());
 +                    } else if (cmd instanceof StartupStorageCommand) {
 +                        final StartupStorageCommand startup = 
(StartupStorageCommand) cmd;
 +                        answer = new StartupAnswer(startup, attache.getId(), 
mgmtServiceConf.getPingInterval());
 +                    } else if (cmd instanceof ShutdownCommand) {
 +                        final ShutdownCommand shutdown = (ShutdownCommand)cmd;
 +                        final String reason = shutdown.getReason();
 +                        s_logger.info("Host " + attache.getId() + " has 
informed us that it is shutting down with reason " + reason + " and detail " + 
shutdown.getDetail());
 +                        if (reason.equals(ShutdownCommand.Update)) {
 +                            // disconnectWithoutInvestigation(attache, 
Event.UpdateNeeded);
 +                            throw new CloudRuntimeException("Agent update not 
implemented");
 +                        } else if (reason.equals(ShutdownCommand.Requested)) {
 +                            disconnectWithoutInvestigation(attache, 
Event.ShutdownRequested);
 +                        }
 +                        return;
 +                    } else if (cmd instanceof AgentControlCommand) {
 +                        answer = handleControlCommand(attache, 
(AgentControlCommand)cmd);
 +                    } else {
 +                        handleCommands(attache, request.getSequence(), new 
Command[] {cmd});
 +                        if (cmd instanceof PingCommand) {
 +                            final long cmdHostId = 
((PingCommand)cmd).getHostId();
 +
 +                            // if the router is sending a ping, verify the
 +                            // gateway was pingable
 +                            if (cmd instanceof PingRoutingCommand) {
 +                                final boolean gatewayAccessible = 
((PingRoutingCommand)cmd).isGatewayAccessible();
 +                                final HostVO host = 
_hostDao.findById(Long.valueOf(cmdHostId));
 +
 +                                if (host != null) {
 +                                    if (!gatewayAccessible) {
 +                                        // alert that host lost connection to
 +                                        // gateway (cannot ping the default 
route)
 +                                        final DataCenterVO dcVO = 
_dcDao.findById(host.getDataCenterId());
 +                                        final HostPodVO podVO = 
_podDao.findById(host.getPodId());
 +                                        final String hostDesc = "name: " + 
host.getName() + " (id:" + host.getId() + "), availability zone: " + 
dcVO.getName() + ", pod: " + podVO.getName();
 +
 +                                        
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_ROUTING, 
host.getDataCenterId(), host.getPodId(), "Host lost connection to gateway, " + 
hostDesc,
 +                                                "Host [" + hostDesc + "] lost 
connection to gateway (default route) and is possibly having network connection 
issues.");
 +                                    } else {
 +                                        
_alertMgr.clearAlert(AlertManager.AlertType.ALERT_TYPE_ROUTING, 
host.getDataCenterId(), host.getPodId());
 +                                    }
 +                                } else {
 +                                    s_logger.debug("Not processing " + 
PingRoutingCommand.class.getSimpleName() + " for agent id=" + cmdHostId + "; 
can't find the host in the DB");
 +                                }
 +                            }
 +                            answer = new PingAnswer((PingCommand)cmd);
 +                        } else if (cmd instanceof ReadyAnswer) {
 +                            final HostVO host = 
_hostDao.findById(attache.getId());
 +                            if (host == null) {
 +                                if (s_logger.isDebugEnabled()) {
 +                                    s_logger.debug("Cant not find host " + 
attache.getId());
 +                                }
 +                            }
 +                            answer = new Answer(cmd);
 +                        } else {
 +                            answer = new Answer(cmd);
 +                        }
 +                    }
 +                } catch (final Throwable th) {
 +                    s_logger.warn("Caught: ", th);
 +                    answer = new Answer(cmd, false, th.getMessage());
 +                }
 +                answers[i] = answer;
 +            }
 +
 +            final Response response = new Response(request, answers, _nodeId, 
attache.getId());
 +            if (s_logger.isDebugEnabled()) {
 +                if (logD) {
 +                    s_logger.debug("SeqA " + attache.getId() + "-" + 
response.getSequence() + ": Sending " + response);
 +                } else {
 +                    s_logger.trace("SeqA " + attache.getId() + "-" + 
response.getSequence() + ": Sending " + response);
 +                }
 +            }
 +            try {
 +                link.send(response.toBytes());
 +            } catch (final ClosedChannelException e) {
 +                s_logger.warn("Unable to send response because connection is 
closed: " + response);
 +            }
 +        }
 +
 +        protected void processResponse(final Link link, final Response 
response) {
 +            final AgentAttache attache = (AgentAttache)link.attachment();
 +            if (attache == null) {
 +                s_logger.warn("Unable to process: " + response);
 +            } else if (!attache.processAnswers(response.getSequence(), 
response)) {
 +                s_logger.info("Host " + attache.getId() + " - Seq " + 
response.getSequence() + ": Response is not processed: " + response);
 +            }
 +        }
 +
 +        @Override
 +        protected void doTask(final Task task) throws TaskExecutionException {
 +            final TransactionLegacy txn = 
TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
 +            try {
 +                final Type type = task.getType();
 +                if (type == Task.Type.DATA) {
 +                    final byte[] data = task.getData();
 +                    try {
 +                        final Request event = Request.parse(data);
 +                        if (event instanceof Response) {
 +                            processResponse(task.getLink(), (Response)event);
 +                        } else {
 +                            processRequest(task.getLink(), event);
 +                        }
 +                    } catch (final UnsupportedVersionException e) {
 +                        s_logger.warn(e.getMessage());
 +                        // upgradeAgent(task.getLink(), data, e.getReason());
 +                    } catch (final ClassNotFoundException e) {
 +                        final String message = String.format("Exception 
occured when executing taks! Error '%s'", e.getMessage());
 +                        s_logger.error(message);
 +                        throw new TaskExecutionException(message, e);
 +                    }
 +                } else if (type == Task.Type.CONNECT) {
 +                } else if (type == Task.Type.DISCONNECT) {
 +                    final Link link = task.getLink();
 +                    final AgentAttache attache = 
(AgentAttache)link.attachment();
 +                    if (attache != null) {
 +                        disconnectWithInvestigation(attache, 
Event.AgentDisconnected);
 +                    } else {
 +                        s_logger.info("Connection from " + 
link.getIpAddress() + " closed but no cleanup was done.");
 +                        link.close();
 +                        link.terminated();
 +                    }
 +                }
 +            } finally {
 +                txn.close();
 +            }
 +        }
 +    }
 +
 +    protected AgentManagerImpl() {
 +    }
 +
 +    public boolean tapLoadingAgents(final Long hostId, final TapAgentsAction 
action) {
 +        synchronized (_loadingAgents) {
 +            if (action == TapAgentsAction.Add) {
 +                if (_loadingAgents.contains(hostId)) {
 +                    return false;
 +                } else {
 +                    _loadingAgents.add(hostId);
 +                }
 +            } else if (action == TapAgentsAction.Del) {
 +                _loadingAgents.remove(hostId);
 +            } else if (action == TapAgentsAction.Contains) {
 +                return _loadingAgents.contains(hostId);
 +            } else {
 +                throw new CloudRuntimeException("Unkonwn TapAgentsAction " + 
action);
 +            }
 +        }
 +        return true;
 +    }
 +
 +    @Override
 +    public boolean agentStatusTransitTo(final HostVO host, final Status.Event 
e, final long msId) {
 +        try {
 +            _agentStatusLock.lock();
 +            if (s_logger.isDebugEnabled()) {
 +                final ResourceState state = host.getResourceState();
 +                final StringBuilder msg = new StringBuilder("Transition:");
 +                msg.append("[Resource state = ").append(state);
 +                msg.append(", Agent event = ").append(e.toString());
 +                msg.append(", Host id = ").append(host.getId()).append(", 
name = " + host.getName()).append("]");
 +                s_logger.debug(msg);
 +            }
 +
 +            host.setManagementServerId(msId);
 +            try {
 +                return _statusStateMachine.transitTo(host, e, host.getId(), 
_hostDao);
 +            } catch (final NoTransitionException e1) {
 +                s_logger.debug("Cannot transit agent status with event " + e 
+ " for host " + host.getId() + ", name=" + host.getName() + ", mangement 
server id is " + msId);
 +                throw new CloudRuntimeException("Cannot transit agent status 
with event " + e + " for host " + host.getId() + ", mangement server id is " + 
msId + "," + e1.getMessage());
 +            }
 +        } finally {
 +            _agentStatusLock.unlock();
 +        }
 +    }
 +
 +    public boolean disconnectAgent(final HostVO host, final Status.Event e, 
final long msId) {
 +        host.setDisconnectedOn(new Date());
 +        if (e.equals(Status.Event.Remove)) {
 +            host.setGuid(null);
 +            host.setClusterId(null);
 +        }
 +
 +        return agentStatusTransitTo(host, e, msId);
 +    }
 +
 +    protected void disconnectWithoutInvestigation(final AgentAttache attache, 
final Status.Event event) {
 +        _executor.submit(new DisconnectTask(attache, event, false));
 +    }
 +
 +    public void disconnectWithInvestigation(final AgentAttache attache, final 
Status.Event event) {
 +        _executor.submit(new DisconnectTask(attache, event, true));
 +    }
 +
 +    protected boolean isHostOwnerSwitched(final long hostId) {
 +        final HostVO host = _hostDao.findById(hostId);
 +        if (host == null) {
 +            s_logger.warn("Can't find the host " + hostId);
 +            return false;
 +        }
 +        return isHostOwnerSwitched(host);
 +    }
 +
 +    protected boolean isHostOwnerSwitched(final HostVO host) {
 +        if (host.getStatus() == Status.Up && host.getManagementServerId() != 
null && host.getManagementServerId() != _nodeId) {
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    private void disconnectInternal(final long hostId, final Status.Event 
event, final boolean invstigate) {
 +        final AgentAttache attache = findAttache(hostId);
 +
 +        if (attache != null) {
 +            if (!invstigate) {
 +                disconnectWithoutInvestigation(attache, event);
 +            } else {
 +                disconnectWithInvestigation(attache, event);
 +            }
 +        } else {
 +            /* Agent is still in connecting process, don't allow to 
disconnect right away */
 +            if (tapLoadingAgents(hostId, TapAgentsAction.Contains)) {
 +                s_logger.info("Host " + hostId + " is being loaded so no 
disconnects needed.");
 +                return;
 +            }
 +
 +            final HostVO host = _hostDao.findById(hostId);
 +            if (host != null && host.getRemoved() == null) {
 +                disconnectAgent(host, event, _nodeId);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void disconnectWithInvestigation(final long hostId, final 
Status.Event event) {
 +        disconnectInternal(hostId, event, true);
 +    }
 +
 +    @Override
 +    public void disconnectWithoutInvestigation(final long hostId, final 
Status.Event event) {
 +        disconnectInternal(hostId, event, false);
 +    }
 +
 +    @Override
 +    public boolean handleDirectConnectAgent(final Host host, final 
StartupCommand[] cmds, final ServerResource resource, final boolean 
forRebalance, boolean newHost) throws ConnectionException {
 +        AgentAttache attache;
 +
 +        attache = createAttacheForDirectConnect(host, resource);
 +        final StartupAnswer[] answers = new StartupAnswer[cmds.length];
 +        for (int i = 0; i < answers.length; i++) {
 +            answers[i] = new StartupAnswer(cmds[i], attache.getId(), 
mgmtServiceConf.getPingInterval());
 +        }
 +        attache.process(answers);
 +
 +        if (newHost) {
 +            notifyMonitorsOfNewlyAddedHost(host.getId());
 +        }
 +
 +        attache = notifyMonitorsOfConnection(attache, cmds, forRebalance);
 +
 +        return attache != null;
 +    }
 +
 +    @Override
 +    public void pullAgentToMaintenance(final long hostId) {
 +        final AgentAttache attache = findAttache(hostId);
 +        if (attache != null) {
 +            attache.setMaintenanceMode(true);
 +            // Now cancel all of the commands except for the active one.
 +            attache.cancelAllCommands(Status.Disconnected, false);
 +        }
 +    }
 +
 +    @Override
 +    public void pullAgentOutMaintenance(final long hostId) {
 +        final AgentAttache attache = findAttache(hostId);
 +        if (attache != null) {
 +            attache.setMaintenanceMode(false);
 +        }
 +    }
 +
 +    public ScheduledExecutorService getDirectAgentPool() {
 +        return _directAgentExecutor;
 +    }
 +
 +    public ScheduledExecutorService getCronJobPool() {
 +        return _cronJobExecutor;
 +    }
 +
 +    public int getDirectAgentThreadCap() {
 +        return _directAgentThreadCap;
 +    }
 +
 +    public Long getAgentPingTime(final long agentId) {
 +        return _pingMap.get(agentId);
 +    }
 +
 +    public void pingBy(final long agentId) {
 +        // Update PingMap with the latest time if agent entry exists in the 
PingMap
 +        if (_pingMap.replace(agentId, InaccurateClock.getTimeInSeconds()) == 
null) {
 +            s_logger.info("PingMap for agent: " + agentId + " will not be 
updated because agent is no longer in the PingMap");
 +        }
 +    }
 +
 +    protected class MonitorTask extends ManagedContextRunnable {
 +        @Override
 +        protected void runInContext() {
 +            s_logger.trace("Agent Monitor is started.");
 +
 +            try {
 +                final List<Long> behindAgents = findAgentsBehindOnPing();
 +                for (final Long agentId : behindAgents) {
 +                    final QueryBuilder<HostVO> sc = 
QueryBuilder.create(HostVO.class);
 +                    sc.and(sc.entity().getId(), Op.EQ, agentId);
 +                    final HostVO h = sc.find();
 +                    if (h != null) {
 +                        final ResourceState resourceState = 
h.getResourceState();
 +                        if (resourceState == ResourceState.Disabled || 
resourceState == ResourceState.Maintenance || resourceState == 
ResourceState.ErrorInMaintenance) {
 +                            /*
 +                             * Host is in non-operation state, so no 
investigation and direct put agent to Disconnected
 +                             */
 +                            s_logger.debug("Ping timeout but agent " + 
agentId + " is in resource state of " + resourceState + ", so no 
investigation");
 +                            disconnectWithoutInvestigation(agentId, 
Event.ShutdownRequested);
 +                        } else {
 +                            final HostVO host = _hostDao.findById(agentId);
 +                            if (host != null
 +                                    && (host.getType() == 
Host.Type.ConsoleProxy || host.getType() == Host.Type.SecondaryStorageVM || 
host.getType() == Host.Type.SecondaryStorageCmdExecutor)) {
 +
 +                                s_logger.warn("Disconnect agent for CPVM/SSVM 
due to physical connection close. host: " + host.getId());
 +                                disconnectWithoutInvestigation(agentId, 
Event.ShutdownRequested);
 +                            } else {
 +                                s_logger.debug("Ping timeout for agent " + 
agentId + ", do invstigation");
 +                                disconnectWithInvestigation(agentId, 
Event.PingTimeout);
 +                            }
 +                        }
 +                    }
 +                }
 +
 +                final QueryBuilder<HostVO> sc = 
QueryBuilder.create(HostVO.class);
 +                sc.and(sc.entity().getResourceState(), Op.IN, 
ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance);
 +                final List<HostVO> hosts = sc.list();
 +
 +                for (final HostVO host : hosts) {
 +                    if (_resourceMgr.checkAndMaintain(host.getId())) {
 +                        final DataCenterVO dcVO = 
_dcDao.findById(host.getDataCenterId());
 +                        final HostPodVO podVO = 
_podDao.findById(host.getPodId());
 +                        final String hostDesc = "name: " + host.getName() + " 
(id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + 
podVO.getName();
 +                        
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, 
host.getDataCenterId(), host.getPodId(), "Migration Complete for host " + 
hostDesc,
 +                                "Host [" + hostDesc + "] is ready for 
maintenance");
 +                    }
 +                }
 +            } catch (final Throwable th) {
 +                s_logger.error("Caught the following exception: ", th);
 +            }
 +
 +            s_logger.trace("Agent Monitor is leaving the building!");
 +        }
 +
 +        protected List<Long> findAgentsBehindOnPing() {
 +            final List<Long> agentsBehind = new ArrayList<Long>();
 +            final long cutoffTime = InaccurateClock.getTimeInSeconds() - 
mgmtServiceConf.getTimeout();
 +            for (final Map.Entry<Long, Long> entry : _pingMap.entrySet()) {
 +                if (entry.getValue() < cutoffTime) {
 +                    agentsBehind.add(entry.getKey());
 +                }
 +            }
 +
 +            if (agentsBehind.size() > 0) {
 +                s_logger.info("Found the following agents behind on ping: " + 
agentsBehind);
 +            }
 +
 +            return agentsBehind;
 +        }
 +    }
 +
 +    protected class BehindOnPingListener implements Listener {
 +        @Override
 +        public boolean isRecurring() {
 +            return true;
 +        }
 +
 +        @Override
 +        public boolean processAnswers(final long agentId, final long seq, 
final Answer[] answers) {
 +            return false;
 +        }
 +
 +        @Override
 +        public boolean processCommands(final long agentId, final long seq, 
final Command[] commands) {
 +            final boolean processed = false;
 +            for (final Command cmd : commands) {
 +                if (cmd instanceof PingCommand) {
 +                    pingBy(agentId);
 +                }
 +            }
 +            return processed;
 +        }
 +
 +        @Override
 +        public AgentControlAnswer processControlCommand(final long agentId, 
final AgentControlCommand cmd) {
 +            return null;
 +        }
 +
 +        @Override
 +        public void processHostAdded(long hostId) {
 +        }
 +
 +        @Override
 +        public void processConnect(final Host host, final StartupCommand cmd, 
final boolean forRebalance) {
 +            if (host.getType().equals(Host.Type.TrafficMonitor) || 
host.getType().equals(Host.Type.SecondaryStorage)) {
 +                return;
 +            }
 +
 +            // NOTE: We don't use pingBy here because we're initiating.
 +            _pingMap.put(host.getId(), InaccurateClock.getTimeInSeconds());
 +        }
 +
 +        @Override
 +        public boolean processDisconnect(final long agentId, final Status 
state) {
 +            _pingMap.remove(agentId);
 +            return true;
 +        }
 +
 +        @Override
 +        public void processHostAboutToBeRemoved(long hostId) {
 +        }
 +
 +        @Override
 +        public void processHostRemoved(long hostId, long clusterId) {
 +        }
 +
 +        @Override
 +        public boolean processTimeout(final long agentId, final long seq) {
 +            return true;
 +        }
 +
 +        @Override
 +        public int getTimeout() {
 +            return -1;
 +        }
 +
 +    }
 +
 +    @Override
 +    public String getConfigComponentName() {
 +        return AgentManager.class.getSimpleName();
 +    }
 +
 +    @Override
 +    public ConfigKey<?>[] getConfigKeys() {
 +        return new ConfigKey<?>[] { CheckTxnBeforeSending, Workers, Port, 
Wait, AlertWait, DirectAgentLoadSize, DirectAgentPoolSize,
 +            DirectAgentThreadCap };
 +    }
 +
 +    protected class SetHostParamsListener implements Listener {
 +        @Override
 +        public boolean isRecurring() {
 +            return false;
 +        }
 +
 +        @Override
 +        public boolean processAnswers(final long agentId, final long seq, 
final Answer[] answers) {
 +            return false;
 +        }
 +
 +        @Override
 +        public boolean processCommands(final long agentId, final long seq, 
final Command[] commands) {
 +            return false;
 +        }
 +
 +        @Override
 +        public AgentControlAnswer processControlCommand(final long agentId, 
final AgentControlCommand cmd) {
 +            return null;
 +        }
 +
 +        @Override
 +        public void processHostAdded(long hostId) {
 +        }
 +
 +        @Override
 +        public void processConnect(final Host host, final StartupCommand cmd, 
final boolean forRebalance) {
 +            if (cmd instanceof StartupRoutingCommand) {
 +                if (((StartupRoutingCommand)cmd).getHypervisorType() == 
HypervisorType.KVM || ((StartupRoutingCommand)cmd).getHypervisorType() == 
HypervisorType.LXC) {
 +                    Map<String, String> params = new HashMap<String, 
String>();
 +                    params.put("router.aggregation.command.each.timeout", 
_configDao.getValue("router.aggregation.command.each.timeout"));
 +
 +                    try {
 +                        SetHostParamsCommand cmds = new 
SetHostParamsCommand(params);
 +                        Commands c = new Commands(cmds);
 +                        send(host.getId(), c, this);
 +                    } catch (AgentUnavailableException e) {
 +                        s_logger.debug("Failed to send host params on host: " 
+ host.getId());
 +                    }
 +                }
 +            }
 +
 +        }
 +
 +        @Override
 +        public boolean processDisconnect(final long agentId, final Status 
state) {
 +            return true;
 +        }
 +
 +        @Override
 +        public void processHostAboutToBeRemoved(long hostId) {
 +        }
 +
 +        @Override
 +        public void processHostRemoved(long hostId, long clusterId) {
 +        }
 +
 +        @Override
 +        public boolean processTimeout(final long agentId, final long seq) {
 +            return false;
 +        }
 +
 +        @Override
 +        public int getTimeout() {
 +            return -1;
 +        }
 +
 +    }
 +
 +}
diff --cc 
plugins/hypervisors/simulator/src/main/java/com/cloud/resource/SimulatorStorageProcessor.java
index 30cad8f,c2dfdbd..c2dfdbd
mode 100755,100644..100644
--- 
a/plugins/hypervisors/simulator/src/main/java/com/cloud/resource/SimulatorStorageProcessor.java
+++ 
b/plugins/hypervisors/simulator/src/main/java/com/cloud/resource/SimulatorStorageProcessor.java

-- 
To stop receiving notification emails like this one, please contact
ro...@apache.org.

Reply via email to