http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java new file mode 100644 index 0000000..9f9db5c --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java @@ -0,0 +1,814 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.topology; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.Role; +import org.apache.ambari.server.RoleCommand; +import org.apache.ambari.server.actionmanager.ActionManager; +import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleCommandFactory; +import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.api.services.AmbariMetaInfo; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.ConfigGroupRequest; +import org.apache.ambari.server.controller.RequestStatusResponse; +import org.apache.ambari.server.controller.ServiceComponentHostRequest; +import org.apache.ambari.server.controller.ShortTaskStatus; +import org.apache.ambari.server.controller.internal.ConfigGroupResourceProvider; +import org.apache.ambari.server.controller.internal.HostComponentResourceProvider; +import org.apache.ambari.server.controller.internal.HostResourceProvider; +import org.apache.ambari.server.controller.internal.RequestImpl; +import org.apache.ambari.server.controller.internal.ResourceImpl; +import org.apache.ambari.server.controller.internal.Stack; +import org.apache.ambari.server.controller.spi.NoSuchParentResourceException; +import org.apache.ambari.server.controller.spi.Predicate; +import org.apache.ambari.server.controller.spi.Resource; +import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException; +import org.apache.ambari.server.controller.spi.SystemException; +import org.apache.ambari.server.controller.spi.UnsupportedPropertyException; +import org.apache.ambari.server.controller.utilities.ClusterControllerHelper; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Config; +import 
org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.ConfigImpl;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.configgroup.ConfigGroup;
import org.apache.ambari.server.state.host.HostImpl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.ambari.server.controller.AmbariServer.getController;

/**
 * Represents a set of requests to a single host such as install, start, etc.
 * <p>
 * A request is created either for an explicitly named host or for a host group with a
 * cardinality and optional predicate. One logical INSTALL task is created per component of
 * the host group and, for non-client components, one logical START task. When a host is
 * offered via {@link #offer(HostImpl)} and accepted, the request hands back the topology
 * tasks that provision that host.
 * <p>
 * The natural ordering places requests whose host group contains a master component first.
 * It is NOT consistent with {@code equals}: ties are broken on {@code hashCode()}, which may
 * collide for two distinct requests.
 */
public class HostRequest implements Comparable<HostRequest> {

  private long requestId;
  private String blueprint;
  private HostGroup hostGroup;
  private String hostgroupName;
  private Predicate predicate;
  private int cardinality = -1;
  private String hostname = null;
  private String cluster;
  private boolean containsMaster;
  private long stageId = -1;
  //todo: should be able to use the presence of hostName for this
  private boolean outstanding = true;

  //todo: remove
  // component name -> logical install task id
  private Map<String, Long> logicalInstallTaskIds = new HashMap<String, Long>();
  //todo: remove
  // component name -> logical start task id
  private Map<String, Long> logicalStartTaskIds = new HashMap<String, Long>();

  Collection<HostRoleCommand> logicalTasks = new ArrayList<HostRoleCommand>();

  // logical task id -> physical tasks
  private Map<Long, Collection<Long>> physicalTasks = new HashMap<Long, Collection<Long>>();

  // shared by all instances; guarded by the HostRequest class monitor
  private static HostResourceProvider hostResourceProvider;

  private HostComponentResourceProvider hostComponentResourceProvider;

  private AmbariManagementController controller = getController();
  private ActionManager actionManager = controller.getActionManager();
  private ConfigHelper configHelper = controller.getConfigHelper();
  private AmbariMetaInfo metaInfoManager = controller.getAmbariMetaInfo();

  //todo: temporary refactoring step
  private TopologyManager.ClusterTopologyContext topologyContext;

  private static HostRoleCommandFactory hostRoleCommandFactory;

  /**
   * Sets the factory used by all instances to create logical tasks.
   * Must be called before the first instance is constructed.
   *
   * @param factory  host role command factory
   */
  public static void init(HostRoleCommandFactory factory) {
    hostRoleCommandFactory = factory;
  }

  /**
   * Creates a request which is not yet associated with a host.
   *
   * @param requestId        id of the owning request
   * @param stageId          id of the stage the logical tasks are assigned to
   * @param cluster          cluster name
   * @param blueprintName    blueprint name
   * @param hostGroup        host group this request is for
   * @param cardinality      cardinality of the request
   * @param predicate        predicate an offered host must satisfy, may be null
   * @param topologyContext  topology context
   */
  public HostRequest(long requestId, long stageId, String cluster, String blueprintName, HostGroup hostGroup,
                     int cardinality, Predicate predicate, TopologyManager.ClusterTopologyContext topologyContext) {
    this.requestId = requestId;
    this.stageId = stageId;
    this.cluster = cluster;
    this.blueprint = blueprintName;
    this.hostGroup = hostGroup;
    this.hostgroupName = hostGroup.getName();
    this.cardinality = cardinality;
    this.predicate = predicate;
    this.containsMaster = hostGroup.containsMasterComponent();
    this.topologyContext = topologyContext;

    createTasks();
    System.out.println("HostRequest: Created request: Host Association Pending");
  }

  /**
   * Creates a request for an explicitly named host.
   *
   * @param requestId        id of the owning request
   * @param stageId          id of the stage the logical tasks are assigned to
   * @param cluster          cluster name
   * @param blueprintName    blueprint name
   * @param hostGroup        host group this request is for
   * @param hostname         name of the host this request is reserved for
   * @param predicate        host predicate, may be null
   * @param topologyContext  topology context
   */
  public HostRequest(long requestId, long stageId, String cluster, String blueprintName, HostGroup hostGroup,
                     String hostname, Predicate predicate, TopologyManager.ClusterTopologyContext topologyContext) {
    this.requestId = requestId;
    this.stageId = stageId;
    this.cluster = cluster;
    this.blueprint = blueprintName;
    this.hostGroup = hostGroup;
    this.hostgroupName = hostGroup.getName();
    this.hostname = hostname;
    this.predicate = predicate;
    this.containsMaster = hostGroup.containsMasterComponent();
    this.topologyContext = topologyContext;

    createTasks();
    System.out.println("HostRequest: Created request for host: " + hostname);
  }

  /**
   * Offers a host to this request.
   *
   * @param host  the offered host
   * @return DECLINED_DONE if this request has already accepted a host, ACCEPTED together with
   *         the host group name and the provisioning tasks if the host matches, otherwise
   *         DECLINED_PREDICATE
   */
  //todo: synchronization
  public synchronized HostOfferResponse offer(HostImpl host) {
    if (! outstanding) {
      return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE);
    }
    if (! matchesHost(host)) {
      return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE);
    }

    outstanding = false;
    hostname = host.getHostName();
    List<TopologyTask> tasks = provision(host);

    return new HostOfferResponse(HostOfferResponse.Answer.ACCEPTED, hostGroup.getName(), tasks);
  }

  public void setHostName(String hostName) {
    this.hostname = hostName;
  }

  public long getRequestId() {
    return requestId;
  }

  public String getClusterName() {
    return cluster;
  }

  public String getBlueprint() {
    return blueprint;
  }

  public HostGroup getHostGroup() {
    return hostGroup;
  }

  public String getHostgroupName() {
    return hostgroupName;
  }

  public int getCardinality() {
    return cardinality;
  }

  public Predicate getPredicate() {
    return predicate;
  }

  /**
   * Builds the ordered list of tasks which provision the given host: resource creation,
   * config group registration, install and start. Also associates the host with all
   * logical tasks of this request.
   *
   * @param host  the accepted host
   * @return tasks in the order they were added
   */
  private List<TopologyTask> provision(HostImpl host) {
    List<TopologyTask> tasks = new ArrayList<TopologyTask>();

    tasks.add(new CreateHostResourcesTask(topologyContext.getClusterTopology(), host, getHostgroupName()));
    setHostOnTasks(host);

    HostGroup hostGroup = getHostGroup();
    tasks.add(new ConfigureConfigGroup(getConfigurationGroupName(hostGroup.getBlueprintName(),
        hostGroup.getName()), getClusterName(), hostname));

    tasks.add(getInstallTask());
    tasks.add(getStartTask());

    return tasks;
  }

  /**
   * Creates the logical tasks for every component of the host group: an INSTALL task per
   * component and, for components which are not clients, a START task.
   * Null components and AMBARI_SERVER are skipped.
   *
   * @throws RuntimeException if the component meta data can't be obtained
   */
  private void createTasks() {
    HostGroup hostGroup = getHostGroup();
    // a placeholder is used as the task host name until a host has been associated
    String hostName = getHostName() != null ?
        getHostName() :
        "PENDING HOST ASSIGNMENT : HOSTGROUP=" + getHostgroupName();

    for (String component : hostGroup.getComponents()) {
      if (isUnmanagedComponent(component)) {
        System.out.printf("Skipping component %s when creating request\n", component);
        continue;
      }

      //todo: had to add requestId to ShortTaskStatus
      //todo: revert addition of requestId when we are using LogicalTask
      HostRoleCommand installTask = createLogicalTask(hostName, component, RoleCommand.INSTALL);
      registerLogicalInstallTaskId(component, installTask.getTaskId());

      Stack stack = hostGroup.getStack();
      try {
        // if component isn't a client, add a start task
        if (! metaInfoManager.getComponent(stack.getName(), stack.getVersion(),
            stack.getServiceForComponent(component), component).isClient()) {
          HostRoleCommand startTask = createLogicalTask(hostName, component, RoleCommand.START);
          registerLogicalStartTaskId(component, startTask.getTaskId());
        }
      } catch (AmbariException e) {
        e.printStackTrace();
        //todo: how to handle
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Creates a PENDING logical task for the given component and adds it to the logical tasks.
   *
   * @param hostName   host name (or placeholder) to create the task with
   * @param component  component name, must be a valid {@link Role} name
   * @param command    role command of the task
   * @return the newly created task
   */
  private HostRoleCommand createLogicalTask(String hostName, String component, RoleCommand command) {
    HostRoleCommand task = hostRoleCommandFactory.create(hostName, Role.valueOf(component), null, command);
    task.setStatus(HostRoleStatus.PENDING);
    task.setTaskId(topologyContext.getNextTaskId());
    task.setRequestId(getRequestId());
    task.setStageId(stageId);
    logicalTasks.add(task);
    return task;
  }

  /**
   * Determine whether a host group component is excluded from task and resource creation.
   *
   * @param component  component name, may be null
   * @return true for a null component and for AMBARI_SERVER
   */
  //todo: handle this in a generic manner. These checks are all over the code
  private static boolean isUnmanagedComponent(String component) {
    return component == null || component.equals("AMBARI_SERVER");
  }

  /**
   * Get a config group name based on a bp and host group.
   *
   * @param bpName         blueprint name
   * @param hostGroupName  host group name
   * @return config group name
   */
  protected String getConfigurationGroupName(String bpName, String hostGroupName) {
    return String.format("%s:%s", bpName, hostGroupName);
  }

  /**
   * Sets the host entity of the given host on every logical task.
   *
   * @param host  the host associated with this request
   */
  private void setHostOnTasks(HostImpl host) {
    for (HostRoleCommand task : getTasks()) {
      task.setHostEntity(host.getHostEntity());
    }
  }

  //todo: analyze all configuration needs for dealing with deprecated properties
  /**
   * Since global configs are deprecated since 1.7.0, but still supported.
   * We should automatically map any globals used, to *-env dictionaries.
   *
   * @param stack                    stack whose name and version identify the stack id
   * @param blueprintConfigurations  map of blueprint configurations keyed by type
   */
  private void handleGlobalsBackwardsCompability(Stack stack,
                                                 Map<String, Map<String, String>> blueprintConfigurations) {

    StackId stackId = new StackId(stack.getName(), stack.getVersion());
    configHelper.moveDeprecatedGlobals(stackId, blueprintConfigurations, getClusterName());
  }

  /**
   * Get the logical tasks after copying the current state of the associated physical tasks
   * onto them.
   *
   * @return the internal collection of logical tasks (not a copy)
   */
  public Collection<HostRoleCommand> getTasks() {
    // sync logical task state with physical tasks
    for (HostRoleCommand logicalTask : logicalTasks) {
      HostRoleCommand physicalTask = getPhysicalTask(logicalTask.getTaskId());
      if (physicalTask == null) {
        continue;
      }
      logicalTask.setStatus(physicalTask.getStatus());
      logicalTask.setCommandDetail(physicalTask.getCommandDetail());
      logicalTask.setCustomCommandName(physicalTask.getCustomCommandName());
      //todo: once we retry on failures, start/end times could span multiple physical tasks
      logicalTask.setStartTime(physicalTask.getStartTime());
      logicalTask.setEndTime(physicalTask.getEndTime());
      logicalTask.setErrorLog(physicalTask.getErrorLog());
      logicalTask.setExitCode(physicalTask.getExitCode());
      logicalTask.setExecutionCommandWrapper(physicalTask.getExecutionCommandWrapper());
      //todo: may be handled at a higher level than physical task
      logicalTask.setLastAttemptTime(physicalTask.getLastAttemptTime());
      logicalTask.setOutputLog(physicalTask.getOutputLog());
      logicalTask.setStderr(physicalTask.getStderr());
      logicalTask.setStdout(physicalTask.getStdout());
      logicalTask.setStructuredOut(physicalTask.getStructuredOut());
    }
    return logicalTasks;
  }

  /**
   * Builds a new persistence entity for every logical task, overlaid with the state of the
   * associated physical task when one has been registered.
   *
   * @return newly constructed task entities, one per logical task
   */
  public Collection<HostRoleCommandEntity> getTaskEntities() {
    Collection<HostRoleCommandEntity> taskEntities = new ArrayList<HostRoleCommandEntity>();
    for (HostRoleCommand task : logicalTasks) {
      HostRoleCommandEntity entity = task.constructNewPersistenceEntity();
      // the above method doesn't set all of the fields for some unknown reason
      entity.setRequestId(task.getRequestId());
      entity.setStageId(task.getStageId());
      entity.setTaskId(task.getTaskId());
      entity.setOutputLog(task.getOutputLog());
      entity.setErrorLog(task.getErrorLog());

      // set state from physical task
      HostRoleCommand physicalTask = getPhysicalTask(task.getTaskId());
      if (physicalTask != null) {
        entity.setStatus(physicalTask.getStatus());
        entity.setCommandDetail(physicalTask.getCommandDetail());
        entity.setCustomCommandName(physicalTask.getCustomCommandName());
        //todo: once we retry on failures, start/end times could span multiple physical tasks
        entity.setStartTime(physicalTask.getStartTime());
        entity.setEndTime(physicalTask.getEndTime());
        entity.setErrorLog(physicalTask.getErrorLog());
        entity.setExitcode(physicalTask.getExitCode());
        //todo: may be handled at a higher level than physical task
        entity.setLastAttemptTime(physicalTask.getLastAttemptTime());
        entity.setOutputLog(physicalTask.getOutputLog());
        entity.setStdError(physicalTask.getStderr().getBytes());
        entity.setStdOut(physicalTask.getStdout().getBytes());
        entity.setStructuredOut(physicalTask.getStructuredOut().getBytes());
      }

      taskEntities.add(entity);
    }
    return taskEntities;
  }

  /**
   * Look up the physical task registered for a logical task.
   *
   * @param logicalTaskId  logical task id
   * @return the physical task, or null if none is registered or the action manager
   *         doesn't know the registered id
   */
  private HostRoleCommand getPhysicalTask(long logicalTaskId) {
    Collection<Long> physicalTaskIds = physicalTasks.get(logicalTaskId);
    if (physicalTaskIds == null || physicalTaskIds.isEmpty()) {
      return null;
    }
    //todo: for now only one physical task per logical task
    long physicalTaskId = physicalTaskIds.iterator().next();
    return actionManager.getTaskById(physicalTaskId);
  }

  public boolean containsMaster() {
    return containsMaster;
  }

  /**
   * Determine whether the given host satisfies this request.
   *
   * @param host  host to check
   * @return if a host name is set, whether it equals the host's name; otherwise the result
   *         of the predicate if one is set; otherwise true
   */
  public boolean matchesHost(HostImpl host) {
    if (hostname != null) {
      return hostname.equals(host.getHostName());
    }
    if (predicate != null) {
      return predicate.evaluate(new HostResourceAdapter(host));
    }
    return true;
  }

  public String getHostName() {
    return hostname;
  }

  public long getStageId() {
    return stageId;
  }

  //todo: remove
  private void registerLogicalInstallTaskId(String component, long taskId) {
    logicalInstallTaskIds.put(component, taskId);
  }

  //todo: remove
  private void registerLogicalStartTaskId(String component, long taskId) {
    logicalStartTaskIds.put(component, taskId);
  }

  /**
   * Orders requests which contain a master component before those which don't.
   * Ties are broken on {@code hashCode()}.
   *
   * @param other  request to compare to
   * @return negative, zero or positive per the {@link Comparable} contract
   */
  //todo: since this is used to determine equality, using hashCode() isn't safe as it can return the same
  //todo: value for 2 unequal requests
  @Override
  public int compareTo(HostRequest other) {
    if (containsMaster() != other.containsMaster()) {
      return containsMaster() ? -1 : 1;
    }
    return compareInts(hashCode(), other.hashCode());
  }

  /**
   * Three-way comparison of two ints. Subtraction is deliberately avoided because it
   * overflows for operands of opposite sign, which would invert the ordering.
   */
  private static int compareInts(int left, int right) {
    return left < right ? -1 : (left == right ? 0 : 1);
  }

  /**
   * Associates a physical task with a logical task.
   *
   * @param logicalTaskId   logical task id
   * @param physicalTaskId  physical task id
   */
  //todo: once we have logical tasks, move tracking of physical tasks there
  public void registerPhysicalTaskId(long logicalTaskId, long physicalTaskId) {
    Collection<Long> physicalTasksForId = physicalTasks.get(logicalTaskId);
    if (physicalTasksForId == null) {
      physicalTasksForId = new HashSet<Long>();
      physicalTasks.put(logicalTaskId, physicalTasksForId);
    }
    physicalTasksForId.add(physicalTaskId);
  }

  /**
   * Maps the physical tasks of a response to the logical tasks of this request and
   * increments the attempt count of each mapped logical task. Physical tasks whose role
   * has no logical task are reported and skipped so that the remaining tasks are still mapped.
   *
   * @param underlyingTasks            physical tasks returned by the resource provider
   * @param logicalTaskIdsByComponent  component name -> logical task id
   */
  //todo: move this to provision
  private void mapPhysicalTasks(List<ShortTaskStatus> underlyingTasks,
                                Map<String, Long> logicalTaskIdsByComponent) {
    for (ShortTaskStatus physicalTask : underlyingTasks) {
      String component = physicalTask.getRole();
      Long registeredId = logicalTaskIdsByComponent.get(component);
      if (registeredId == null) {
        System.out.println("HostRequest: No logical task is registered for component " + component +
            " on host " + hostname + ", physical task " + physicalTask.getTaskId() + " was not mapped");
        continue;
      }

      long logicalTaskId = registeredId;
      //todo: for now only one physical task per component
      registerPhysicalTaskId(logicalTaskId, physicalTask.getTaskId());

      //todo: shouldn't have to iterate over all tasks to find the logical task
      // set attempt count on task
      for (HostRoleCommand logicalTask : logicalTasks) {
        if (logicalTask.getTaskId() == logicalTaskId) {
          logicalTask.incrementAttemptCount();
        }
      }
    }
  }

  //todo: temporary step
  public TopologyTask getInstallTask() {
    return new InstallHostTask();
  }

  //todo: temporary step
  public TopologyTask getStartTask() {
    return new StartHostTask();
  }

  //todo: temporary refactoring step
  public HostGroupInfo createHostGroupInfo(HostGroup group) {
    HostGroupInfo info = new HostGroupInfo(group.getName());
    info.setConfiguration(group.getConfiguration());

    return info;
  }

  /**
   * Lazily obtains the host resource provider shared by all instances.
   * Static so that the monitor held is the class monitor, matching the static field it guards.
   */
  private static synchronized HostResourceProvider getHostResourceProvider() {
    if (hostResourceProvider == null) {
      hostResourceProvider = (HostResourceProvider)
          ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.Host);
    }
    return hostResourceProvider;
  }

  /**
   * Lazily obtains the host component resource provider of this instance.
   */
  private synchronized HostComponentResourceProvider getHostComponentResourceProvider() {
    if (hostComponentResourceProvider == null) {
      hostComponentResourceProvider = (HostComponentResourceProvider)
          ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.HostComponent);
    }
    return hostComponentResourceProvider;
  }

  /**
   * Task which installs the host via the host resource provider and maps the resulting
   * physical tasks to the logical install tasks.
   */
  //todo: extract
  private class InstallHostTask implements TopologyTask {
    //todo: use future to obtain returned Response which contains the request id
    //todo: error handling
    //todo: monitor status of requests

    @Override
    public Type getType() {
      return Type.INSTALL;
    }

    @Override
    public void run() {
      try {
        System.out.println("HostRequest.InstallHostTask: Executing INSTALL task for host: " + hostname);
        RequestStatusResponse response = getHostResourceProvider().install(getHostName(), cluster);
        // map logical install tasks to physical install tasks
        mapPhysicalTasks(response.getTasks(), logicalInstallTaskIds);
      } catch (Exception e) {
        // best effort: failures are only reported, see todo on error handling above
        e.printStackTrace();
      }
    }
  }

  /**
   * Task which starts the host components via the host component resource provider and maps
   * the resulting physical tasks to the logical start tasks.
   */
  //todo: extract
  private class StartHostTask implements TopologyTask {
    //todo: use future to obtain returned Response which contains the request id
    //todo: error handling
    //todo: monitor status of requests

    @Override
    public Type getType() {
      return Type.START;
    }

    @Override
    public void run() {
      try {
        System.out.println("HostRequest.StartHostTask: Executing START task for host: " + hostname);
        RequestStatusResponse response = getHostComponentResourceProvider().start(cluster, hostname);
        // map logical start tasks to physical start tasks
        mapPhysicalTasks(response.getTasks(), logicalStartTaskIds);
      } catch (Exception e) {
        // best effort: failures are only reported, see todo on error handling above
        e.printStackTrace();
      }
    }
  }

  /**
   * Task which creates the host resource and the host component resources for a host.
   */
  private class CreateHostResourcesTask implements TopologyTask {
    private ClusterTopology topology;
    private HostImpl host;
    private String groupName;

    public CreateHostResourcesTask(ClusterTopology topology, HostImpl host, String groupName) {
      this.topology = topology;
      this.host = host;
      this.groupName = groupName;
    }

    @Override
    public Type getType() {
      return Type.RESOURCE_CREATION;
    }

    @Override
    public void run() {
      try {
        createHostResources();
      } catch (AmbariException e) {
        //todo: report error to caller
        e.printStackTrace();
        System.out.println("An error occurred when creating host resources: " + e.toString());
      }
    }

    private void createHostResources() throws AmbariException {
      Map<String, Object> properties = new HashMap<String, Object>();
      properties.put(HostResourceProvider.HOST_CLUSTER_NAME_PROPERTY_ID, getClusterName());
      properties.put(HostResourceProvider.HOST_NAME_PROPERTY_ID, host.getHostName());
      properties.put(HostResourceProvider.HOST_RACK_INFO_PROPERTY_ID, host.getRackInfo());

      getHostResourceProvider().createHosts(new RequestImpl(null, Collections.singleton(properties), null, null));
      createHostComponentResources();
    }

    private void createHostComponentResources() throws AmbariException {
      Set<ServiceComponentHostRequest> requests = new HashSet<ServiceComponentHostRequest>();
      Stack stack = topology.getBlueprint().getStack();
      for (String component : topology.getBlueprint().getHostGroup(groupName).getComponents()) {
        // same exclusions as used when creating the logical tasks
        if (! isUnmanagedComponent(component)) {
          requests.add(new ServiceComponentHostRequest(topology.getClusterName(),
              stack.getServiceForComponent(component), component, host.getHostName(), null));
        }
      }

      controller.createHostComponents(requests);
    }
  }

  /**
   * Task which adds the host to the config group of its host group, creating the config
   * groups if no existing group with the expected name was found.
   */
  //todo: extract
  private class ConfigureConfigGroup implements TopologyTask {
    private String groupName;
    private String clusterName;
    private String hostName;

    public ConfigureConfigGroup(String groupName, String clusterName, String hostName) {
      this.groupName = groupName;
      this.clusterName = clusterName;
      this.hostName = hostName;
    }

    @Override
    public Type getType() {
      return Type.CONFIGURE;
    }

    @Override
    public void run() {
      try {
        //todo: add task to offer response
        if (! addHostToExistingConfigGroups()) {
          createConfigGroupsAndRegisterHost();
        }
      } catch (Exception e) {
        //todo: handle exceptions
        e.printStackTrace();
        // keep the original failure as the cause so that it isn't lost to the caller
        throw new RuntimeException("Unable to register config group for host: " + hostname, e);
      }
    }

    /**
     * Add the new host to an existing config group.
     *
     * @return true if the host was added to at least one existing config group
     *
     * @throws SystemException                an unknown exception occurred
     * @throws UnsupportedPropertyException   an unsupported property was specified in the request
     * @throws NoSuchParentResourceException  a parent resource doesn't exist
     * @throws IllegalArgumentException       the cluster can't be obtained
     */
    private boolean addHostToExistingConfigGroups()
        throws SystemException,
        UnsupportedPropertyException,
        NoSuchParentResourceException {

      boolean addedHost = false;

      Clusters clusters;
      Cluster cluster;
      try {
        clusters = controller.getClusters();
        cluster = clusters.getCluster(clusterName);
      } catch (AmbariException e) {
        throw new IllegalArgumentException(
            String.format("Attempt to add hosts to a non-existent cluster: '%s'", clusterName), e);
      }
      // I don't know of a method to get config group by name
      //todo: add a method to get config group by name
      Map<Long, ConfigGroup> configGroups = cluster.getConfigGroups();
      for (ConfigGroup group : configGroups.values()) {
        if (group.getName().equals(groupName)) {
          try {
            group.addHost(clusters.getHost(hostName));
            group.persist();
            addedHost = true;
          } catch (AmbariException e) {
            // shouldn't occur, this host was just added to the cluster
            throw new SystemException(String.format(
                "Unable to obtain newly created host '%s' from cluster '%s'", hostName, clusterName), e);
          }
        }
      }
      return addedHost;
    }

    /**
     * Register config groups for host group scoped configuration.
     * For each host group with configuration specified in the blueprint, a config group is created
     * and the hosts associated with the host group are assigned to the config group.
     *
     * @throws ResourceAlreadyExistsException  attempt to create a config group that already exists
     * @throws SystemException                 an unexpected exception occurs
     * @throws UnsupportedPropertyException    an invalid property is provided when creating a config group
     * @throws NoSuchParentResourceException   attempt to create a config group for a non-existing cluster
     */
    private void createConfigGroupsAndRegisterHost() throws
        ResourceAlreadyExistsException, SystemException,
        UnsupportedPropertyException, NoSuchParentResourceException {

      HostGroup hostGroup = getHostGroup();
      // service name -> (config type -> config)
      Map<String, Map<String, Config>> groupConfigs = new HashMap<String, Map<String, Config>>();

      Stack stack = hostGroup.getStack();

      // get the host-group config with cluster creation template overrides
      Configuration topologyHostGroupConfig = topologyContext.getClusterTopology().
          getHostGroupInfo().get(hostGroup.getName()).getConfiguration();

      //handling backwards compatibility for group configs
      //todo: doesn't belong here
      handleGlobalsBackwardsCompability(stack, topologyHostGroupConfig.getProperties());

      // iterate over topo host group configs which were defined in CCT/HG and BP/HG only, no parent configs
      for (Map.Entry<String, Map<String, String>> entry : topologyHostGroupConfig.getProperties().entrySet()) {
        String type = entry.getKey();
        String service = stack.getServiceForConfigType(type);
        Config config = new ConfigImpl(type);
        config.setTag(hostGroup.getName());
        config.setProperties(entry.getValue());
        //todo: attributes
        Map<String, Config> serviceConfigs = groupConfigs.get(service);
        if (serviceConfigs == null) {
          serviceConfigs = new HashMap<String, Config>();
          groupConfigs.put(service, serviceConfigs);
        }
        serviceConfigs.put(type, config);
      }

      String bpName = topologyContext.getClusterTopology().getBlueprint().getName();
      for (Map.Entry<String, Map<String, Config>> entry : groupConfigs.entrySet()) {
        String service = entry.getKey();
        Map<String, Config> serviceConfigs = entry.getValue();
        String absoluteGroupName = getConfigurationGroupName(bpName, hostGroup.getName());
        Collection<String> groupHosts = topologyContext.getClusterTopology().getHostGroupInfo().
            get(hostgroupName).getHostNames();

        ConfigGroupRequest request = new ConfigGroupRequest(
            null, getClusterName(), absoluteGroupName, service, "Host Group Configuration",
            new HashSet<String>(groupHosts), serviceConfigs);

        // get the config group provider and create config group resource
        ConfigGroupResourceProvider configGroupProvider = (ConfigGroupResourceProvider)
            ClusterControllerHelper.getClusterController().ensureResourceProvider(Resource.Type.ConfigGroup);
        configGroupProvider.createResources(Collections.singleton(request));
      }
    }
  }

  /**
   * Read only view of a host as a {@link Resource} so that a predicate can be evaluated
   * against it. The host properties are copied once, at construction time.
   */
  private class HostResourceAdapter implements Resource {
    Resource hostResource;

    public HostResourceAdapter(HostImpl host) {
      buildPropertyMap(host);
    }

    @Override
    public Object getPropertyValue(String id) {
      return hostResource.getPropertyValue(id);
    }

    @Override
    public Map<String, Map<String, Object>> getPropertiesMap() {
      return hostResource.getPropertiesMap();
    }

    @Override
    public Type getType() {
      return Type.Host;
    }

    @Override
    public void addCategory(String id) {
      // read only, nothing to do
    }

    @Override
    public void setProperty(String id, Object value) {
      // read only, nothing to do
    }

    /**
     * Copies the host properties into the wrapped host resource.
     *
     * @param host  host to copy the properties from
     */
    private void buildPropertyMap(HostImpl host) {
      hostResource = new ResourceImpl(Resource.Type.Host);

      hostResource.setProperty(HostResourceProvider.HOST_NAME_PROPERTY_ID,
          host.getHostName());
      hostResource.setProperty(HostResourceProvider.HOST_PUBLIC_NAME_PROPERTY_ID,
          host.getPublicHostName());
      hostResource.setProperty(HostResourceProvider.HOST_IP_PROPERTY_ID,
          host.getIPv4());
      hostResource.setProperty(HostResourceProvider.HOST_TOTAL_MEM_PROPERTY_ID,
          host.getTotalMemBytes());
      hostResource.setProperty(HostResourceProvider.HOST_CPU_COUNT_PROPERTY_ID,
          (long) host.getCpuCount());
      hostResource.setProperty(HostResourceProvider.HOST_PHYSICAL_CPU_COUNT_PROPERTY_ID,
          (long) host.getPhCpuCount());
      hostResource.setProperty(HostResourceProvider.HOST_OS_ARCH_PROPERTY_ID,
          host.getOsArch());
      hostResource.setProperty(HostResourceProvider.HOST_OS_TYPE_PROPERTY_ID,
          host.getOsType());
      hostResource.setProperty(HostResourceProvider.HOST_OS_FAMILY_PROPERTY_ID,
          host.getOsFamily());
      hostResource.setProperty(HostResourceProvider.HOST_RACK_INFO_PROPERTY_ID,
          host.getRackInfo());
      hostResource.setProperty(HostResourceProvider.HOST_LAST_HEARTBEAT_TIME_PROPERTY_ID,
          host.getLastHeartbeatTime());
      hostResource.setProperty(HostResourceProvider.HOST_LAST_AGENT_ENV_PROPERTY_ID,
          host.getLastAgentEnv());
      hostResource.setProperty(HostResourceProvider.HOST_LAST_REGISTRATION_TIME_PROPERTY_ID,
          host.getLastRegistrationTime());
      hostResource.setProperty(HostResourceProvider.HOST_HOST_STATUS_PROPERTY_ID,
          host.getStatus());
      hostResource.setProperty(HostResourceProvider.HOST_HOST_HEALTH_REPORT_PROPERTY_ID,
          host.getHealthStatus().getHealthReport());
      hostResource.setProperty(HostResourceProvider.HOST_DISK_INFO_PROPERTY_ID,
          host.getDisksInfo());
      hostResource.setProperty(HostResourceProvider.HOST_STATE_PROPERTY_ID,
          host.getState());
    }
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java new file mode 100644 index 0000000..042e9fc --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +/** + * Checked exception which indicates an invalid topology. 
+ */ +public class InvalidTopologyException extends Exception { + public InvalidTopologyException(String s) { + super(s); + } + + public InvalidTopologyException(String s, Throwable throwable) { + super(s, throwable); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java new file mode 100644 index 0000000..85422a0 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/InvalidTopologyTemplateException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +/** + * Checked exception which indicates that the information provided + * for a topology template is invalid. 
+ */ +public class InvalidTopologyTemplateException extends Exception { + public InvalidTopologyTemplateException(String s) { + super(s); + } + + public InvalidTopologyTemplateException(String s, Throwable throwable) { + super(s, throwable); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java new file mode 100644 index 0000000..5273ff8 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.topology; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.actionmanager.Request; +import org.apache.ambari.server.actionmanager.Stage; +import org.apache.ambari.server.controller.RequestStatusResponse; +import org.apache.ambari.server.controller.ShortTaskStatus; +import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; +import org.apache.ambari.server.orm.entities.StageEntity; +import org.apache.ambari.server.state.host.HostImpl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +import static org.apache.ambari.server.controller.AmbariServer.getController; + +/** + * Logical Request implementation. + */ +public class LogicalRequest extends Request { + + private Collection<HostRequest> allHostRequests = new ArrayList<HostRequest>(); + // sorted set with master host requests given priority + private Collection<HostRequest> outstandingHostRequests = new TreeSet<HostRequest>(); + private Map<String, HostRequest> requestsWithReservedHosts = new HashMap<String, HostRequest>(); + + private final ClusterTopology topology; + + + //todo: topologyContext is a temporary refactoring step + public LogicalRequest(TopologyRequest requestRequest, TopologyManager.ClusterTopologyContext topologyContext) throws AmbariException { + //todo: abstract usage of controller, etc ... 
+ super(getController().getActionManager().getNextRequestId(), getController().getClusters().getCluster( + requestRequest.getClusterName()).getClusterId(), getController().getClusters()); + + this.topology = topologyContext.getClusterTopology(); + createHostRequests(requestRequest, topologyContext); + } + + public HostOfferResponse offer(HostImpl host) { + // attempt to match to a host request with an explicit host reservation first + synchronized (requestsWithReservedHosts) { + HostRequest hostRequest = requestsWithReservedHosts.remove(host.getHostName()); + if (hostRequest != null) { + HostOfferResponse response = hostRequest.offer(host); + if (response.getAnswer() != HostOfferResponse.Answer.ACCEPTED) { + //todo: error handling. This is really a system exception and shouldn't happen + throw new RuntimeException("LogicalRequest declined host offer of explicitly requested host: " + + host.getHostName()); + } + return response; + } + } + + // not explicitly reserved, at least not in this request, so attempt to match to outstanding host requests + boolean predicateRejected = false; + synchronized (outstandingHostRequests) { + //todo: prioritization of master host requests + Iterator<HostRequest> hostRequestIterator = outstandingHostRequests.iterator(); + while (hostRequestIterator.hasNext()) { + HostOfferResponse response = hostRequestIterator.next().offer(host); + switch (response.getAnswer()) { + case ACCEPTED: + hostRequestIterator.remove(); + return response; + case DECLINED_DONE: + //todo: should have been done on ACCEPT + hostRequestIterator.remove(); + case DECLINED_PREDICATE: + predicateRejected = true; + } + } + } + // if at least one outstanding host request rejected for predicate or we have an outstanding request + // with a reserved host decline due to predicate, otherwise decline due to all hosts being resolved + //todo: could also check if outstandingHostRequests is empty + return predicateRejected || ! requestsWithReservedHosts.isEmpty() ? 
+ new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE) : + new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE); + } + + //todo + @Override + public Collection<Stage> getStages() { + return super.getStages(); + } + + @Override + public List<HostRoleCommand> getCommands() { + List<HostRoleCommand> commands = new ArrayList<HostRoleCommand>(); + for (HostRequest hostRequest : allHostRequests) { + commands.addAll(new ArrayList<HostRoleCommand>(hostRequest.getTasks())); + } + return commands; + } + + public Collection<String> getReservedHosts() { + return requestsWithReservedHosts.keySet(); + } + + //todo: account for blueprint name? + //todo: this should probably be done implicitly at a lower level + public boolean areGroupsResolved(Collection<String> hostGroupNames) { + synchronized (outstandingHostRequests) { + // iterate over outstanding host requests + for (HostRequest request : outstandingHostRequests) { + if (hostGroupNames.contains(request.getHostgroupName()) && request.getHostName() == null) { + return false; + } + } + } + return true; + } + + public Map<String, Collection<String>> getProjectedTopology() { + Map<String, Collection<String>> hostComponentMap = new HashMap<String, Collection<String>>(); + + //todo: synchronization + for (HostRequest hostRequest : allHostRequests) { + HostGroup hostGroup = hostRequest.getHostGroup(); + for (String host : topology.getHostGroupInfo().get(hostGroup.getName()).getHostNames()) { + Collection<String> hostComponents = hostComponentMap.get(host); + if (hostComponents == null) { + hostComponents = new HashSet<String>(); + hostComponentMap.put(host, hostComponents); + } + hostComponents.addAll(hostGroup.getComponents()); + } + } + return hostComponentMap; + } + + //todo: currently we are just returning all stages for all requests + //todo: and relying on the StageResourceProvider to convert each to a resource and do a predicate eval on each + //todo: needed to change the name to avoid a name collision. 
+ public Collection<StageEntity> getStageEntities() { + Collection<StageEntity> stages = new ArrayList<StageEntity>(); + for (HostRequest hostRequest : allHostRequests) { + StageEntity stage = new StageEntity(); + stage.setStageId(hostRequest.getStageId()); + stage.setRequestContext(getRequestContext()); + stage.setRequestId(getRequestId()); + //todo: not sure what this byte array is??? + //stage.setClusterHostInfo(); + stage.setClusterId(getClusterId()); + stage.setSkippable(false); + // getTaskEntities() sync's state with physical tasks + stage.setHostRoleCommands(hostRequest.getTaskEntities()); + + stages.add(stage); + } + return stages; + } + + public RequestStatusResponse getRequestStatus() { + RequestStatusResponse requestStatus = new RequestStatusResponse(getRequestId()); + requestStatus.setRequestContext(getRequestContext()); + //todo: other request status fields + //todo: ordering of tasks? + + // convert HostRoleCommands to ShortTaskStatus + List<ShortTaskStatus> shortTasks = new ArrayList<ShortTaskStatus>(); + for (HostRoleCommand task : getCommands()) { + shortTasks.add(new ShortTaskStatus(task)); + } + requestStatus.setTasks(shortTasks); + //todo: null tasks? + + return requestStatus; + } + + public Map<Long, HostRoleCommandStatusSummaryDTO> getStageSummaries() { + Map<Long, HostRoleCommandStatusSummaryDTO> summaryMap = new HashMap<Long, HostRoleCommandStatusSummaryDTO>(); + + for (StageEntity stage : getStageEntities()) { + //Number minStartTime = 0; + //Number maxEndTime = 0; + int aborted = 0; + int completed = 0; + int failed = 0; + int holding = 0; + int holdingFailed = 0; + int holdingTimedout = 0; + int inProgress = 0; + int pending = 0; + int queued = 0; + int timedout = 0; + + //todo: where does this logic belong? 
+ for (HostRoleCommandEntity task : stage.getHostRoleCommands()) { + HostRoleStatus taskStatus = task.getStatus(); + + switch (taskStatus) { + case ABORTED: + aborted += 1; + break; + case COMPLETED: + completed += 1; + break; + case FAILED: + failed += 1; + break; + case HOLDING: + holding += 1; + break; + case HOLDING_FAILED: + holdingFailed += 1; + break; + case HOLDING_TIMEDOUT: + holdingTimedout += 1; + break; + case IN_PROGRESS: + inProgress += 1; + break; + case PENDING: + pending += 1; + break; + case QUEUED: + queued += 1; + break; + case TIMEDOUT: + timedout += 1; + break; + default: + //todo: proper log msg + System.out.println("Unexpected status when creating stage summaries: " + taskStatus); + } + } + + //todo: skippable. I only see a skippable field on the stage, not the tasks + //todo: time related fields + HostRoleCommandStatusSummaryDTO stageSummary = new HostRoleCommandStatusSummaryDTO(stage.isSkippable() ? 1 : 0, 0, 0, + stage.getStageId(), aborted, completed, failed, holding, holdingFailed, holdingTimedout, inProgress, pending, queued, timedout); + summaryMap.put(stage.getStageId(), stageSummary); + } + return summaryMap; + } + + //todo: context is a temporary refactoring step + private void createHostRequests(TopologyRequest requestRequest, TopologyManager.ClusterTopologyContext topologyContext) { + //todo: consistent stage ordering + //todo: confirm that stages don't need to be unique across requests + long stageIdCounter = 0; + Map<String, HostGroupInfo> hostGroupInfoMap = requestRequest.getHostGroupInfo(); + for (HostGroupInfo hostGroupInfo : hostGroupInfoMap.values()) { + String groupName = hostGroupInfo.getHostGroupName(); + Blueprint blueprint = topology.getBlueprint(); + int hostCardinality; + List<String> hostnames; + + hostCardinality = hostGroupInfo.getRequestedHostCount(); + hostnames = new ArrayList<String>(hostGroupInfo.getHostNames()); + + + for (int i = 0; i < hostCardinality; ++i) { + if (! 
hostnames.isEmpty()) { + // host names are specified + String hostname = hostnames.get(i); + //todo: pass in HostGroupInfo + HostRequest hostRequest = new HostRequest(getRequestId(), stageIdCounter++, getClusterName(), + blueprint.getName(), blueprint.getHostGroup(groupName), hostname, hostGroupInfo.getPredicate(), + topologyContext); + synchronized (requestsWithReservedHosts) { + requestsWithReservedHosts.put(hostname, hostRequest); + } + } else { + // host count is specified + //todo: pass in HostGroupInfo + HostRequest hostRequest = new HostRequest(getRequestId(), stageIdCounter++, getClusterName(), + blueprint.getName(), blueprint.getHostGroup(groupName), hostCardinality, hostGroupInfo.getPredicate(), + topologyContext); + outstandingHostRequests.add(hostRequest); + } + } + } + + allHostRequests.addAll(outstandingHostRequests); + allHostRequests.addAll(requestsWithReservedHosts.values()); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java new file mode 100644 index 0000000..5ce2532 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchBlueprintException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +/** + * Indicates that the requested blueprint doesn't exist. + */ +public class NoSuchBlueprintException extends Exception { + /** + * @param name name of the blueprint that could not be found; included in the exception message + */ + public NoSuchBlueprintException(String name) { + super(String.format("No blueprint exists with the name '%s'", name)); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java new file mode 100644 index 0000000..413cb4e --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/NoSuchHostGroupException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +/** + * Indicates that a requested host group doesn't exist. + */ +public class NoSuchHostGroupException extends Exception { + /** + * @param hostgroupName name of the missing host group; appended to the exception message + */ + public NoSuchHostGroupException(String hostgroupName) { + super("Requested HostGroup doesn't exist: " + hostgroupName); + } + + /** + * @param hostgroupName name of the missing host group; appended to the exception message + * @param msg caller-supplied message placed before the missing host group detail + */ + public NoSuchHostGroupException(String hostgroupName, String msg) { + super(msg + ". Cause: Requested HostGroup doesn't exist: " + hostgroupName); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java new file mode 100644 index 0000000..870d718 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/RequiredPasswordValidator.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +import org.apache.ambari.server.controller.internal.Stack; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +/** + * Validates that all required passwords are provided. + */ +public class RequiredPasswordValidator implements TopologyValidator { + + // password applied to required password properties that have no value; may be null + private String defaultPassword; + + /** + * @param defaultPassword value used for required password properties that are not set; may be null + */ + public RequiredPasswordValidator(String defaultPassword) { + this.defaultPassword = defaultPassword; + } + + /** + * Validate that all required password properties have been set or that 'default_password' is specified. + * + * @param topology cluster topology to validate + * + * @throws InvalidTopologyException if required password properties are missing and no + * default is specified via 'default_password' + */ + public void validate(ClusterTopology topology) throws InvalidTopologyException { + Map<String, Map<String, Collection<String>>> missingPasswords = validateRequiredPasswords(topology); + + if (! missingPasswords.isEmpty()) { + throw new InvalidTopologyException("Missing required password properties. Specify a value for these " + + "properties in the cluster or host group configurations or include 'default_password' field in request. " + + missingPasswords); + } + } + + /** + * Validate all configurations. Validation is done on the operational configuration of each + * host group. An operational configuration is achieved by overlaying host group configuration + * on top of cluster configuration which overlays the default stack configurations. 
+ * + * @param topology cluster topology whose host groups are checked + * + * @return map of required properties which are missing, keyed by host group name and then by + * configuration type. Empty map if none are missing. + * + * @throws IllegalArgumentException if blueprint contains invalid information + */ + + //todo: this is copied/pasted from Blueprint and is currently only used by validatePasswordProperties() + //todo: seems that we should have some common place for this code so it can be used by BP and here? + private Map<String, Map<String, Collection<String>>> validateRequiredPasswords(ClusterTopology topology) { + + Map<String, Map<String, Collection<String>>> missingProperties = + new HashMap<String, Map<String, Collection<String>>>(); + + for (Map.Entry<String, HostGroupInfo> groupEntry: topology.getHostGroupInfo().entrySet()) { + String hostGroupName = groupEntry.getKey(); + Map<String, Map<String, String>> groupProperties = + groupEntry.getValue().getConfiguration().getFullProperties(3); + + Collection<String> processedServices = new HashSet<String>(); + Blueprint blueprint = topology.getBlueprint(); + Stack stack = blueprint.getStack(); + + HostGroup hostGroup = blueprint.getHostGroup(hostGroupName); + for (String component : hostGroup.getComponents()) { + //for now, AMBARI is not recognized as a service in Stacks + if (component.equals("AMBARI_SERVER")) { + continue; + } + + String serviceName = stack.getServiceForComponent(component); + // add() returns false for a service that was already processed for this host group + if (processedServices.add(serviceName)) { + //todo: do I need to subtract excluded configs? + Collection<Stack.ConfigProperty> requiredProperties = + stack.getRequiredConfigurationProperties(serviceName, "PASSWORD"); + + for (Stack.ConfigProperty property : requiredProperties) { + String category = property.getType(); + String name = property.getName(); + if (! 
propertyExists(topology, groupProperties, category, name)) { + Map<String, Collection<String>> missingHostGroupPropsMap = missingProperties.get(hostGroupName); + if (missingHostGroupPropsMap == null) { + missingHostGroupPropsMap = new HashMap<String, Collection<String>>(); + missingProperties.put(hostGroupName, missingHostGroupPropsMap); + } + Collection<String> missingHostGroupTypeProps = missingHostGroupPropsMap.get(category); + if (missingHostGroupTypeProps == null) { + missingHostGroupTypeProps = new HashSet<String>(); + missingHostGroupPropsMap.put(category, missingHostGroupTypeProps); + } + missingHostGroupTypeProps.add(name); + } + } + } + } + } + return missingProperties; + } + + /** + * Determine whether a property has a value, applying the default password to it if it is missing. + * + * Note that this is not a pure check: when the property is absent from props and a non-blank default + * password is configured, the default is written to the cluster configuration and true is returned. + * + * @param topology cluster topology whose configuration receives the default password + * @param props properties to look the property up in, keyed by configuration type + * @param type configuration type + * @param property password property name + * + * @return true if the property is present in props or the default password was set for it + */ + private boolean propertyExists(ClusterTopology topology, Map<String, Map<String, String>> props, String type, String property) { + Map<String, String> typeProps = props.get(type); + return (typeProps != null && typeProps.containsKey(property)) || setDefaultPassword(topology, type, property); + } + + /** + * Attempt to set the default password in cluster configuration for missing password property. + * + * @param topology cluster topology whose configuration is updated + * @param configType configuration type + * @param property password property name + * + * @return true if password was set, otherwise false. The password is set unless the default + * password is null or blank + */ + private boolean setDefaultPassword(ClusterTopology topology, String configType, String property) { + boolean setDefaultPassword = false; + if (defaultPassword != null && ! defaultPassword.trim().isEmpty()) { + topology.getConfiguration().setProperty(configType, property, defaultPassword); + setDefaultPassword = true; + } + return setDefaultPassword; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RequiredPasswordValidator that = (RequiredPasswordValidator) o; + + return defaultPassword == null ? 
that.defaultPassword == null : defaultPassword.equals(that.defaultPassword); + } + + @Override + public int hashCode() { + return defaultPassword != null ? defaultPassword.hashCode() : 0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java new file mode 100644 index 0000000..3e1b565 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -0,0 +1,610 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.topology; + +import com.google.inject.Singleton; +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.Request; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.AmbariServer; +import org.apache.ambari.server.controller.ClusterRequest; +import org.apache.ambari.server.controller.RequestStatusResponse; +import org.apache.ambari.server.controller.ServiceComponentRequest; +import org.apache.ambari.server.controller.ServiceRequest; +import org.apache.ambari.server.controller.internal.ComponentResourceProvider; +import org.apache.ambari.server.controller.internal.ServiceResourceProvider; +import org.apache.ambari.server.controller.internal.Stack; +import org.apache.ambari.server.controller.spi.Resource; +import org.apache.ambari.server.controller.utilities.ClusterControllerHelper; +import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO; +import org.apache.ambari.server.orm.entities.StageEntity; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.SecurityType; +import org.apache.ambari.server.state.host.HostImpl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Manages all cluster provisioning actions on the cluster topology. 
+ */ +//todo: cluster isolation +@Singleton +public class TopologyManager { + + private final List<HostImpl> availableHosts = new LinkedList<HostImpl>(); + private final Map<String, LogicalRequest> reservedHosts = new HashMap<String, LogicalRequest>(); + private final Map<Long, LogicalRequest> allRequests = new HashMap<Long, LogicalRequest>(); + // priority is given to oldest outstanding requests + private final Collection<LogicalRequest> outstandingRequests = new ArrayList<LogicalRequest>(); + private Map<String, ClusterTopology> clusterTopologyMap = new HashMap<String, ClusterTopology>(); + private final Map<TopologyTask.Type, Set<TopologyTask>> pendingTasks = new HashMap<TopologyTask.Type, Set<TopologyTask>>(); + + //todo: proper wait/notify mechanism + private final Object configurationFlagLock = new Object(); + private boolean configureComplete = false; + + private AmbariManagementController controller; + ExecutorService executor; + //todo: task id's. Use existing mechanism for getting next task id sequence + private final static AtomicLong nextTaskId = new AtomicLong(10000); + private final Object serviceResourceLock = new Object(); + + + public TopologyManager() { + pendingTasks.put(TopologyTask.Type.CONFIGURE, new HashSet<TopologyTask>()); + pendingTasks.put(TopologyTask.Type.INSTALL, new HashSet<TopologyTask>()); + pendingTasks.put(TopologyTask.Type.START, new HashSet<TopologyTask>()); + + executor = getExecutorService(); + } + + public RequestStatusResponse provisionCluster(TopologyRequest request) throws InvalidTopologyException, AmbariException { + ClusterTopology topology = new ClusterTopologyImpl(request); + + String clusterName = topology.getClusterName(); + clusterTopologyMap.put(clusterName, topology); + + createClusterResource(clusterName); + createServiceAndComponentResources(topology); + + LogicalRequest logicalRequest = processRequest(request, topology); + try { + addClusterConfigRequest(new ClusterConfigurationRequest(topology)); + } catch 
(AmbariException e) { + //todo + throw e; + } + + //todo: this should be invoked as part of a generic lifecycle event which could possibly + //todo: be tied to cluster state + persistInstallStateForUI(clusterName); + return getRequestStatus(logicalRequest.getRequestId()); + } + + public RequestStatusResponse scaleHosts(TopologyRequest request) + throws InvalidTopologyException, AmbariException { + + String clusterName = request.getClusterName(); + ClusterTopology topology = clusterTopologyMap.get(clusterName); + if (topology == null) { + throw new AmbariException("TopologyManager: Unable to retrieve cluster topology for cluster: " + clusterName); + } + + // this registers/updates all request host groups + topology.update(request); + return getRequestStatus(processRequest(request, topology).getRequestId()); + } + + //todo: should be synchronized on same lock as onHostRegistered() + //todo: HostImpl is what is registered with the HearbeatHandler and contains more host info than HostInfo so + //todo: we should probably change to use HostImpl + public void onHostRegistered(HostImpl host, boolean associatedWithCluster) { + if (associatedWithCluster) { + return; + } + + boolean matchedToRequest = false; + String hostName = host.getHostName(); + synchronized(reservedHosts) { + if (reservedHosts.containsKey(hostName)) { + LogicalRequest request = reservedHosts.remove(hostName); + HostOfferResponse response = request.offer(host); + if (response.getAnswer() != HostOfferResponse.Answer.ACCEPTED) { + //todo: this is handled explicitly in LogicalRequest so this shouldn't happen here + throw new RuntimeException("LogicalRequest declined host offer of explicitly requested host: " + hostName); + } + processAcceptedHostOffer(getClusterTopology(request.getClusterName()), response, host); + matchedToRequest = true; + } + } + + // can be true if host was reserved + if (! 
matchedToRequest) { + synchronized (outstandingRequests) { + Iterator<LogicalRequest> outstandingRequestIterator = outstandingRequests.iterator(); + while (! matchedToRequest && outstandingRequestIterator.hasNext()) { + LogicalRequest request = outstandingRequestIterator.next(); + HostOfferResponse hostOfferResponse = request.offer(host); + switch (hostOfferResponse.getAnswer()) { + case ACCEPTED: + matchedToRequest = true; + processAcceptedHostOffer(getClusterTopology(request.getClusterName()), hostOfferResponse, host); + break; + case DECLINED_DONE: + outstandingRequestIterator.remove(); + break; + case DECLINED_PREDICATE: + break; + } + } + } + } + + if (! matchedToRequest) { + synchronized (availableHosts) { + System.out.printf("TopologyManager: Queueing available host %s\n", hostName); + availableHosts.add(host); + } + } + } + + public void onHostLeft(String hostname) { + //todo: + } + + public Request getRequest(long requestId) { + return allRequests.get(requestId); + } + + public Collection<LogicalRequest> getRequests(Collection<Long> requestIds) { + if (requestIds.isEmpty()) { + return allRequests.values(); + } else { + Collection<LogicalRequest> matchingRequests = new ArrayList<LogicalRequest>(); + for (long id : requestIds) { + LogicalRequest request = allRequests.get(id); + if (request != null) { + matchingRequests.add(request); + } + } + return matchingRequests; + } + } + + //todo: currently we are just returning all stages for all requests + //todo: and relying on the StageResourceProvider to convert each to a resource and do a predicate eval on each + public Collection<StageEntity> getStages() { + Collection<StageEntity> stages = new ArrayList<StageEntity>(); + for (LogicalRequest logicalRequest : allRequests.values()) { + stages.addAll(logicalRequest.getStageEntities()); + } + return stages; + } + + public Collection<HostRoleCommand> getTasks(long requestId) { + LogicalRequest request = allRequests.get(requestId); + return request == null ? 
Collections.<HostRoleCommand>emptyList() : request.getCommands(); + } + + public Collection<HostRoleCommand> getTasks(Collection<Long> requestIds) { + Collection<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>(); + for (long id : requestIds) { + tasks.addAll(getTasks(id)); + } + + return tasks; + } + + public Map<Long, HostRoleCommandStatusSummaryDTO> getStageSummaries(Long requestId) { + LogicalRequest request = allRequests.get(requestId); + return request == null ? Collections.<Long, HostRoleCommandStatusSummaryDTO>emptyMap() : + request.getStageSummaries(); + } + + public RequestStatusResponse getRequestStatus(long requestId) { + LogicalRequest request = allRequests.get(requestId); + return request == null ? null : request.getRequestStatus(); + } + + public Collection<RequestStatusResponse> getRequestStatus(Collection<Long> ids) { + List<RequestStatusResponse> requestStatusResponses = new ArrayList<RequestStatusResponse>(); + for (long id : ids) { + RequestStatusResponse response = getRequestStatus(id); + if (response != null) { + requestStatusResponses.add(response); + } + } + + return requestStatusResponses; + } + + public ClusterTopology getClusterTopology(String clusterName) { + return clusterTopologyMap.get(clusterName); + } + + public Map<String, Collection<String>> getProjectedTopology() { + Map<String, Collection<String>> hostComponentMap = new HashMap<String, Collection<String>>(); + + for (LogicalRequest logicalRequest : allRequests.values()) { + Map<String, Collection<String>> requestTopology = logicalRequest.getProjectedTopology(); + for (Map.Entry<String, Collection<String>> entry : requestTopology.entrySet()) { + String host = entry.getKey(); + Collection<String> hostComponents = hostComponentMap.get(host); + if (hostComponents == null) { + hostComponents = new HashSet<String>(); + hostComponentMap.put(host, hostComponents); + } + hostComponents.addAll(entry.getValue()); + } + } + return hostComponentMap; + } + + private LogicalRequest 
processRequest(TopologyRequest request, ClusterTopology topology) throws AmbariException { + + finalizeTopology(request, topology); + LogicalRequest logicalRequest = createLogicalRequest(request, topology); + + boolean requestHostComplete = false; + //todo: overall synchronization. Currently we have nested synchronization here + synchronized(availableHosts) { + Iterator<HostImpl> hostIterator = availableHosts.iterator(); + while (! requestHostComplete && hostIterator.hasNext()) { + HostImpl host = hostIterator.next(); + synchronized (reservedHosts) { + String hostname = host.getHostName(); + if (reservedHosts.containsKey(hostname)) { + if (logicalRequest.equals(reservedHosts.get(hostname))) { + // host is registered to this request, remove it from reserved map + reservedHosts.remove(hostname); + } else { + // host is registered with another request, don't offer + //todo: clean up logic + continue; + } + } + } + HostOfferResponse response = logicalRequest.offer(host); + switch (response.getAnswer()) { + case ACCEPTED: + //todo: when host matches last host it returns ACCEPTED so we don't know that logical request is no + //todo: longer outstanding until we call offer again. This is really only an issue if we need to + //todo: deal specifically with outstanding hosts other than calling offer. Also, failure handling + //todo: may affect this behavior?? + hostIterator.remove(); + processAcceptedHostOffer(getClusterTopology(logicalRequest.getClusterName()), response, host); + break; + case DECLINED_DONE: + requestHostComplete = true; + break; + case DECLINED_PREDICATE: + break; + } + } + + if (! 
requestHostComplete) { + // not all required hosts have been matched (see earlier comment regarding outstanding logical requests + outstandingRequests.add(logicalRequest); + } + } + return logicalRequest; + } + + private LogicalRequest createLogicalRequest(TopologyRequest request, ClusterTopology topology) throws AmbariException { + LogicalRequest logicalRequest = new LogicalRequest(request, new ClusterTopologyContext(topology)); + allRequests.put(logicalRequest.getRequestId(), logicalRequest); + synchronized (reservedHosts) { + for (String host : logicalRequest.getReservedHosts()) { + reservedHosts.put(host, logicalRequest); + } + } + + return logicalRequest; + } + + private void processAcceptedHostOffer(ClusterTopology topology, HostOfferResponse response, HostImpl host) { + try { + topology.addHostToTopology(response.getHostGroupName(), host.getHostName()); + } catch (InvalidTopologyException e) { + //todo + throw new RuntimeException(e); + } catch (NoSuchHostGroupException e) { + throw new RuntimeException(e); + } + + List<TopologyTask> tasks = response.getTasks(); + synchronized (configurationFlagLock) { + if (configureComplete) { + for (TopologyTask task : tasks) { + task.run(); + } + }else { + for (TopologyTask task : tasks) { + //todo: proper state dependencies + TopologyTask.Type taskType = task.getType(); + if (taskType == TopologyTask.Type.RESOURCE_CREATION || taskType == TopologyTask.Type.CONFIGURE) { + task.run(); + } else { + // all type collections are added at init time + pendingTasks.get(taskType).add(task); + } + } + } + } + } + + //todo: this should invoke a callback on each 'service' in the topology + private void finalizeTopology(TopologyRequest request, ClusterTopology topology) { + addKerberosClientIfNecessary(topology); + } + + /** + * Add the kerberos client to groups if kerberos is enabled for the cluster. 
+ * + * @param topology cluster topology + */ + //for now, hard coded here + private void addKerberosClientIfNecessary(ClusterTopology topology) { + + String clusterName = topology.getClusterName(); + //todo: logic would ideally be contained in the stack + Cluster cluster; + try { + cluster = getController().getClusters().getCluster(clusterName); + } catch (AmbariException e) { + //todo: this shouldn't happen at this point but still need to handle in a generic manner for topo finalization + throw new RuntimeException("Parent Cluster resource doesn't exist. clusterName= " + clusterName); + } + if (cluster.getSecurityType() == SecurityType.KERBEROS) { + for (HostGroup group : topology.getBlueprint().getHostGroups().values()) { + group.addComponent("KERBEROS_CLIENT"); + } + } + } + + // create a thread pool which is used for task execution + private synchronized ExecutorService getExecutorService() { + if (executor == null) { + LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); + + int THREAD_POOL_CORE_SIZE = 2; + int THREAD_POOL_MAX_SIZE = 100; + int THREAD_POOL_TIMEOUT = Integer.MAX_VALUE; + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + THREAD_POOL_CORE_SIZE, + THREAD_POOL_MAX_SIZE, + THREAD_POOL_TIMEOUT, + TimeUnit.SECONDS, + queue); + + //threadPoolExecutor.allowCoreThreadTimeOut(true); + executor = threadPoolExecutor; + } + return executor; + } + + private void addClusterConfigRequest(ClusterConfigurationRequest configurationRequest) { + //pendingTasks.get(Action.CONFIGURE).add(new ConfigureClusterTask(configurationRequest)); + synchronized (configurationFlagLock) { + configureComplete = false; + } + executor.submit(new ConfigureClusterTask(configurationRequest)); + } + + private void createClusterResource(String clusterName) throws AmbariException { + Stack stack = clusterTopologyMap.get(clusterName).getBlueprint().getStack(); + String stackInfo = String.format("%s-%s", stack.getName(), stack.getVersion()); + 
ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, stackInfo, null); + getController().createCluster(clusterRequest); + } + + private void createServiceAndComponentResources(ClusterTopology topology) { + String clusterName = topology.getClusterName(); + Collection<String> services = topology.getBlueprint().getServices(); + + synchronized(serviceResourceLock) { + try { + Cluster cluster = getController().getClusters().getCluster(clusterName); + services.removeAll(cluster.getServices().keySet()); + } catch (AmbariException e) { + //todo + throw new RuntimeException(e); + } + Set<ServiceRequest> serviceRequests = new HashSet<ServiceRequest>(); + Set<ServiceComponentRequest> componentRequests = new HashSet<ServiceComponentRequest>(); + for (String service : services) { + serviceRequests.add(new ServiceRequest(clusterName, service, null)); + for (String component : topology.getBlueprint().getComponents(service)) { + componentRequests.add(new ServiceComponentRequest(clusterName, service, component, null)); + } + } + try { + ServiceResourceProvider serviceResourceProvider = (ServiceResourceProvider) ClusterControllerHelper. + getClusterController().ensureResourceProvider(Resource.Type.Service); + + serviceResourceProvider.createServices(serviceRequests); + + ComponentResourceProvider componentResourceProvider = (ComponentResourceProvider) ClusterControllerHelper. + getClusterController().ensureResourceProvider(Resource.Type.Component); + + componentResourceProvider.createComponents(componentRequests); + } catch (AmbariException e) { + //todo + throw new RuntimeException(e); + } + } + } + + /** + * Persist cluster state for the ambari UI. Setting this state informs that UI that a cluster has been + * installed and started and that the monitoring screen for the cluster should be displayed to the user. 
+ * + * @param clusterName name of cluster + */ + //todo: invoke as part of a generic callback possible associated with cluster state + private void persistInstallStateForUI(String clusterName) throws AmbariException { + Stack stack = clusterTopologyMap.get(clusterName).getBlueprint().getStack(); + String stackInfo = String.format("%s-%s", stack.getName(), stack.getVersion()); + ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, "INSTALLED", null, stackInfo, null); + + getController().updateClusters(Collections.singleton(clusterRequest), null); + } + + private synchronized AmbariManagementController getController() { + if (controller == null) { + controller = AmbariServer.getController(); + } + return controller; + } + + private class ConfigureClusterTask implements Runnable { + private ClusterConfigurationRequest configRequest; + + + public ConfigureClusterTask(ClusterConfigurationRequest configRequest) { + this.configRequest = configRequest; + } + + + @Override + public void run() { + System.out.println("TopologyManager.ConfigureClusterTask: Entering"); + + boolean completed = false; + boolean interrupted = false; + + while (! completed && ! interrupted) { + completed = areConfigsResolved(); + + try { + Thread.sleep(200); + } catch (InterruptedException e) { + interrupted = true; + // reset interrupted flag on thread + Thread.interrupted(); + + } + } + + if (! interrupted) { + try { + System.out.println("TopologyManager.ConfigureClusterTask: Setting Configuration on cluster"); + // sets updated configuration on topology and cluster + configRequest.process(); + } catch (Exception e) { + //todo: how to handle this? If this fails, we shouldn't start any hosts. 
+ System.out.println("TopologyManager.ConfigureClusterTask: " + + "An exception occurred while attempting to process cluster configs and set on cluster"); + e.printStackTrace(); + } + + synchronized (configurationFlagLock) { + System.out.println("TopologyManager.ConfigureClusterTask: Setting configure complete flag to true"); + configureComplete = true; + } + + // execute all queued install/start tasks + executor.submit(new ExecuteQueuedHostTasks()); + } + System.out.println("TopologyManager.ConfigureClusterTask: Exiting"); + } + + // get set of required host groups from config processor and confirm that all requests + // have fully resolved the host names for the required host groups + private boolean areConfigsResolved() { + boolean configTopologyResolved = true; + Collection<String> requiredHostGroups; + try { + requiredHostGroups = configRequest.getRequiredHostGroups(); + } catch (RuntimeException e) { + //todo + System.out.println("Caught an error from Config Processor: " + e); + throw e; + } + + synchronized (outstandingRequests) { + for (LogicalRequest outstandingRequest : outstandingRequests) { + if (! 
outstandingRequest.areGroupsResolved(requiredHostGroups)) { + configTopologyResolved = false; + break; + } + } + } + return configTopologyResolved; + } + } + + private class ExecuteQueuedHostTasks implements Runnable { + @Override + public void run() { + //todo: lock is too coarse grained, should only be on start tasks + synchronized(pendingTasks) { + // execute queued install tasks + //todo: once agent configuration is removed from agent install, we will be able to + //todo: install without regard to configuration resolution + Iterator<TopologyTask> iter = pendingTasks.get(TopologyTask.Type.INSTALL).iterator(); + while (iter.hasNext()) { + iter.next().run(); + iter.remove(); + } + + iter = pendingTasks.get(TopologyTask.Type.START).iterator(); + while (iter.hasNext()) { + iter.next().run(); + iter.remove(); + } + } + } + } + + //todo: this is a temporary step, remove after refactoring makes it no longer needed + public class ClusterTopologyContext { + private ClusterTopology clusterTopology; + + public ClusterTopologyContext(ClusterTopology clusterTopology) { + this.clusterTopology = clusterTopology; + } + + public ClusterTopology getClusterTopology() { + return clusterTopology; + } + + public long getNextTaskId() { + synchronized (nextTaskId) { + return nextTaskId.getAndIncrement(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java new file mode 100644 index 0000000..4c1abf9 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
* See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ambari.server.topology;

import java.util.List;
import java.util.Map;

/**
 * A request which is used to create or modify a cluster topology.
 */
//todo: naming
public interface TopologyRequest {

  /**
   * @return the cluster name of the request
   */
  String getClusterName();

  //todo: only a single BP may be specified so all host groups have the same bp.
  //todo: There is no reason really that we couldn't allow hostgroups from different blueprints assuming that
  //todo: the stack matches across the groups.  For scaling operations, we allow different blueprints (rather arbitrary)
  //todo: so BP really needs to get associated with the HostGroupInfo, even for create which will have a single BP
  //todo: for all HG's.
  /**
   * @return the blueprint of the request
   */
  Blueprint getBlueprint();

  /**
   * @return the configuration of the request
   */
  Configuration getConfiguration();

  /**
   * @return the host group information of the request, keyed by string
   */
  Map<String, HostGroupInfo> getHostGroupInfo();

  /**
   * @return the topology validators of the request
   */
  List<TopologyValidator> getTopologyValidators();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c9f0dd0b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java new file mode 100644 index 0000000..284fbba --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyRequestFactory.java @@ -0,0 +1,30 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ambari.server.topology;

import java.util.Map;

/**
 * Factory for creating topology requests.
 */
public interface TopologyRequestFactory {

  /**
   * Create a request for provisioning a cluster.
   *
   * @param properties  request properties
   *
   * @return a new topology request
   *
   * @throws InvalidTopologyTemplateException declared for implementations which reject the given properties
   */
  TopologyRequest createProvisionClusterRequest(Map<String, Object> properties) throws InvalidTopologyTemplateException;
  // todo: use to create other request types
}