AMBARI-7985. Add server side command functionality. Allow tasks to be executed on the Ambari Server host.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3d397dc0 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3d397dc0 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3d397dc0 Branch: refs/heads/trunk Commit: 3d397dc04aab8d81c00aae2a8e5afa099fc57567 Parents: cc076cf Author: Robert Levas <rle...@hortonworks.com> Authored: Mon Nov 17 12:22:15 2014 -0500 Committer: John Speidel <jspei...@hortonworks.com> Committed: Mon Nov 17 12:25:43 2014 -0500 ---------------------------------------------------------------------- .../server/actionmanager/ActionDBAccessor.java | 5 + .../actionmanager/ActionDBAccessorImpl.java | 7 +- .../server/actionmanager/ActionManager.java | 5 +- .../server/actionmanager/ActionScheduler.java | 97 +--- .../ambari/server/actionmanager/Stage.java | 39 +- .../server/controller/ControllerModule.java | 3 - .../server/orm/dao/HostRoleCommandDAO.java | 11 + .../serveraction/AbstractServerAction.java | 138 +++++ .../server/serveraction/ServerAction.java | 72 ++- .../serveraction/ServerActionExecutor.java | 525 +++++++++++++++++++ .../serveraction/ServerActionManager.java | 32 -- .../serveraction/ServerActionManagerImpl.java | 74 --- .../server/state/ServiceComponentHostEvent.java | 2 + .../state/ServiceComponentHostEventType.java | 6 +- .../ServiceComponentHostServerActionEvent.java | 76 +++ .../ServiceComponentHostUpgradeEvent.java | 3 +- .../actionmanager/TestActionDBAccessorImpl.java | 80 ++- .../server/actionmanager/TestActionManager.java | 6 +- .../actionmanager/TestActionScheduler.java | 395 +++++++++----- .../server/agent/TestHeartbeatHandler.java | 6 +- .../AmbariManagementControllerTest.java | 25 +- .../server/serveraction/MockServerAction.java | 92 ++++ .../serveraction/ServerActionExecutorTest.java | 248 +++++++++ 23 files changed, 1581 insertions(+), 366 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java index 1f99b4a..6ff365b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java @@ -149,6 +149,11 @@ public interface ActionDBAccessor { public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds); /** + * Get a List of host role commands where the role and status are as specified + */ + public List<HostRoleCommand> getTasksByHostRoleAndStatus(String hostname, String role, HostRoleStatus status); + + /** * Get all stages that contain tasks with specified host role statuses */ public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses); http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index 5e879cc..e8be3cc 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -212,7 +212,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { if (clusterEntity != null) { clusterId = clusterEntity.getClusterId(); } - + requestEntity.setClusterId(clusterId); 
requestDAO.create(requestEntity); @@ -550,6 +550,11 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { } @Override + public List<HostRoleCommand> getTasksByHostRoleAndStatus(String hostname, String role, HostRoleStatus status) { + return getTasks(hostRoleCommandDAO.findTaskIdsByHostRoleAndStatus(hostname, role, status)); + } + + @Override public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) { List<Stage> stages = new ArrayList<Stage>(); for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) { http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java index e2fad5f..6d1d87f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java @@ -28,7 +28,6 @@ import org.apache.ambari.server.api.services.BaseRequest; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.ExecuteActionRequest; import org.apache.ambari.server.controller.HostsMap; -import org.apache.ambari.server.serveraction.ServerActionManager; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.utils.StageUtils; import org.slf4j.Logger; @@ -60,12 +59,12 @@ public class ActionManager { public ActionManager(@Named("schedulerSleeptime") long schedulerSleepTime, @Named("actionTimeout") long actionTimeout, ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap, - ServerActionManager serverActionManager, UnitOfWork unitOfWork, + UnitOfWork unitOfWork, RequestFactory requestFactory, Configuration 
configuration) { this.actionQueue = aq; this.db = db; scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db, - actionQueue, fsm, 2, hostsMap, serverActionManager, unitOfWork, configuration); + actionQueue, fsm, 2, hostsMap, unitOfWork, configuration); requestCounter = new AtomicLong( db.getLastPersistedRequestIdWhenInitialized()); this.requestFactory = requestFactory; http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 81fee75..c23440e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -43,8 +43,7 @@ import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.agent.ExecutionCommand; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.HostsMap; -import org.apache.ambari.server.serveraction.ServerAction; -import org.apache.ambari.server.serveraction.ServerActionManager; +import org.apache.ambari.server.serveraction.ServerActionExecutor; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Host; @@ -93,7 +92,7 @@ class ActionScheduler implements Runnable { private boolean taskTimeoutAdjustment = true; private final HostsMap hostsMap; private final Object wakeupSyncObject = new Object(); - private final ServerActionManager serverActionManager; + private final ServerActionExecutor serverActionExecutor; private final Configuration configuration; private final Set<Long> requestsInProgress = new 
HashSet<Long>(); @@ -126,7 +125,7 @@ class ActionScheduler implements Runnable { public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject, - int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager, + int maxAttempts, HostsMap hostsMap, UnitOfWork unitOfWork, Configuration configuration) { this.sleepTime = sleepTimeMilliSec; this.hostsMap = hostsMap; @@ -135,7 +134,7 @@ class ActionScheduler implements Runnable { this.actionQueue = actionQueue; this.fsmObject = fsmObject; this.maxAttempts = (short) maxAttempts; - this.serverActionManager = serverActionManager; + this.serverActionExecutor = new ServerActionExecutor(db, sleepTimeMilliSec); this.unitOfWork = unitOfWork; this.clusterHostInfoCache = CacheBuilder.newBuilder(). expireAfterAccess(5, TimeUnit.MINUTES). @@ -152,11 +151,19 @@ class ActionScheduler implements Runnable { public void start() { schedulerThread = new Thread(this); schedulerThread.start(); + + // Start up the ServerActionExecutor. Since it is directly related to the ActionScheduler it + // should be started and stopped along with it. + serverActionExecutor.start(); } public void stop() { shouldRun = false; schedulerThread.interrupt(); + + // Stop the ServerActionExecutor. Since it is directly related to the ActionScheduler it should + // be started and stopped along with it. + serverActionExecutor.stop(); } /** @@ -216,7 +223,7 @@ class ActionScheduler implements Runnable { return; } int i_stage = 0; - + stages = filterParallelPerHostStages(stages); boolean exclusiveRequestIsGoing = false; @@ -285,18 +292,7 @@ class ActionScheduler implements Runnable { //Schedule what we have so far for (ExecutionCommand cmd : commandsToSchedule) { - if (Role.valueOf(cmd.getRole()).equals(Role.AMBARI_SERVER_ACTION)) { - /** - * We don't forbid executing any stages in parallel with - * AMBARI_SERVER_ACTION. 
That should be OK as AMBARI_SERVER_ACTION - * is not used as of now. The general motivation has been to update - * Request status when last task associated with the - * Request is finished. - */ - executeServerAction(s, cmd); - } else { processHostRole(s, cmd, commandsToStart, commandsToUpdate); - } } LOG.debug("==> Commands to start: {}", commandsToStart.size()); @@ -347,7 +343,12 @@ class ActionScheduler implements Runnable { LOG.debug("==> Adding {} tasks to queue...", commandsToUpdate.size()); for (ExecutionCommand cmd : commandsToUpdate) { - actionQueue.enqueue(cmd.getHostname(), cmd); + // Do not queue up server actions; however if we encounter one, wake up the ServerActionExecutor + if (Role.AMBARI_SERVER_ACTION.toString().equals(cmd.getRole())) { + serverActionExecutor.awake(); + } else { + actionQueue.enqueue(cmd.getHostname(), cmd); + } } LOG.debug("==> Finished."); @@ -403,32 +404,6 @@ class ActionScheduler implements Runnable { return true; } - /** - * Executes internal ambari-server action - */ - private void executeServerAction(Stage s, ExecutionCommand cmd) { - try { - LOG.trace("Executing server action: request_id={}, stage_id={}, task_id={}", - s.getRequestId(), s.getStageId(), cmd.getTaskId()); - long now = System.currentTimeMillis(); - String hostName = cmd.getHostname(); - String roleName = cmd.getRole(); - - s.setStartTime(hostName, roleName, now); - s.setLastAttemptTime(hostName, roleName, now); - s.incrementAttemptCount(hostName, roleName); - s.setHostRoleStatus(hostName, roleName, HostRoleStatus.QUEUED); - db.hostRoleScheduled(s, hostName, roleName); - String actionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME); - this.serverActionManager.executeAction(actionName, cmd.getCommandParams()); - reportServerActionSuccess(s, cmd); - - } catch (AmbariException e) { - LOG.warn("Could not execute server action " + cmd.toString(), e); - reportServerActionFailure(s, cmd, e.getMessage()); - } - } - private boolean hasPreviousStageFailed(Stage 
stage) { boolean failed = false; long prevStageId = stage.getStageId() - 1; @@ -477,26 +452,6 @@ class ActionScheduler implements Runnable { return failed; } - private void reportServerActionSuccess(Stage stage, ExecutionCommand cmd) { - CommandReport report = new CommandReport(); - report.setStatus(HostRoleStatus.COMPLETED.toString()); - report.setExitCode(0); - report.setStdOut("Server action succeeded"); - report.setStdErr(""); - db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(), - cmd.getRole(), report); - } - - private void reportServerActionFailure(Stage stage, ExecutionCommand cmd, String message) { - CommandReport report = new CommandReport(); - report.setStatus(HostRoleStatus.FAILED.toString()); - report.setExitCode(1); - report.setStdOut("Server action failed"); - report.setStdErr(message); - db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(), - cmd.getRole(), report); - } - /** * @return Stats for the roles in the stage. It is used to determine whether stage * has succeeded or failed. @@ -569,12 +524,12 @@ class ActionScheduler implements Runnable { // Check that service host component is not deleted if (hostDeleted) { - + String message = String.format( "Host not found when trying to schedule an execution command. " + "The most probable reason for that is that host or host component " + "has been deleted recently. The command has been aborted and dequeued." + - "Execution command details: " + + "Execution command details: " + "cmdId: %s; taskId: %s; roleCommand: %s", c.getCommandId(), c.getTaskId(), c.getRoleCommand()); LOG.warn("Host {} has been detected as non-available. 
{}", host, message); @@ -772,7 +727,7 @@ class ActionScheduler implements Runnable { } cmd.setClusterHostInfo(clusterHostInfo); - + //Try to get commandParams from cache and merge them with command-level parameters Map<String, String> commandParams = commandParamsStageCache.getIfPresent(stagePk); @@ -888,12 +843,16 @@ class ActionScheduler implements Runnable { LOG.error("Unknown status " + status.name()); } } - - + + public void setTaskTimeoutAdjustment(boolean val) { this.taskTimeoutAdjustment = val; } + ServerActionExecutor getServerActionExecutor() { + return serverActionExecutor; + } + static class RoleStats { int numInProgress; int numQueued = 0; http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java index bbc5ac3..01f2085 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.actionmanager; +import java.text.NumberFormat; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -39,7 +40,7 @@ import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.serveraction.ServerAction; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ServiceComponentHostEvent; -import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent; +import org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent; import org.apache.ambari.server.utils.StageUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -302,14 +303,38 @@ public class 
Stage { /** - * Creates server-side execution command. As of now, it seems to - * be used only for server upgrade + * Creates server-side execution command. + * <p/> + * The action name for this command is expected to be the classname of a + * {@link org.apache.ambari.server.serveraction.ServerAction} implementation which will be + * instantiated and invoked as needed. + * + * @param actionName a String declaring the action name (in the form of a classname) to execute + * @param role the Role for this command + * @param command the RoleCommand for this command + * @param clusterName a String identifying the cluster on which to to execute this command + * @param event a ServiceComponentHostServerActionEvent + * @param commandParams a Map of String to String data used to pass to the action - this may be + * empty or null if no data is relevant + * @param timeout an Integer declaring the timeout for this action - if null, a default + * timeout will be used */ - public synchronized void addServerActionCommand(String actionName, Role role, RoleCommand command, String clusterName, - ServiceComponentHostUpgradeEvent event, String hostName) { - ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, hostName, role, command, event); + public synchronized void addServerActionCommand(String actionName, Role role, RoleCommand command, + String clusterName, ServiceComponentHostServerActionEvent event, + @Nullable Map<String, String> commandParams, + @Nullable Integer timeout) { + ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, StageUtils.getHostName(), role, command, event); ExecutionCommand cmd = commandWrapper.getExecutionCommand(); - + + Map<String, String> cmdParams = new HashMap<String, String>(); + if (commandParams != null) { + cmdParams.putAll(commandParams); + } + if (timeout != null) { + cmdParams.put(ExecutionCommand.KeyNames.COMMAND_TIMEOUT, NumberFormat.getIntegerInstance().format(timeout)); + } + 
cmd.setCommandParams(cmdParams); + Map<String, String> roleParams = new HashMap<String, String>(); roleParams.put(ServerAction.ACTION_NAME, actionName); cmd.setRoleParams(roleParams); http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java index d74510a..2d91462 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java @@ -66,8 +66,6 @@ import org.apache.ambari.server.scheduler.ExecutionScheduler; import org.apache.ambari.server.scheduler.ExecutionSchedulerImpl; import org.apache.ambari.server.security.SecurityHelper; import org.apache.ambari.server.security.SecurityHelperImpl; -import org.apache.ambari.server.serveraction.ServerActionManager; -import org.apache.ambari.server.serveraction.ServerActionManagerImpl; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; @@ -238,7 +236,6 @@ public class ControllerModule extends AbstractModule { bind(AmbariManagementController.class) .to(AmbariManagementControllerImpl.class); bind(AbstractRootServiceResponseFactory.class).to(RootServiceResponseFactory.class); - bind(ServerActionManager.class).to(ServerActionManagerImpl.class); bind(ExecutionScheduler.class).to(ExecutionSchedulerImpl.class); bind(DBAccessor.class).to(DBAccessorImpl.class); bind(ViewInstanceHandlerList.class).to(AmbariHandlerList.class); http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java 
---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java index 6920a9e..cf025b7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java @@ -127,6 +127,17 @@ public class HostRoleCommandDAO { } @RequiresSession + public List<Long> findTaskIdsByHostRoleAndStatus(String hostname, String role, HostRoleStatus status) { + TypedQuery<Long> query = entityManagerProvider.get().createQuery( + "SELECT DISTINCT task.taskId FROM HostRoleCommandEntity task " + + "WHERE task.hostName=?1 AND task.role=?2 AND task.status=?3 " + + "ORDER BY task.taskId", Long.class + ); + + return daoUtils.selectList(query, hostname, role, status); + } + + @RequiresSession public List<HostRoleCommandEntity> findSortedCommandsByStageAndHost(StageEntity stageEntity, HostEntity hostEntity) { TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT hostRoleCommand " + "FROM HostRoleCommandEntity hostRoleCommand " + http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java new file mode 100644 index 0000000..9882e73 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.serveraction; + +import org.apache.ambari.server.RoleCommand; +import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper; +import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.agent.CommandReport; +import org.apache.ambari.server.agent.ExecutionCommand; +import org.apache.ambari.server.utils.StageUtils; + +import java.util.Collections; +import java.util.Map; + +/** + * AbstractServerActionImpl is an abstract implementation of a ServerAction. + * <p/> + * This abstract implementation provides common facilities for all ServerActions, such as + * maintaining the ExecutionCommand and HostRoleCommand properties. It also provides a convenient + * way to generate CommandReports for reporting status. 
+ */ +public abstract class AbstractServerAction implements ServerAction { + /** + * The ExecutionCommand containing data related to this ServerAction implementation + */ + private ExecutionCommand executionCommand = null; + + /** + * The HostRoleCommand containing data related to this ServerAction implementation + */ + private HostRoleCommand hostRoleCommand = null; + + @Override + public ExecutionCommand getExecutionCommand() { + return this.executionCommand; + } + + @Override + public void setExecutionCommand(ExecutionCommand executionCommand) { + this.executionCommand = executionCommand; + } + + @Override + public HostRoleCommand getHostRoleCommand() { + return this.hostRoleCommand; + } + + @Override + public void setHostRoleCommand(HostRoleCommand hostRoleCommand) { + this.hostRoleCommand = hostRoleCommand; + } + + /** + * Creates a CommandReport used to report back to Ambari the status of this ServerAction. + * + * @param exitCode an integer value declaring the exit code for this action - 0 typically + * indicates success. 
+ * @param status a HostRoleStatus indicating the status of this action + * @param structuredOut a String containing the (typically) JSON-formatted data representing the + * output from this action (this data is stored in the database, along with + * the command status) + * @param stdout A string containing the data from the standard out stream (this data is stored in + * the database, along with the command status) + * @param stderr A string containing the data from the standard error stream (this data is stored + * in the database, along with the command status) + * @return the generated CommandReport, or null if the HostRoleCommand or ExecutionCommand + * properties are missing + */ + protected CommandReport createCommandReport(int exitCode, HostRoleStatus status, String structuredOut, + String stdout, String stderr) { + CommandReport report = null; + + if (hostRoleCommand != null) { + if (executionCommand == null) { + ExecutionCommandWrapper wrapper = hostRoleCommand.getExecutionCommandWrapper(); + + if (wrapper != null) { + executionCommand = wrapper.getExecutionCommand(); + } + } + + if (executionCommand != null) { + RoleCommand roleCommand = executionCommand.getRoleCommand(); + + report = new CommandReport(); + + report.setActionId(StageUtils.getActionId(hostRoleCommand.getRequestId(), hostRoleCommand.getStageId())); + report.setClusterName(executionCommand.getClusterName()); + report.setConfigurationTags(executionCommand.getConfigurationTags()); + report.setRole(executionCommand.getRole()); + report.setRoleCommand((roleCommand == null) ? null : roleCommand.toString()); + report.setServiceName(executionCommand.getServiceName()); + report.setTaskId(executionCommand.getTaskId()); + + report.setStructuredOut(structuredOut); + report.setStdErr((stderr == null) ? "" : stderr); + report.setStdOut((stdout == null) ? "" : stdout); + report.setStatus((status == null) ? 
null : status.toString()); + report.setExitCode(exitCode); + } + } + + return report; + } + + /** + * Returns the command parameters value from the ExecutionCommand + * <p/> + * The returned map should be assumed to be read-only. + * + * @return the (assumed read-only) command parameters value from the ExecutionCommand + */ + protected Map<String, String> getCommandParameters() { + if (executionCommand == null) { + return Collections.emptyMap(); + } else { + return executionCommand.getCommandParams(); + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java index be885b5..99e3029 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java @@ -18,23 +18,67 @@ package org.apache.ambari.server.serveraction; -public class ServerAction { +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.agent.CommandReport; +import org.apache.ambari.server.agent.ExecutionCommand; + +import java.util.concurrent.ConcurrentMap; + +/** + * ServerAction is an interface to be implemented by all server-based actions/tasks. + */ +public interface ServerAction { public static final String ACTION_NAME = "ACTION_NAME"; + + /** + * Gets the ExecutionCommand property of this ServerAction. + * + * @return the ExecutionCommand property of this ServerAction + */ + ExecutionCommand getExecutionCommand(); + + /** + * Sets the ExecutionCommand property of this ServerAction. 
+ * <p/> + * This property is expected to be set by the creator of this ServerAction before calling execute. + * + * @param command the ExecutionCommand data to set + */ + void setExecutionCommand(ExecutionCommand command); + + + /** + * Gets the HostRoleCommand property of this ServerAction. + * + * @return the HostRoleCommand property of this ServerAction + */ + HostRoleCommand getHostRoleCommand(); + + /** + * Sets the HostRoleCommand property of this ServerAction. + * <p/> + * This property is expected to be set by the creator of this ServerAction before calling execute. + * + * @param hostRoleCommand the HostRoleCommand data to set + */ + void setHostRoleCommand(HostRoleCommand hostRoleCommand); + /** - * The commands supported by the server. A command is a named alias to the - * action implementation at the server + * Executes this ServerAction + * <p/> + * This is typically called by the ServerActionExecutor in its own thread, but there is no + * guarantee that this is the case. It is expected that the ExecutionCommand and HostRoleCommand + * properties are set before calling this method. 
+ * + * @param requestSharedDataContext a Map to be used a shared data among all ServerActions related + * to a given request + * @return a CommandReport declaring the status of the task + * @throws AmbariException + * @throws InterruptedException */ - public static class Command { - /** - * Finalize the upgrade request - */ - public static final String FINALIZE_UPGRADE = "FINALIZE_UPGRADE"; - } - - public static class PayloadName { - public final static String CURRENT_STACK_VERSION = "current_stack_version"; - public final static String CLUSTER_NAME = "cluster_name"; - } + CommandReport execute(ConcurrentMap<String, Object> requestSharedDataContext) + throws AmbariException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java new file mode 100644 index 0000000..880c596 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.serveraction; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.Role; +import org.apache.ambari.server.actionmanager.*; +import org.apache.ambari.server.agent.CommandReport; +import org.apache.ambari.server.agent.ExecutionCommand; +import org.apache.ambari.server.utils.StageUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Server Action Executor used to execute server-side actions (or tasks) + * <p/> + * The ServerActionExecutor executes in its own thread, polling for AMBARI_SERVER_ACTION + * HostRoleCommands queued for execution. It is expected that this thread is managed by the + * ActionScheduler such that it is started when the ActionScheduler is started and stopped when the + * ActionScheduler is stopped. + */ +public class ServerActionExecutor { + + private final static Logger LOG = LoggerFactory.getLogger(ServerActionExecutor.class); + private final static Long EXECUTION_TIMEOUT_MS = 1000L * 60 * 5; + private final static Long POLLING_TIMEOUT_MS = 1000L * 5; + + /** + * Maps request IDs to "blackboards" of shared data. + * <p/> + * This map is not synchronized, so any access to it should synchronize on + * requestSharedDataMap object + */ + private final Map<Long, ConcurrentMap<String, Object>> requestSharedDataMap = + new HashMap<Long, ConcurrentMap<String, Object>>(); + + /** + * The hostname of the (Ambari) server. 
+ * <p/> + * This hostname is cached so that cycles are not spent querying for it more than once. + */ + private final String serverHostName; + + /** + * Database accessor to query and update the database of action commands. + */ + private final ActionDBAccessor db; + + /** + * Internal locking object used to manage access to activeAwakeRequest. + */ + private final Object wakeupSyncObject = new Object(); + + /** + * Timeout (in milliseconds) used to throttle polling of database for new action commands. + */ + private final long sleepTimeMS; + + /** + * Flag used to help keep things moving in the event an "awake" request was encountered while busy + * handling an action. + */ + private boolean activeAwakeRequest = false; + + /** + * A reference to the Thread handling the work for this ServerActionExecutor + */ + private Thread executorThread = null; + + /** + * Creates a new ServerActionExecutor + * + * @param db the ActionDBAccessor to use to read and update tasks + * @param sleepTimeMS the time (in milliseconds) to wait between polling the database for more tasks + */ + public ServerActionExecutor(ActionDBAccessor db, long sleepTimeMS) { + this.serverHostName = StageUtils.getHostName(); + this.db = db; + this.sleepTimeMS = (sleepTimeMS < 1) ? POLLING_TIMEOUT_MS : sleepTimeMS; + } + + /** + * Starts this ServerActionExecutor's main thread.
+ */ + public void start() { + LOG.info("Starting Server Action Executor thread..."); + executorThread = new Thread(new Runnable() { + + @Override + public void run() { + while (!Thread.interrupted()) { + try { + synchronized (wakeupSyncObject) { + if (!activeAwakeRequest) { + wakeupSyncObject.wait(sleepTimeMS); + } + activeAwakeRequest = false; + } + + doWork(); + } catch (InterruptedException e) { + LOG.warn("Server Action Executor thread interrupted, starting to shutdown..."); + break; + } + } + + LOG.info("Server Action Executor thread shutting down..."); + } + }, "Server Action Executor"); + executorThread.start(); + + if (executorThread.isAlive()) { + LOG.info("Server Action Executor thread started."); + } + } + + /** + * Attempts to stop this ServerActionExecutor's main thread. + */ + public void stop() { + LOG.info("Stopping Server Action Executor thread..."); + + if (executorThread != null) { + executorThread.interrupt(); + + // Wait for about 60 seconds for the thread to stop + for (int i = 0; i < 120; i++) { + try { + executorThread.join(500); + } catch (InterruptedException e) { + // Ignore this... + } + + if (!executorThread.isAlive()) { + break; + } + } + + if (!executorThread.isAlive()) { + executorThread = null; + } + } + + if (executorThread == null) { + LOG.info("Server Action Executor thread stopped."); + } else { + LOG.warn("Server Action Executor thread hasn't stopped, giving up waiting."); + } + } + + /** + * Attempts to force this ServerActionExecutor to wake up and do work. + * <p/> + * Should be called from another thread when we want scheduler to + * make a run ASAP (for example, to process desired configs of SCHs). + * The method is guaranteed to return quickly. + */ + public void awake() { + synchronized (wakeupSyncObject) { + activeAwakeRequest = true; + wakeupSyncObject.notify(); + } + } + + /** + * Returns a Map to be used to share data among server actions within a given request context. 
+ * + * @param requestId a long identifying the id of the relevant request + * @return a ConcurrentMap of "shared" data + */ + private ConcurrentMap<String, Object> getRequestSharedDataContext(long requestId) { + synchronized (requestSharedDataMap) { + ConcurrentMap<String, Object> map = requestSharedDataMap.get(requestId); + + if (map == null) { + map = new ConcurrentHashMap<String, Object>(); + requestSharedDataMap.put(requestId, map); + } + + return map; + } + } + + /** + * Cleans up orphaned shared data Maps due to completed or failed request contexts. + */ + private void cleanRequestShareDataContexts() { + // Clean out any orphaned request shared data contexts + for (RequestStatus status : EnumSet.of(RequestStatus.FAILED, RequestStatus.COMPLETED)) { + List<Long> requests = db.getRequestsByStatus(status, 100, true); + + if (requests != null) { + synchronized (requestSharedDataMap) { + for (Long requestId : requests) { + requestSharedDataMap.remove(requestId); + } + } + } + } + } + + /** + * A helper method to create CommandReports indicating the action/task is in progress + * + * @return a new CommandReport + */ + private CommandReport createInProgressReport() { + CommandReport commandReport = new CommandReport(); + commandReport.setStatus(HostRoleStatus.IN_PROGRESS.toString()); + commandReport.setStdErr(""); + commandReport.setStdOut(""); + return commandReport; + } + + /** + * A helper method to create CommandReports indicating the action/task had timed out + * + * @return a new CommandReport + */ + private CommandReport createTimedOutReport() { + CommandReport commandReport = new CommandReport(); + commandReport.setStatus(HostRoleStatus.TIMEDOUT.toString()); + commandReport.setStdErr(""); + commandReport.setStdOut(""); + return commandReport; + } + + /** + * A helper method to create CommandReports indicating the action/task has had an error + * + * @param message a String containing the error message to report + * @return a new CommandReport + */ + private 
CommandReport createErrorReport(String message) { + CommandReport commandReport = new CommandReport(); + commandReport.setStatus(HostRoleStatus.FAILED.toString()); + commandReport.setExitCode(1); + commandReport.setStdOut("Server action failed"); + commandReport.setStdErr(message); + return commandReport; + } + + /** + * Stores the status of the task/action + * <p/> + * If the command report is not specified (null), an error report will be created. + * + * @param hostRoleCommand the HostRoleCommand for the relevant task + * @param executionCommand the ExecutionCommand for the relevant task + * @param commandReport the CommandReport to store + */ + private void updateHostRoleState(HostRoleCommand hostRoleCommand, ExecutionCommand executionCommand, + CommandReport commandReport) { + if (commandReport == null) { + commandReport = createErrorReport("Unknown error condition"); + } + + db.updateHostRoleState(executionCommand.getHostname(), hostRoleCommand.getRequestId(), + hostRoleCommand.getStageId(), executionCommand.getRole(), commandReport); + } + + /** + * Determine what the timeout for this action/task should be. + * <p/> + * If the timeout value is not set in the command parameter map (under the key + * ExecutionCommand.KeyNames.COMMAND_TIMEOUT or "command_timeout"), the default timeout value will + * be used. It is expected that the timeout value stored in the command parameter map (if any) is + * in seconds. + * + * @param executionCommand the ExecutionCommand for the relevant task + * @return a long declaring the action/task's timeout (in milliseconds) + */ + private long determineTimeout(ExecutionCommand executionCommand) { + Map<String, String> params = executionCommand.getCommandParams(); + String paramsTimeout = (params == null) ? null : params.get(ExecutionCommand.KeyNames.COMMAND_TIMEOUT); + Long timeout; + + try { + timeout = (paramsTimeout == null) + ?
null + : (Long.parseLong(paramsTimeout) * 1000); // Convert seconds to milliseconds + } catch (NumberFormatException e) { + timeout = null; + } + + return (timeout == null) + ? EXECUTION_TIMEOUT_MS + : ((timeout < 0) ? 0 : timeout); + } + + /** + * Execute the logic to handle each task in the queue in the order in which it was queued. + * <p/> + * A single task is executed at one time, allowing for a specified (ExecutionCommand.KeyNames.COMMAND_TIMEOUT) + * or the default timeout for it to complete before considering the task timed out. + * + * @throws InterruptedException + */ + public void doWork() throws InterruptedException { + List<HostRoleCommand> tasks = db.getTasksByHostRoleAndStatus(serverHostName, + Role.AMBARI_SERVER_ACTION.toString(), HostRoleStatus.QUEUED); + + if ((tasks != null) && !tasks.isEmpty()) { + for (HostRoleCommand task : tasks) { + Long taskId = task.getTaskId(); + + LOG.debug("Processing task #{}", taskId); + + if (task.getStatus() == HostRoleStatus.QUEUED) { + ExecutionCommandWrapper executionWrapper = task.getExecutionCommandWrapper(); + + if (executionWrapper != null) { + ExecutionCommand executionCommand = executionWrapper.getExecutionCommand(); + + if (executionCommand != null) { + // For now, execute only one task at a time. This may change in the future in the + // event it is discovered that this is a bottleneck. Since this implementation may + // change, it should be noted from outside of this class, that there is no expectation + // that tasks will be processed in order or serially. 
+ Worker worker = new Worker(task, executionCommand); + Thread workerThread = new Thread(worker, String.format("Server Action Executor Worker %s", taskId)); + Long timeout = determineTimeout(executionCommand); + + updateHostRoleState(task, executionCommand, createInProgressReport()); + + LOG.debug("Starting Server Action Executor Worker thread for task #{}.", taskId); + workerThread.start(); + + try { + workerThread.join(timeout); + } catch (InterruptedException e) { + // Make sure the workerThread is interrupted as well. + workerThread.interrupt(); + throw e; + } + + if (workerThread.isAlive()) { + LOG.debug("Server Action Executor Worker thread for task #{} timed out - it failed to complete within {} ms.", + taskId, timeout); + workerThread.interrupt(); + updateHostRoleState(task, executionCommand, createTimedOutReport()); + } else { + LOG.debug("Server Action Executor Worker thread for task #{} exited on its own.", taskId); + updateHostRoleState(task, executionCommand, worker.getCommandReport()); + } + } else { + LOG.warn("Task #{} failed to produce an ExecutionCommand, skipping.", taskId); + } + } else { + LOG.warn("Task #{} failed to produce an ExecutionCommandWrapper, skipping.", taskId); + } + } else { + LOG.warn("Queued task #{} is expected to have a status of {} but has a status of {}, skipping.", + taskId, HostRoleStatus.QUEUED, task.getStatus()); + } + } + } + + cleanRequestShareDataContexts(); + } + + /** + * Internal class to execute a unit of work in its own thread + */ + private class Worker implements Runnable { + /** + * The task id of the relevant task + */ + private final Long taskId; + + /** + * The HostRoleCommand data used by this Worker to execute the task + */ + private final HostRoleCommand hostRoleCommand; + + /** + * The ExecutionCommand data used by this Worker to execute the task + */ + private final ExecutionCommand executionCommand; + + /** + * The resulting CommandReport used by the caller to update the status of the relevant task + 
*/ + private CommandReport commandReport = null; + + @Override + public void run() { + try { + LOG.debug("Executing task #{}", taskId); + + commandReport = execute(hostRoleCommand, executionCommand); + + LOG.debug("Task #{} completed execution with status of {}", + taskId, (commandReport == null) ? "UNKNOWN" : commandReport.getStatus()); + } catch (Throwable t) { + LOG.warn("Task #{} failed to complete execution due to thrown exception: {}:{}", + taskId, t.getClass().getName(), t.getLocalizedMessage()); + + commandReport = createErrorReport(t.getLocalizedMessage()); + } + } + + /** + * Returns the resulting CommandReport + * + * @return a CommandReport + */ + public CommandReport getCommandReport() { + return commandReport; + } + + /** + * Attempts to execute the task specified using data from the supplied HostRoleCommand and + * ExecutionCommand. + * <p/> + * Retrieves the role parameters from the supplied ExecutionCommand and queries it for the + * "ACTION_NAME" property. The returned String is expected to be the classname of a ServerAction + * implementation. If so, an instance of the implementation class is created and executed + * yielding a CommandReport to (eventually) return back to the parent thread.
+ * + * @param hostRoleCommand the HostRoleCommand for the relevant task + * @param executionCommand the ExecutionCommand for the relevant task + * @return the resulting CommandReport + * @throws AmbariException + * @throws InterruptedException + */ + private CommandReport execute(HostRoleCommand hostRoleCommand, ExecutionCommand executionCommand) + throws AmbariException, InterruptedException { + + if (hostRoleCommand == null) { + throw new AmbariException("Missing HostRoleCommand data"); + } else if (executionCommand == null) { + throw new AmbariException("Missing ExecutionCommand data"); + } else { + Map<String, String> roleParams = executionCommand.getRoleParams(); + + if (roleParams == null) { + throw new AmbariException("Missing RoleParams data"); + } else { + String actionClassname = roleParams.get(ServerAction.ACTION_NAME); + + if (actionClassname == null) { + throw new AmbariException("Missing action classname for server action"); + } else { + ServerAction action = createServerAction(actionClassname); + + if (action == null) { + throw new AmbariException("Failed to create server action: " + actionClassname); + } else { + // Set properties on the action: + action.setExecutionCommand(executionCommand); + action.setHostRoleCommand(hostRoleCommand); + + return action.execute(getRequestSharedDataContext(hostRoleCommand.getRequestId())); + } + } + } + } + } + + /** + * Attempts to create an instance of the ServerAction class implementation specified in + * classname. + * + * @param classname a String declaring the classname of the ServerAction class to instantiate + * @return the instantiated ServerAction implementation + * @throws AmbariException + */ + private ServerAction createServerAction(String classname) throws AmbariException { + try { + Class<?> actionClass = Class.forName(classname); + + if (actionClass == null) { + throw new AmbariException("Unable to load server action class: " + classname); + } else { + Class<?
extends ServerAction> serverActionClass = actionClass.asSubclass(ServerAction.class); + + if (serverActionClass == null) { + throw new AmbariException("Unable to execute server action class, invalid type: " + classname); + } else { + return serverActionClass.newInstance(); + } + } + } catch (ClassNotFoundException e) { + throw new AmbariException("Unable to load server action class: " + classname, e); + } catch (InstantiationException e) { + throw new AmbariException("Unable to create an instance of the server action class: " + classname, e); + } catch (IllegalAccessException e) { + throw new AmbariException("Unable to create an instance of the server action class: " + classname, e); + } + } + + /** + * Constructs a new Worker used to execute a task + * + * @param hostRoleCommand the HostRoleCommand for the relevant task + * @param executionCommand the ExecutionCommand for the relevant task + */ + private Worker(HostRoleCommand hostRoleCommand, ExecutionCommand executionCommand) { + this.taskId = hostRoleCommand.getTaskId(); + this.hostRoleCommand = hostRoleCommand; + this.executionCommand = executionCommand; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java deleted file mode 100644 index 011cf06..0000000 --- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.server.serveraction; - -import org.apache.ambari.server.AmbariException; - -import java.util.Map; - -/** - * Server action manager interface. - */ -public interface ServerActionManager { - - public void executeAction(String actionName, Map<String, String> payload) - throws AmbariException; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java deleted file mode 100644 index 3a16c77..0000000 --- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.server.serveraction; - -import com.google.inject.Singleton; -import com.google.inject.Inject; -import org.apache.ambari.server.AmbariException; -import org.apache.ambari.server.state.Cluster; -import org.apache.ambari.server.state.Clusters; -import org.apache.ambari.server.state.StackId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * Server action manager implementation. - */ -@Singleton -public class ServerActionManagerImpl implements ServerActionManager { - - private final static Logger LOG = - LoggerFactory.getLogger(ServerActionManagerImpl.class); - - private Clusters clusters; - - @Inject - public ServerActionManagerImpl(Clusters clusters) { - this.clusters = clusters; - } - - @Override - public void executeAction(String actionName, Map<String, String> payload) - throws AmbariException { - LOG.info("Executing server action : " - + actionName + " with payload " - + payload); - - if (actionName.equals(ServerAction.Command.FINALIZE_UPGRADE)) { - updateClusterStackVersion(payload); - } else { - throw new AmbariException("Unsupported action " + actionName); - } - } - - private void updateClusterStackVersion(Map<String, String> payload) throws AmbariException { - if (payload == null - || !payload.containsKey(ServerAction.PayloadName.CLUSTER_NAME) - || !payload.containsKey(ServerAction.PayloadName.CURRENT_STACK_VERSION)) { - throw new AmbariException("Invalid payload."); - } - - StackId currentStackId = new 
StackId(payload.get(ServerAction.PayloadName.CURRENT_STACK_VERSION)); - final Cluster cluster = clusters.getCluster(payload.get(ServerAction.PayloadName.CLUSTER_NAME)); - cluster.setCurrentStackVersion(currentStackId); - cluster.refresh(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java index 78590fc..f624f74 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java @@ -122,6 +122,8 @@ public abstract class ServiceComponentHostEvent return new ServiceComponentHostDisableEvent(serviceComponentName, hostName, opTimestamp); case HOST_SVCCOMP_RESTORE: return new ServiceComponentHostRestoreEvent(serviceComponentName, hostName, opTimestamp); + case HOST_SVCCOMP_SERVER_ACTION: + return new ServiceComponentHostServerActionEvent(serviceComponentName, hostName, opTimestamp); } return null; } http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java index b43ac9c..e744a7e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java @@ -74,6 +74,10 @@ public enum 
ServiceComponentHostEventType { /** * Recovering host component from disable state */ - HOST_SVCCOMP_RESTORE + HOST_SVCCOMP_RESTORE, + /** + * Triggering a server-side action + */ + HOST_SVCCOMP_SERVER_ACTION } http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostServerActionEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostServerActionEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostServerActionEvent.java new file mode 100644 index 0000000..c6c8b25 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostServerActionEvent.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.state.svccomphost; + +import org.apache.ambari.server.state.ServiceComponentHostEvent; +import org.apache.ambari.server.state.ServiceComponentHostEventType; + +/** + * Base class for all events that represent server-side actions. 
+ */ +public class ServiceComponentHostServerActionEvent extends + ServiceComponentHostEvent { + + /** + * Constructs a new ServiceComponentHostServerActionEvent. + * <p/> + * This method is expected to be called by either a ServiceComponentHostServerActionEvent or a + * class that extends it. + * + * @param type the ServiceComponentHostEventType - expected to be + * ServiceComponentHostEventType.HOST_SVCCOMP_SERVER_ACTION + * @param serviceComponentName a String declaring the component for which this action is to be + * routed - expected to be "AMBARI_SERVER" + * @param hostName a String declaring the host on which the action should be executed - + * expected to be the hostname of the Ambari server + * @param opTimestamp the time at which this event was created + * @param stackId the relevant stackid + */ + protected ServiceComponentHostServerActionEvent(ServiceComponentHostEventType type, + String serviceComponentName, String hostName, + long opTimestamp, String stackId) { + super(type, serviceComponentName, hostName, opTimestamp, stackId); + } + + /** + * Constructs a new ServiceComponentHostServerActionEvent where the component name is set to + * "AMBARI_SERVER" and the type is set to ServiceComponentHostEventType.HOST_SVCCOMP_SERVER_ACTION.
+ * + * @param hostName a String declaring the host on which the action should be executed - + * expected to be the hostname of the Ambari server + * @param opTimestamp the time in which this event was created + */ + public ServiceComponentHostServerActionEvent(String hostName, long opTimestamp) { + this("AMBARI_SERVER", hostName, opTimestamp); + } + + /** + * Constructs a new ServiceComponentHostServerActionEvent + * + * @param serviceComponentName a String declaring the name of component + * @param hostName a String declaring the host on which the action should be executed - + * expected to be the hostname of the Ambari server + * @param opTimestamp the time in which this event was created + */ + public ServiceComponentHostServerActionEvent(String serviceComponentName, String hostName, + long opTimestamp) { + this(ServiceComponentHostEventType.HOST_SVCCOMP_SERVER_ACTION, serviceComponentName, hostName, + opTimestamp, ""); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java index 8b375fe..dd6817d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java @@ -18,11 +18,10 @@ package org.apache.ambari.server.state.svccomphost; -import org.apache.ambari.server.state.ServiceComponentHostEvent; import org.apache.ambari.server.state.ServiceComponentHostEventType; public class ServiceComponentHostUpgradeEvent extends - ServiceComponentHostEvent { + 
ServiceComponentHostServerActionEvent { public ServiceComponentHostUpgradeEvent(String serviceComponentName, http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java index 36acbc2..3da931f 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java @@ -43,6 +43,7 @@ import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.dao.ExecutionCommandDAO; import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; +import org.apache.ambari.server.serveraction.MockServerAction; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent; import org.apache.ambari.server.utils.StageUtils; @@ -52,6 +53,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import static org.junit.Assert.assertEquals; @@ -67,6 +69,10 @@ public class TestActionDBAccessorImpl { private String hostName = "host1"; private String clusterName = "cluster1"; private String actionName = "validate_kerberos"; + + private String serverHostName = StageUtils.getHostName(); // "_localhost_"; + private String serverActionName = MockServerAction.class.getName(); + private Injector injector; ActionDBAccessor db; ActionManager am; @@ -85,13 +91,18 @@ public class 
TestActionDBAccessorImpl { .with(new TestActionDBAccessorModule())); injector.getInstance(GuiceJpaInitializer.class); injector.injectMembers(this); + + // Add this host's name since it is needed for server-side actions. + clusters.addHost(serverHostName); + clusters.getHost(serverHostName).persist(); + clusters.addHost(hostName); clusters.getHost(hostName).persist(); clusters.addCluster(clusterName); db = injector.getInstance(ActionDBAccessorImpl.class); am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db, - new HostsMap((String) null), null, injector.getInstance(UnitOfWork.class), + new HostsMap((String) null), injector.getInstance(UnitOfWork.class), injector.getInstance(RequestFactory.class), null); } @@ -157,7 +168,7 @@ public class TestActionDBAccessorImpl { "(command report status should be ignored)", HostRoleStatus.ABORTED,s.getHostRoleStatus(hostname, "HBASE_MASTER")); } - + @Test public void testGetStagesInProgress() throws AmbariException { String hostname = "host1"; @@ -168,7 +179,7 @@ public class TestActionDBAccessorImpl { db.persistActions(request); assertEquals(2, stages.size()); } - + @Test public void testGetStagesInProgressWithFailures() throws AmbariException { String hostname = "host1"; @@ -268,6 +279,46 @@ public class TestActionDBAccessorImpl { } @Test + public void testServerActionScheduled() throws InterruptedException, AmbariException { + populateActionDBWithServerAction(db, serverHostName, requestId, stageId); + + final String roleName = Role.AMBARI_SERVER_ACTION.toString(); + Stage stage = db.getStage(StageUtils.getActionId(requestId, stageId)); + assertEquals(HostRoleStatus.PENDING, stage.getHostRoleStatus(serverHostName, roleName)); + List<HostRoleCommandEntity> entities = + hostRoleCommandDAO.findByHostRole(serverHostName, requestId, stageId, roleName); + + assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus()); + stage.setHostRoleStatus(serverHostName, roleName, HostRoleStatus.QUEUED); + + entities = 
hostRoleCommandDAO.findByHostRole(serverHostName, requestId, stageId, roleName); + assertEquals(HostRoleStatus.QUEUED, stage.getHostRoleStatus(serverHostName, roleName)); + assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus()); + + db.hostRoleScheduled(stage, serverHostName, roleName); + + entities = hostRoleCommandDAO.findByHostRole( + serverHostName, requestId, stageId, roleName); + assertEquals(HostRoleStatus.QUEUED, entities.get(0).getStatus()); + + + Thread thread = new Thread() { + @Override + public void run() { + Stage stage1 = db.getStage("23-31"); + stage1.setHostRoleStatus(serverHostName, roleName, HostRoleStatus.COMPLETED); + db.hostRoleScheduled(stage1, serverHostName, roleName); + } + }; + + thread.start(); + thread.join(); + + entities = hostRoleCommandDAO.findByHostRole(serverHostName, requestId, stageId, roleName); + assertEquals("Concurrent update failed", HostRoleStatus.COMPLETED, entities.get(0).getStatus()); + } + + @Test public void testUpdateHostRole() throws Exception { populateActionDB(db, hostName, requestId, stageId); StringBuilder sb = new StringBuilder(); @@ -310,7 +361,7 @@ public class TestActionDBAccessorImpl { populateActionDB(db, hostName, requestId + 1, stageId); List<Long> requestIdsResult = db.getRequestsByStatus(null, BaseRequest.DEFAULT_PAGE_SIZE, false); - + assertNotNull("List of request IDs is null", requestIdsResult); assertEquals("Request IDs not matches", requestIds, requestIdsResult); } @@ -520,7 +571,26 @@ public class TestActionDBAccessorImpl { List<RequestResourceFilter> resourceFilters = new ArrayList<RequestResourceFilter>() {{ add(resourceFilter); }}; ExecuteActionRequest executeActionRequest = new ExecuteActionRequest - ("cluster1", null, actionName, resourceFilters, null, null, false); + ("cluster1", null, actionName, resourceFilters, null, null, false); + Request request = new Request(stages, clusters); + db.persistActions(request); + } + + private void 
populateActionDBWithServerAction(ActionDBAccessor db, String hostname, + long requestId, long stageId) throws AmbariException { + Stage s = new Stage(requestId, "/a/b", "cluster1", 1L, "action db accessor test", + "", "commandParamsStage", "hostParamsStage"); + s.setStageId(stageId); + s.addServerActionCommand(serverActionName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, clusterName, null, null, 300); + List<Stage> stages = new ArrayList<Stage>(); + stages.add(s); + final RequestResourceFilter resourceFilter = new RequestResourceFilter("AMBARI", "SERVER", Arrays.asList(hostname)); + List<RequestResourceFilter> resourceFilters = new + ArrayList<RequestResourceFilter>() {{ + add(resourceFilter); + }}; + ExecuteActionRequest executeActionRequest = new ExecuteActionRequest + ("cluster1", null, serverActionName, resourceFilters, null, null, false); Request request = new Request(stages, clusters); db.persistActions(request); } http://git-wip-us.apache.org/repos/asf/ambari/blob/3d397dc0/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java index 5a2c467..ed1318c 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java @@ -84,7 +84,7 @@ public class TestActionManager { public void testActionResponse() throws AmbariException { ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class); ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), - clusters, db, new HostsMap((String) null), null, unitOfWork, + clusters, db, new HostsMap((String) null), unitOfWork, injector.getInstance(RequestFactory.class), 
null); populateActionDB(db, hostname); Stage stage = db.getAllStages(requestId).get(0); @@ -127,7 +127,7 @@ public class TestActionManager { public void testLargeLogs() throws AmbariException { ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class); ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), - clusters, db, new HostsMap((String) null), null, unitOfWork, + clusters, db, new HostsMap((String) null), unitOfWork, injector.getInstance(RequestFactory.class), null); populateActionDB(db, hostname); Stage stage = db.getAllStages(requestId).get(0); @@ -217,7 +217,7 @@ public class TestActionManager { replay(queue, db, clusters); - ActionManager manager = new ActionManager(0, 0, queue, clusters, db, null, null, unitOfWork, + ActionManager manager = new ActionManager(0, 0, queue, clusters, db, null, unitOfWork, injector.getInstance(RequestFactory.class), null); assertSame(listStages, manager.getActions(requestId));