SLIDER-378. Slider AM restart - log folders and component instance data not getting published
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/11f6f2b3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/11f6f2b3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/11f6f2b3 Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry Commit: 11f6f2b3738f8b8c05be0324eec375e270ce1b3c Parents: ae4c076 Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Fri Aug 29 23:42:21 2014 -0700 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Fri Aug 29 23:42:21 2014 -0700 ---------------------------------------------------------------------- .../src/main/python/agent/Controller.py | 1 + .../python/agent/CustomServiceOrchestrator.py | 10 +- slider-agent/src/main/python/agent/Register.py | 5 +- .../src/test/python/agent/TestRegistration.py | 5 +- .../providers/agent/AgentProviderService.java | 134 +++++++++++-------- .../appmaster/web/rest/agent/Register.java | 13 ++ .../agent/TestAgentProviderService.java | 18 ++- 7 files changed, 123 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/11f6f2b3/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index a3fb90d..11db21c 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -122,6 +122,7 @@ class Controller(threading.Thread): self.componentActualState, self.componentExpectedState, self.actionQueue.customServiceOrchestrator.allocated_ports, + self.actionQueue.customServiceOrchestrator.log_folders, id)) logger.info("Registering with the server at " + self.registerUrl + " with data " + pprint.pformat(data)) 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/11f6f2b3/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py index 15f1664..dd8e9b9 100644 --- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py +++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py @@ -26,6 +26,7 @@ import sys import socket import posixpath import platform +import copy from AgentConfig import AgentConfig from AgentException import AgentException from PythonExecutor import PythonExecutor @@ -58,6 +59,7 @@ class CustomServiceOrchestrator(): self.public_fqdn = hostname.public_hostname() self.stored_command = {} self.allocated_ports = {} + self.log_folders = {} # Clean up old status command files if any try: os.unlink(self.status_commands_stdout) @@ -133,15 +135,17 @@ class CustomServiceOrchestrator(): } if Constants.EXIT_CODE in ret and ret[Constants.EXIT_CODE] == 0: - ret[Constants.ALLOCATED_PORTS] = allocated_ports - self.allocated_ports = allocated_ports + ret[Constants.ALLOCATED_PORTS] = copy.deepcopy(allocated_ports) + ## Generally all ports are allocated at once but just in case + self.allocated_ports.update(allocated_ports) # Irrespective of the outcome report the folder paths if command_name == 'INSTALL': - ret[Constants.FOLDERS] = { + self.log_folders = { Constants.AGENT_LOG_ROOT: self.config.getLogPath(), Constants.AGENT_WORK_ROOT: self.config.getWorkRootPath() } + ret[Constants.FOLDERS] = copy.deepcopy(self.log_folders) return ret http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/11f6f2b3/slider-agent/src/main/python/agent/Register.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Register.py b/slider-agent/src/main/python/agent/Register.py 
index b59154f..c8246c7 100644 --- a/slider-agent/src/main/python/agent/Register.py +++ b/slider-agent/src/main/python/agent/Register.py @@ -29,7 +29,7 @@ class Register: def __init__(self, config): self.config = config - def build(self, actualState, expectedState, allocated_ports, id='-1'): + def build(self, actualState, expectedState, allocated_ports, log_folders, id='-1'): timestamp = int(time.time() * 1000) version = self.read_agent_version() @@ -41,7 +41,8 @@ class Register: 'agentVersion': version, 'actualState': actualState, 'expectedState': expectedState, - 'allocatedPorts': allocated_ports + 'allocatedPorts': allocated_ports, + 'logFolders': log_folders } return register http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/11f6f2b3/slider-agent/src/test/python/agent/TestRegistration.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestRegistration.py b/slider-agent/src/test/python/agent/TestRegistration.py index 7b3c875..2c98978 100644 --- a/slider-agent/src/test/python/agent/TestRegistration.py +++ b/slider-agent/src/test/python/agent/TestRegistration.py @@ -39,7 +39,7 @@ class TestRegistration(TestCase): config.set('agent', 'current_ping_port', '33777') register = Register(config) - data = register.build(State.INIT, State.INIT, {}, 1) + data = register.build(State.INIT, State.INIT, {}, {}, 1) #print ("Register: " + pprint.pformat(data)) self.assertEquals(data['hostname'] != "", True, "hostname should not be empty") self.assertEquals(data['publicHostname'] != "", True, "publicHostname should not be empty") @@ -49,7 +49,8 @@ class TestRegistration(TestCase): self.assertEquals(data['actualState'], State.INIT, "actualState should not be empty") self.assertEquals(data['expectedState'], State.INIT, "expectedState should not be empty") self.assertEquals(data['allocatedPorts'], {}, "allocatedPorts should be empty") - self.assertEquals(len(data), 8) + 
self.assertEquals(data['logFolders'], {}, "allocated log should be empty") + self.assertEquals(len(data), 9) self.assertEquals(os.path.join(tmpdir, "app/definition"), config.getResolvedPath("app_pkg_dir")) self.assertEquals(os.path.join(tmpdir, "app/install"), config.getResolvedPath("app_install_dir")) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/11f6f2b3/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index be5b1ac..61866fb 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -92,7 +92,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -109,9 +108,9 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.slider.server.appmaster.web.rest.RestPaths.SLIDER_PATH_AGENTS; -/** This class implements the server-side logic for application deployment - * through Slider application package - **/ +/** + * This class implements the server-side logic for application deployment through Slider application package + */ public class AgentProviderService extends AbstractProviderService implements ProviderCore, AgentKeys, @@ -194,7 +193,7 @@ public class AgentProviderService extends AbstractProviderService implements // Reads the metainfo.xml in the application package and loads it private void buildMetainfo(AggregateConf instanceDefinition, - SliderFileSystem fileSystem) throws IOException, SliderException { + SliderFileSystem fileSystem) throws 
IOException, SliderException { String appDef = instanceDefinition.getAppConfOperations() .getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF); @@ -211,7 +210,7 @@ public class AgentProviderService extends AbstractProviderService implements "metainfo.xml is required in app package."); } commandOrder = new ComponentCommandOrder(metainfo.getApplication() - .getCommandOrder()); + .getCommandOrder()); defaultConfigs = initializeDefaultConfigs(fileSystem, appDef, metainfo); monitor = new HeartbeatMonitor(this, getHeartbeatMonitorInterval()); monitor.start(); @@ -297,8 +296,8 @@ public class AgentProviderService extends AbstractProviderService implements getGlobalOptions().getOption(AgentKeys.AGENT_CONF, ""); if (org.apache.commons.lang.StringUtils.isNotEmpty(agentConf)) { LocalResource agentConfRes = fileSystem.createAmResource(fileSystem - .getFileSystem().resolvePath(new Path(agentConf)), - LocalResourceType.FILE); + .getFileSystem().resolvePath(new Path(agentConf)), + LocalResourceType.FILE); launcher.addLocalResource(AgentKeys.AGENT_CONFIG_FILE, agentConfRes); } @@ -361,23 +360,23 @@ public class AgentProviderService extends AbstractProviderService implements @Override public void rebuildContainerDetails(List<Container> liveContainers, - String applicationId, Map<Integer, ProviderRole> providerRoleMap) { + String applicationId, Map<Integer, ProviderRole> providerRoleMap) { for (Container container : liveContainers) { // get the role name and label ProviderRole role = providerRoleMap.get(ContainerPriority - .extractRole(container)); + .extractRole(container)); if (role != null) { String roleName = role.name; String label = getContainerLabel(container, roleName); log.info("Rebuilding in-memory: container {} in role {} in cluster {}", - container.getId(), roleName, applicationId); + container.getId(), roleName, applicationId); getComponentStatuses().put( label, new ComponentInstanceState(roleName, container.getId(), - applicationId)); + applicationId)); } else { 
log.warn("Role not found for container {} in cluster {}", - container.getId(), applicationId); + container.getId(), applicationId); } } } @@ -411,7 +410,9 @@ public class AgentProviderService extends AbstractProviderService implements /** * Handle registration calls from the agents + * * @param registration + * * @return */ @Override @@ -426,11 +427,17 @@ public class AgentProviderService extends AbstractProviderService implements componentStatus.heartbeat(System.currentTimeMillis()); updateComponentStatusWithAgentState(componentStatus, agentState); + String roleName = getRoleName(label); + String containerId = getContainerId(label); + String hostFqdn = registration.getPublicHostname(); Map<String, String> ports = registration.getAllocatedPorts(); if (ports != null && !ports.isEmpty()) { - String roleName = getRoleName(label); - String containerId = getContainerId(label); - processAllocatedPorts(registration.getPublicHostname(), roleName, containerId, ports); + processAllocatedPorts(hostFqdn, roleName, containerId, ports); + } + + Map<String, String> folders = registration.getLogFolders(); + if (folders != null && folders.size() > 0) { + publishLogFolderPaths(folders, containerId, roleName, hostFqdn); } } else { response.setResponseStatus(RegistrationStatus.FAILED); @@ -443,7 +450,9 @@ public class AgentProviderService extends AbstractProviderService implements /** * Handle heartbeat response from agents + * * @param heartBeat + * * @return */ @Override @@ -468,7 +477,7 @@ public class AgentProviderService extends AbstractProviderService implements String scriptPath = cmdScript.getScript(); long timeout = cmdScript.getTimeout(); - if(timeout == 0L) { + if (timeout == 0L) { timeout = 600L; } @@ -495,7 +504,7 @@ public class AgentProviderService extends AbstractProviderService implements log.info("Component operation. 
Status: {}", result); if (command == Command.INSTALL && report.getFolders() != null && report.getFolders().size() > 0) { - publishLogFolderPaths(report.getFolders(), containerId, heartBeat.getFqdn()); + publishLogFolderPaths(report.getFolders(), containerId, roleName, heartBeat.getFqdn()); } } @@ -553,9 +562,9 @@ public class AgentProviderService extends AbstractProviderService implements } protected void processAllocatedPorts(String fqdn, - String roleName, - String containerId, - Map<String, String> ports) { + String roleName, + String containerId, + Map<String, String> ports) { RoleInstance instance; try { instance = getAmState().getOwnedContainer(containerId); @@ -569,22 +578,22 @@ public class AgentProviderService extends AbstractProviderService implements log.info("Recording allocated port for {} as {}", portname, portNo); this.getAllocatedPorts().put(portname, portNo); this.getAllocatedPorts(containerId).put(portname, portNo); - if (instance!=null) { - try { - instance.registerPortEndpoint(Integer.valueOf(portNo), portname, ""); - } catch (NumberFormatException e) { - log.warn("Failed to parse {}: {}", portNo, e); - } + if (instance != null) { + try { + instance.registerPortEndpoint(Integer.valueOf(portNo), portname, ""); + } catch (NumberFormatException e) { + log.warn("Failed to parse {}: {}", portNo, e); } + } } // component specific publishes processAndPublishComponentSpecificData(ports, containerId, fqdn, roleName); - + // and update registration entries if (instance != null) { queueAccess.put(new RegisterComponentInstance(instance.getId(), 0, - TimeUnit.MILLISECONDS)); + TimeUnit.MILLISECONDS)); } } @@ -653,6 +662,7 @@ public class AgentProviderService extends AbstractProviderService implements /** * Reads and sets the heartbeat monitoring interval. If bad value is provided then log it and set to default. 
+ * * @param instanceDefinition */ private void readAndSetHeartbeatMonitoringInterval(AggregateConf instanceDefinition) { @@ -661,7 +671,7 @@ public class AgentProviderService extends AbstractProviderService implements Integer.toString(DEFAULT_HEARTBEAT_MONITOR_INTERVAL)); try { setHeartbeatMonitorInterval(Integer.parseInt(hbMonitorInterval)); - }catch (NumberFormatException e) { + } catch (NumberFormatException e) { log.warn( "Bad value {} for {}. Defaulting to ", hbMonitorInterval, @@ -672,6 +682,7 @@ public class AgentProviderService extends AbstractProviderService implements /** * Reads and sets the heartbeat monitoring interval. If bad value is provided then log it and set to default. + * * @param instanceDefinition */ private void initializeAgentDebugCommands(AggregateConf instanceDefinition) { @@ -703,10 +714,13 @@ public class AgentProviderService extends AbstractProviderService implements /** * Read all default configs + * * @param fileSystem * @param appDef * @param metainfo + * * @return + * * @throws IOException */ protected Map<String, DefaultConfig> initializeDefaultConfigs(SliderFileSystem fileSystem, @@ -748,6 +762,7 @@ public class AgentProviderService extends AbstractProviderService implements /** * Publish a named property bag that may contain name-value pairs for app configurations such as hbase-site + * * @param name * @param description * @param entries @@ -763,6 +778,7 @@ public class AgentProviderService extends AbstractProviderService implements /** * Get a list of all hosts for all role/container per role + * * @return */ protected Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() { @@ -792,11 +808,10 @@ public class AgentProviderService extends AbstractProviderService implements } /** - * Lost heartbeat from the container - release it and ask for a replacement - * (async operation) - * @param label - * @param containerId + * Lost heartbeat from the container - release it and ask for a replacement (async operation) * + * 
@param label + * @param containerId */ protected void lostContainer( String label, @@ -818,17 +833,20 @@ public class AgentProviderService extends AbstractProviderService implements /** * Format the folder locations and publish in the registry service + * * @param folders * @param containerId * @param hostFqdn + * @param roleName */ - private void publishLogFolderPaths(Map<String, String> folders, String containerId, String hostFqdn) { + protected void publishLogFolderPaths( + Map<String, String> folders, String containerId, String roleName, String hostFqdn) { for (String key : folders.keySet()) { - workFolders.put(String.format("%s-%s-%s", hostFqdn, containerId, key), folders.get(key)); + workFolders.put(String.format("%s->%s->%s->%s", roleName, hostFqdn, key, containerId), folders.get(key)); } publishApplicationInstanceData(LOG_FOLDERS_TAG, LOG_FOLDERS_TAG, - (new HashMap<String, String>(this.workFolders)).entrySet()); + (new HashMap<String, String>(this.workFolders)).entrySet()); } @@ -940,16 +958,16 @@ public class AgentProviderService extends AbstractProviderService implements } private boolean canBeExported(String exportGroupName, String name, Set<String> appExports) { - return appExports.contains(String.format("%s-%s", exportGroupName, name)); + return appExports.contains(String.format("%s-%s", exportGroupName, name)); } protected Map<String, String> getCurrentExports(String groupName) { - if(!this.exportGroups.containsKey(groupName)) { - synchronized (this.exportGroups) { - if(!this.exportGroups.containsKey(groupName)) { - this.exportGroups.put(groupName, new ConcurrentHashMap<String, String>()); - } - } + if (!this.exportGroups.containsKey(groupName)) { + synchronized (this.exportGroups) { + if (!this.exportGroups.containsKey(groupName)) { + this.exportGroups.put(groupName, new ConcurrentHashMap<String, String>()); + } + } } return this.exportGroups.get(groupName); @@ -957,7 +975,7 @@ public class AgentProviderService extends AbstractProviderService 
implements private void publishModifiedExportGroups(Set<String> modifiedGroups) { synchronized (this.exportGroups) { - for(String groupName : modifiedGroups) { + for (String groupName : modifiedGroups) { publishApplicationInstanceData(groupName, groupName, this.exportGroups.get(groupName).entrySet()); } } @@ -1029,7 +1047,9 @@ public class AgentProviderService extends AbstractProviderService implements /** * Return Component based on name + * * @param roleName + * * @return */ protected Component getApplicationComponent(String roleName) { @@ -1110,6 +1130,7 @@ public class AgentProviderService extends AbstractProviderService implements /** * Can any master publish config explicitly, if not a random master is used + * * @return */ protected boolean canAnyMasterPublishConfig() { @@ -1143,10 +1164,12 @@ public class AgentProviderService extends AbstractProviderService implements /** * Add install command to the heartbeat response + * * @param roleName * @param containerId * @param response * @param scriptPath + * * @throws SliderException */ @VisibleForTesting @@ -1333,7 +1356,7 @@ public class AgentProviderService extends AbstractProviderService implements synchronized (this.allocatedPorts) { if (!this.allocatedPorts.containsKey(containerId)) { this.allocatedPorts.put(containerId, - new ConcurrentHashMap<String, String>()); + new ConcurrentHashMap<String, String>()); } } } @@ -1381,8 +1404,9 @@ public class AgentProviderService extends AbstractProviderService implements String configTypes = appConf.get(AgentKeys.SYSTEM_CONFIGS); if (configTypes != null && configTypes.length() > 0) { String[] configs = configTypes.split(","); - for(String config :configs) - configList.add(config.trim()); + for (String config : configs) { + configList.add(config.trim()); + } } return new ArrayList<String>(new HashSet<String>(configList)); @@ -1395,7 +1419,7 @@ public class AgentProviderService extends AbstractProviderService implements configList.add(GLOBAL_CONFIG_TAG); 
List<ConfigFile> configFiles = getMetainfo().getApplication().getConfigFiles(); - for(ConfigFile configFile : configFiles) { + for (ConfigFile configFile : configFiles) { log.info("Expecting config type {}.", configFile.getDictionaryName()); configList.add(configFile.getDictionaryName()); } @@ -1420,7 +1444,7 @@ public class AgentProviderService extends AbstractProviderService implements for (String key : config.keySet()) { String value = config.get(key); String lookupKey = configName + "." + key; - if(!value.contains(DO_NOT_PROPAGATE_TAG)) { + if (!value.contains(DO_NOT_PROPAGATE_TAG)) { // If the config property is shared then pass on the already allocated value // from any container if (this.getAllocatedPorts().containsKey(lookupKey)) { @@ -1435,12 +1459,12 @@ public class AgentProviderService extends AbstractProviderService implements } //apply defaults only if the key is not present and value is not empty - if(getDefaultConfigs().containsKey(configName)) { + if (getDefaultConfigs().containsKey(configName)) { log.info("Adding default configs for type {}.", configName); - for(PropertyInfo defaultConfigProp : getDefaultConfigs().get(configName).getPropertyInfos()) { - if(!config.containsKey(defaultConfigProp.getName())){ - if(!defaultConfigProp.getName().isEmpty() && - defaultConfigProp.getValue() != null && + for (PropertyInfo defaultConfigProp : getDefaultConfigs().get(configName).getPropertyInfos()) { + if (!config.containsKey(defaultConfigProp.getName())) { + if (!defaultConfigProp.getName().isEmpty() && + defaultConfigProp.getValue() != null && !defaultConfigProp.getValue().isEmpty()) { config.put(defaultConfigProp.getName(), defaultConfigProp.getValue()); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/11f6f2b3/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java ---------------------------------------------------------------------- diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java index a44c3a4..70d639f 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java @@ -38,6 +38,7 @@ public class Register { private State actualState; private State expectedState; private Map<String, String> allocatedPorts; + private Map<String, String> logFolders; @JsonProperty("responseId") public int getResponseId() { @@ -133,6 +134,18 @@ public class Register { this.allocatedPorts = ports; } + /** @return the log folders, or <code>null</code> if none are present */ + @JsonProperty("logFolders") + public Map<String, String> getLogFolders() { + return logFolders; + } + + /** @param logFolders assigned log folders */ + @JsonProperty("logFolders") + public void setLogFolders(Map<String, String> logFolders) { + this.logFolders = logFolders; + } + @Override public String toString() { String ret = "responseId=" + responseId + "\n" + http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/11f6f2b3/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java index 428d767..16f5347 100644 --- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java +++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java @@ -310,6 +310,12 @@ public class TestAgentProviderService { anyString(), anyMap() ); + + doNothing().when(mockAps).publishLogFolderPaths(anyMap(), + anyString(), + anyString(), + 
anyString() + ); expect(access.isApplicationLive()).andReturn(true).anyTimes(); ClusterDescription desc = new ClusterDescription(); desc.setOption(OptionKeys.ZOOKEEPER_QUORUM, "host1:2181"); @@ -344,9 +350,12 @@ public class TestAgentProviderService { Register reg = new Register(); reg.setResponseId(0); reg.setHostname("mockcontainer_1___HBASE_MASTER"); - Map<String,String> ports = new HashMap(); + Map<String,String> ports = new HashMap<String, String>(); ports.put("a","100"); reg.setAllocatedPorts(ports); + Map<String, String> folders = new HashMap<String, String>(); + folders.put("F1", "F2"); + reg.setLogFolders(folders); RegistrationResponse resp = mockAps.handleRegistration(reg); Assert.assertEquals(0, resp.getResponseId()); Assert.assertEquals(RegistrationStatus.OK, resp.getResponseStatus()); @@ -358,6 +367,13 @@ public class TestAgentProviderService { anyMap() ); + Mockito.verify(mockAps, Mockito.times(1)).publishLogFolderPaths( + anyMap(), + anyString(), + anyString(), + anyString() + ); + HeartBeat hb = new HeartBeat(); hb.setResponseId(1); hb.setHostname("mockcontainer_1___HBASE_MASTER");