Repository: ambari Updated Branches: refs/heads/branch-3.0-perf b1d357ad1 -> c8aecb772
AMBARI-21199. Run status commands with real configurations and parameters information (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c8aecb77 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c8aecb77 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c8aecb77 Branch: refs/heads/branch-3.0-perf Commit: c8aecb7721b574f0cc4604abcf93456ca4de93d6 Parents: b1d357a Author: Andrew Onishuk <aonis...@hortonworks.com> Authored: Thu Jun 8 13:43:14 2017 +0300 Committer: Andrew Onishuk <aonis...@hortonworks.com> Committed: Thu Jun 8 13:43:14 2017 +0300 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 21 +- .../main/python/ambari_agent/ClusterCache.py | 16 +- .../python/ambari_agent/ClusterTopologyCache.py | 67 +++++- .../ambari_agent/ComponentStatusExecutor.py | 32 +-- .../ambari_agent/CustomServiceOrchestrator.py | 123 +++++++---- .../src/main/python/ambari_agent/FileCache.py | 4 +- .../python/ambari_agent/InitializerModule.py | 4 +- .../src/main/python/ambari_agent/Utils.py | 2 + .../ambari_agent/TestAgentStompResponses.py | 11 +- .../stomp/configurations_update.json | 21 ++ .../dummy_files/stomp/execution_commands.json | 2 +- .../stomp/metadata_after_registration.json | 220 ++++--------------- .../stomp/topology_add_component.json | 7 +- .../dummy_files/stomp/topology_add_host.json | 5 +- .../stomp/topology_cache_expected.json | 69 +++--- .../dummy_files/stomp/topology_create.json | 43 ++-- .../agent/stomp/AgentReportsController.java | 6 +- 17 files changed, 319 insertions(+), 334 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index dbd9f4c..4cef88b 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -27,12 +27,10 @@ import os import ambari_simplejson as json import time import signal -import copy from AgentException import AgentException from LiveStatus import LiveStatus from ActualConfigHandler import ActualConfigHandler -from CustomServiceOrchestrator import CustomServiceOrchestrator from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle from ambari_commons.str_utils import split_on_chunks from resource_management.libraries.script import Script @@ -77,12 +75,11 @@ class ActionQueue(threading.Thread): self.commandQueue = Queue.Queue() self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = initializer_module.commandStatuses - self.configurations_cache = initializer_module.configurations_cache self.config = initializer_module.ambariConfig self.configTags = {} self.stop_event = initializer_module.stop_event self.tmpdir = self.config.get('agent', 'prefix') - self.customServiceOrchestrator = CustomServiceOrchestrator(self.config) + self.customServiceOrchestrator = initializer_module.customServiceOrchestrator self.parallel_execution = self.config.get_parallel_exec_option() if self.parallel_execution == 1: logger.info("Parallel execution is enabled, will execute agent commands in parallel") @@ -92,18 +89,16 @@ class ActionQueue(threading.Thread): for command in commands: if not command.has_key('serviceName'): command['serviceName'] = "null" + if command.has_key('clusterId'): + command['clusterId'] = "null" if not command.has_key('clusterName'): command['clusterName'] = 'null' - - if command.has_key('clusterId'): - cluster_id = command['clusterId'] - # TODO STOMP: what if has no configs yet? - if cluster_id != 'null': - command['configurations'] = dict(self.configurations_cache[str(cluster_id)]) + + logger.info("Adding " + command['commandType'] + " for role " + \ command['role'] + " for service " + \ - command['serviceName'] + " of cluster " + \ - command['clusterName'] + " to the queue.") + command['serviceName'] + " of cluster_id " + \ + command['clusterId'] + " to the queue.") if command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND : self.backgroundCommandQueue.put(self.createCommandHandle(command)) else: @@ -170,7 +165,7 @@ class ActionQueue(threading.Thread): except: logger.exception("ActionQueue thread failed with exception:") raise - + logger.info("ActionQueue thread has successfully finished") def processBackgroundQueueSafeEmpty(self): http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/ClusterCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py index 8e91afe..a3b84ad 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py @@ -57,12 +57,12 @@ class ClusterCache(dict): with self.__file_lock: with open(self.__current_cache_json_file, 'r') as fp: cache_dict = json.load(fp) - + """ for cluster_id, cache in cache_dict.iteritems(): immutable_cache = Utils.make_immutable(cache) cache_dict[cluster_id] = immutable_cache - - super(ClusterCache, self).__init__(cache_dict) + """ + self.rewrite_cache(cache_dict) def get_cluster_ids(self): return self.keys() @@ -80,6 +80,9 @@ class ClusterCache(dict): for cache_id_to_delete in cache_ids_to_delete: del self[cache_id_to_delete] + self.on_cache_update() + self.persist_cache() + def rewrite_cluster_cache(self, cluster_id, cache): """ @@ -98,8 +101,6 @@ class ClusterCache(dict): with self._cache_lock: self[cluster_id] = immutable_cache - self.persist_cache() - def persist_cache(self): # ensure that our cache directory exists if not os.path.exists(self.cluster_cache_dir): @@ -113,6 +114,11 @@ class ClusterCache(dict): with self._cache_lock: return Utils.get_mutable_copy(self) + def on_cache_update(self): + """ + Call back function called then cache is updated + """ + pass def get_cache_name(self): raise NotImplemented() http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py index 5810e67..f138c57 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py @@ -18,7 +18,12 @@ See the License for the specific language governing permissions and limitations under the License. """ + +from ambari_agent import hostname from ambari_agent.ClusterCache import ClusterCache +from ambari_agent.Utils import ImmutableDictionary + +from collections import defaultdict import logging logger = logging.getLogger(__name__) @@ -30,32 +35,72 @@ class ClusterTopologyCache(ClusterCache): topology properties. """ - def __init__(self, cluster_cache_dir): + def __init__(self, cluster_cache_dir, config): """ Initializes the topology cache. :param cluster_cache_dir: :return: """ + self.hosts_to_id = ImmutableDictionary({}) + self.components_by_key = ImmutableDictionary({}) + self.hostname = hostname.hostname(config) + self.current_host_ids_to_cluster = {} super(ClusterTopologyCache, self).__init__(cluster_cache_dir) def get_cache_name(self): return 'topology' - @staticmethod - def find_host_by_id(host_dicts, host_id): + def on_cache_update(self): + hosts_to_id = defaultdict(lambda:{}) + components_by_key = defaultdict(lambda:{}) + + for cluster_id, cluster_topology in self.iteritems(): + for host_dict in cluster_topology.hosts: + hosts_to_id[cluster_id][host_dict.hostId] = host_dict + + if host_dict.hostName == self.hostname: + self.current_host_ids_to_cluster[cluster_id] = host_dict.hostId + + for component_dict in cluster_topology.components: + key = "{0}/{1}".format(component_dict.serviceName, component_dict.componentName) + components_by_key[cluster_id][key] = component_dict + + self.hosts_to_id = ImmutableDictionary(hosts_to_id) + self.components_by_key = ImmutableDictionary(components_by_key) + + def get_component_info_by_key(self, cluster_id, service_name, component_name): + """ + Find component by service_name and component_name in list of component dictionaries. + """ + key = "{0}/{1}".format(service_name, component_name) + + try: + return self.components_by_key[cluster_id][key] + except KeyError: + return None + + def get_host_info_by_id(self, cluster_id, host_id): """ Find host by id in list of host dictionaries. """ + try: + return self.hosts_to_id[cluster_id][host_id] + except KeyError: + return None + + def get_current_host_info(self, cluster_id): + current_host_id = self.current_host_ids_to_cluster[cluster_id] + return self.get_host_info_by_id(cluster_id, current_host_id) + + @staticmethod + def _find_host_by_id_in_dict(host_dicts, host_id): for host_dict in host_dicts: if host_dict['hostId'] == host_id: return host_dict return None @staticmethod - def find_component(component_dicts, service_name, component_name): - """ - Find component by service_name and component_name in list of component dictionaries. - """ + def _find_component_in_dict(component_dicts, service_name, component_name): for component_dict in component_dicts: if component_dict['serviceName'] == service_name and component_dict['componentName'] == component_name: return component_dict @@ -81,7 +126,7 @@ class ClusterTopologyCache(ClusterCache): if 'hosts' in cluster_updates_dict: hosts_mutable_list = mutable_dict[cluster_id]['hosts'] for host_updates_dict in cluster_updates_dict['hosts']: - host_mutable_dict = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, host_updates_dict['hostId']) + host_mutable_dict = ClusterTopologyCache._find_host_by_id_in_dict(hosts_mutable_list, host_updates_dict['hostId']) if host_mutable_dict is not None: host_mutable_dict.update(host_updates_dict) else: @@ -90,7 +135,7 @@ class ClusterTopologyCache(ClusterCache): if 'components' in cluster_updates_dict: components_mutable_list = mutable_dict[cluster_id]['components'] for component_updates_dict in cluster_updates_dict['components']: - component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName']) + component_mutable_dict = ClusterTopologyCache._find_component_in_dict(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName']) if component_mutable_dict is not None: component_updates_dict['hostIds'] += component_mutable_dict['hostIds'] component_updates_dict['hostIds'] = list(set(component_updates_dict['hostIds'])) @@ -121,7 +166,7 @@ class ClusterTopologyCache(ClusterCache): if 'hosts' in cluster_updates_dict: hosts_mutable_list = mutable_dict[cluster_id]['hosts'] for host_updates_dict in cluster_updates_dict['hosts']: - host_to_delete = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, host_updates_dict['hostId']) + host_to_delete = ClusterTopologyCache._find_host_by_id_in_dict(hosts_mutable_list, host_updates_dict['hostId']) if host_to_delete is not None: mutable_dict[cluster_id]['hosts'] = [host_dict for host_dict in hosts_mutable_list if host_dict != host_to_delete] else: @@ -130,7 +175,7 @@ class ClusterTopologyCache(ClusterCache): if 'components' in cluster_updates_dict: components_mutable_list = mutable_dict[cluster_id]['components'] for component_updates_dict in cluster_updates_dict['components']: - component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName']) + component_mutable_dict = ClusterTopologyCache._find_component_in_dict(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName']) if 'hostIds' in component_mutable_dict: exclude_host_ids = component_updates_dict['hostIds'] component_mutable_dict['hostIds'] = [host_id for host_id in component_mutable_dict['hostIds'] if host_id not in exclude_host_ids] http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index 3a2e105..520c97d 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -18,7 +18,6 @@ See the License for the specific language governing permissions and limitations under the License. ''' -import random import logging import threading @@ -32,6 +31,7 @@ class ComponentStatusExecutor(threading.Thread): self.initializer_module = initializer_module self.metadata_cache = initializer_module.metadata_cache self.topology_cache = initializer_module.topology_cache + self.customServiceOrchestrator = initializer_module.customServiceOrchestrator self.stop_event = initializer_module.stop_event self.reported_component_status = defaultdict(lambda:defaultdict(lambda:None)) # component statuses which were received by server threading.Thread.__init__(self) @@ -53,17 +53,14 @@ class ComponentStatusExecutor(threading.Thread): # multithreading: if cluster was deleted during iteration continue - #if 'status_commands_to_run' in cluster_metadata: - # continue - - #status_commands_to_run = cluster_metadata.status_commands_to_run + if not 'status_commands_to_run' in metadata_cache: + continue - # TODO STOMP: read this from metadata - status_commands_to_run = ['STATUS', 'SECURITY_STATUS'] + status_commands_to_run = metadata_cache.status_commands_to_run cluster_components = topology_cache.components for component_dict in cluster_components: - for command in status_commands_to_run: + for command_name in status_commands_to_run: if self.stop_event.is_set(): break @@ -71,19 +68,26 @@ class ComponentStatusExecutor(threading.Thread): service_name = component_dict.serviceName component_name = component_dict.componentName - # TODO STOMP: run real command - logger.info("Running {0}/{1}".format(component_dict.statusCommandParams.service_package_folder, component_dict.statusCommandParams.script)) - #self.customServiceOrchestrator.requestComponentStatus(command) - status = random.choice(["INSTALLED","STARTED"]) + command_dict = { + 'serviceName': service_name, + 'role': component_name, + 'clusterId': cluster_id, + 'commandType': 'STATUS_COMMAND', + } + + component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict) + logger.info(component_status_result) + status = "STARTED" if component_status_result['exitcode'] == 0 else "INSTALLED" + result = { 'serviceName': service_name, 'componentName': component_name, - 'command': command, + 'command': command_name, 'status': status, 'clusterId': cluster_id, } - if status != self.reported_component_status[component_name][command]: + if status != self.reported_component_status[component_name][command_name]: logging.info("Status for {0} has changed to {1}".format(component_name, status)) cluster_reports[cluster_id].append(result) http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index 656e9a1..2350504 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -24,11 +24,11 @@ import ambari_simplejson as json import sys from ambari_commons import shell import threading +import copy from FileCache import FileCache from AgentException import AgentException from PythonExecutor import PythonExecutor -from PythonReflectiveExecutor import PythonReflectiveExecutor from resource_management.libraries.functions.log_process_information import log_process_information from resource_management.core.utils import PasswordString import subprocess @@ -64,7 +64,6 @@ class CustomServiceOrchestrator(): FREQUENT_COMMANDS = [COMMAND_NAME_STATUS] DONT_DEBUG_FAILURES_FOR_COMMANDS = FREQUENT_COMMANDS - REFLECTIVELY_RUN_COMMANDS = FREQUENT_COMMANDS # -- commands which run a lot and often (this increases their speed) DONT_BACKUP_LOGS_FOR_COMMANDS = FREQUENT_COMMANDS # Path where hadoop credential JARS will be available @@ -78,27 +77,30 @@ class CustomServiceOrchestrator(): # Property name for credential store class path CREDENTIAL_STORE_CLASS_PATH_NAME = 'credentialStoreClassPath' - def __init__(self, config): - self.config = config - self.tmp_dir = config.get('agent', 'prefix') - self.force_https_protocol = config.get_force_https_protocol() + def __init__(self, initializer_module): + self.metadata_cache = initializer_module.metadata_cache + self.topology_cache = initializer_module.topology_cache + self.configurations_cache = initializer_module.configurations_cache + self.config = initializer_module.ambariConfig + self.tmp_dir = self.config.get('agent', 'prefix') + self.force_https_protocol = self.config.get_force_https_protocol() self.exec_tmp_dir = Constants.AGENT_TMP_DIR - self.file_cache = FileCache(config) + self.file_cache = FileCache(self.config) self.status_commands_stdout = os.path.join(self.tmp_dir, 'status_command_stdout.txt') self.status_commands_stderr = os.path.join(self.tmp_dir, 'status_command_stderr.txt') - self.public_fqdn = hostname.public_hostname(config) + self.public_fqdn = hostname.public_hostname(self.config) # TODO STOMP: cache reset should be called on every agent registration #controller.registration_listeners.append(self.file_cache.reset) # Construct the hadoop credential lib JARs path - self.credential_shell_lib_path = os.path.join(config.get('security', 'credential_lib_dir', + self.credential_shell_lib_path = os.path.join(self.config.get('security', 'credential_lib_dir', self.DEFAULT_CREDENTIAL_SHELL_LIB_PATH), '*') - self.credential_conf_dir = config.get('security', 'credential_conf_dir', self.DEFAULT_CREDENTIAL_CONF_DIR) + self.credential_conf_dir = self.config.get('security', 'credential_conf_dir', self.DEFAULT_CREDENTIAL_CONF_DIR) - self.credential_shell_cmd = config.get('security', 'credential_shell_cmd', self.DEFAULT_CREDENTIAL_SHELL_CMD) + self.credential_shell_cmd = self.config.get('security', 'credential_shell_cmd', self.DEFAULT_CREDENTIAL_SHELL_CMD) # Clean up old status command files if any try: @@ -124,7 +126,7 @@ class CustomServiceOrchestrator(): .format(tid=str(task_id), reason=reason, pid=pid)) log_process_information(logger) shell.kill_process_with_children(pid) - else: + else: logger.warn("Unable to find process associated with taskId = %s" % task_id) def get_py_executor(self, forced_command_name): @@ -132,10 +134,7 @@ class CustomServiceOrchestrator(): Wrapper for unit testing :return: """ - if forced_command_name in self.REFLECTIVELY_RUN_COMMANDS: - return PythonReflectiveExecutor(self.tmp_dir, self.config) - else: - return PythonExecutor(self.tmp_dir, self.config) + return PythonExecutor(self.tmp_dir, self.config) def getProviderDirectory(self, service_name): """ @@ -245,7 +244,7 @@ class CustomServiceOrchestrator(): config.pop(value_name, None) return configtype_credentials - def generateJceks(self, commandJson): + def qJceks(self, commandJson): """ Generates the JCEKS file with passwords for the service specified in commandJson @@ -302,25 +301,26 @@ class CustomServiceOrchestrator(): return cmd_result - def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name=None, - override_output_files=True, retry=False): + def runCommand(self, command_header, tmpoutfile, tmperrfile, forced_command_name=None, + override_output_files=True, retry=False, is_status_command=False): """ forced_command_name may be specified manually. In this case, value, defined at command json, is ignored. """ try: - script_type = command['commandParams']['script_type'] - script = command['commandParams']['script'] - timeout = int(command['commandParams']['command_timeout']) + command = self.generate_command(command_header) + script_type = command['script_type'] # TODO STOMP: take this from command? + script = command['componentLevelParams']['script'] + timeout = int('300') # TODO STOMP: fix it - if 'hostLevelParams' in command and 'jdk_location' in command['hostLevelParams']: - server_url_prefix = command['hostLevelParams']['jdk_location'] - else: - server_url_prefix = command['commandParams']['jdk_location'] + server_url_prefix = command['clusterLevelParams']['jdk_location'] # Status commands have no taskId nor roleCommand - task_id = command['taskId'] if 'taskId' in command else 'status' - command_name = command['roleCommand'] if 'roleCommand' in command else None + if not is_status_command: + task_id = command['taskId'] + command_name = command['roleCommand'] + else: + task_id = 'status' if forced_command_name is not None: # If not supplied as an argument command_name = forced_command_name @@ -335,7 +335,7 @@ class CustomServiceOrchestrator(): # forces a hash challenge on the directories to keep them updated, even # if the return type is not used - self.file_cache.get_host_scripts_base_dir(server_url_prefix) + self.file_cache.get_host_scripts_base_dir(server_url_prefix) hook_dir = self.file_cache.get_hook_base_dir(command, server_url_prefix) base_dir = self.file_cache.get_service_base_dir(command, server_url_prefix) self.file_cache.get_custom_resources_subdir(command, server_url_prefix) @@ -361,10 +361,11 @@ class CustomServiceOrchestrator(): # If command contains credentialStoreEnabled, then # generate the JCEKS file for the configurations. credentialStoreEnabled = False - if 'credentialStoreEnabled' in command: - credentialStoreEnabled = (command['credentialStoreEnabled'] == "true") + if 'credentialStoreEnabled' in command['serviceLevelParams']: + credentialStoreEnabled = (command['serviceLevelParams']['credentialStoreEnabled'] == "true") if credentialStoreEnabled == True: + # TODO STOMP: fix this with execution commands if 'commandBeingRetried' not in command or command['commandBeingRetried'] != "true": self.generateJceks(command) else: @@ -391,15 +392,15 @@ class CustomServiceOrchestrator(): python_executor = self.get_py_executor(forced_command_name) backup_log_files = not command_name in self.DONT_BACKUP_LOGS_FOR_COMMANDS log_out_files = self.config.get("logging","log_out_files", default="0") != "0" - + for py_file, current_base_dir in filtered_py_file_list: log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS script_params = [command_name, json_path, current_base_dir, tmpstrucoutfile, logger_level, self.exec_tmp_dir, self.force_https_protocol] - + if log_out_files: script_params.append("-o") - + ret = python_executor.run_file(py_file, script_params, tmpoutfile, tmperrfile, timeout, tmpstrucoutfile, self.map_task_to_process, @@ -451,7 +452,44 @@ class CustomServiceOrchestrator(): return "\nCommand aborted." return None - def requestComponentStatus(self, command): + def generate_command(self, command_header): + service_name = command_header['serviceName'] + component_name = command_header['role'] + cluster_id = str(command_header['clusterId']) + + metadata_cache = self.metadata_cache[cluster_id] + configurations_cache = self.configurations_cache[cluster_id] + + component_dict = self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name) + + command_dict = { + 'clusterLevelParams': metadata_cache.clusterLevelParams, + 'serviceLevelParams': metadata_cache.serviceLevelParams[service_name], + 'hostLevelParams': self.topology_cache.get_current_host_info(cluster_id).hostLevelParams, + 'componentLevelParams': component_dict.componentLevelParams, + 'script_type': self.SCRIPT_TYPE_PYTHON + } + command_dict.update(configurations_cache) + #command_dict['componentLevelParams']['script'] = component_dict.statusCommandParams['script'] + #command_dict['serviceLevelParams']['hooks_folder'] = metadata_cache['hooks_folder'] + #command_dict['serviceLevelParams']['service_package_folder'] = component_dict.statusCommandParams['service_package_folder'] + + command_dict['agentLevelParams'] = { + 'public_hostname': self.public_fqdn, + 'agentCacheDir': self.config.get('agent', 'cache_dir'), + } + command_dict['agentLevelParams']["agentConfigParams"] = { + "agent": { + "parallel_execution": self.config.get_parallel_exec_option(), + "use_system_proxy_settings": self.config.use_system_proxy_setting() + } + } + command = copy.copy(command_header) + command.update(command_dict) + + return command + + def requestComponentStatus(self, command_header): """ Component status is determined by exit code, returned by runCommand(). Exit code 0 means that component is running and any other exit code means that @@ -461,9 +499,9 @@ class CustomServiceOrchestrator(): if logger.level == logging.DEBUG: override_output_files = False - res = self.runCommand(command, self.status_commands_stdout, + res = self.runCommand(command_header, self.status_commands_stdout, self.status_commands_stderr, self.COMMAND_NAME_STATUS, - override_output_files=override_output_files) + override_output_files=override_output_files, is_status_command=True) return res def resolve_script_path(self, base_dir, script): @@ -497,17 +535,6 @@ class CustomServiceOrchestrator(): """ Converts command to json file and returns file path """ - # Perform few modifications to stay compatible with the way in which - public_fqdn = self.public_fqdn - command['public_hostname'] = public_fqdn - # Add cache dir to make it visible for commands - command["hostLevelParams"]["agentCacheDir"] = self.config.get('agent', 'cache_dir') - command["agentConfigParams"] = { - "agent": { - "parallel_execution": self.config.get_parallel_exec_option(), - "use_system_proxy_settings": self.config.use_system_proxy_setting() - } - } # Now, dump the json file command_type = command['commandType'] from ActionQueue import ActionQueue # To avoid cyclic dependency http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/FileCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/FileCache.py b/ambari-agent/src/main/python/ambari_agent/FileCache.py index 139dcba..3bff613 100644 --- a/ambari-agent/src/main/python/ambari_agent/FileCache.py +++ b/ambari-agent/src/main/python/ambari_agent/FileCache.py @@ -73,7 +73,7 @@ class FileCache(): """ Returns a base directory for service """ - service_subpath = command['commandParams']['service_package_folder'] + service_subpath = command['serviceLevelParams']['service_package_folder'] return self.provide_directory(self.cache_dir, service_subpath, server_url_prefix) @@ -83,7 +83,7 @@ class FileCache(): Returns a base directory for hooks """ try: - hooks_subpath = command['commandParams']['hooks_folder'] + hooks_subpath = command['serviceLevelParams']['hooks_folder'] except KeyError: return None subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, hooks_subpath) http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index 4d0ac9b..88c8b91 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -30,6 +30,7 @@ from ambari_agent.Utils import lazy_property from ambari_agent.security import AmbariStompConnection from ambari_agent.ActionQueue import ActionQueue from ambari_agent.CommandStatusDict import CommandStatusDict +from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator logger = logging.getLogger() @@ -66,8 +67,9 @@ class InitializerModule: self.is_registered = False self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir) - self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir) + self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.ambariConfig) self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir) + self.customServiceOrchestrator = CustomServiceOrchestrator(self) self.commandStatuses = CommandStatusDict(self) self.action_queue = ActionQueue(self) http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/Utils.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py index 845eb30..de073bb 100644 --- a/ambari-agent/src/main/python/ambari_agent/Utils.py +++ b/ambari-agent/src/main/python/ambari_agent/Utils.py @@ -74,6 +74,8 @@ class BlockingDictionary(): class Utils(object): @staticmethod def make_immutable(value): + if isinstance(value, ImmutableDictionary): + return value if isinstance(value, dict): return ImmutableDictionary(value) if isinstance(value, (list, tuple)): http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py index c969c75..9d57261 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py @@ -35,6 +35,8 @@ from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator from mock.mock import MagicMock, patch +@patch("socket.gethostbyname", new=MagicMock(return_value="192.168.64.101")) +@patch("ambari_agent.hostname.hostname", new=MagicMock(return_value="c6401.ambari.apache.org")) class TestAgentStompResponses(BaseStompServerTestCase): def setUp(self): self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json']) @@ -106,7 +108,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): action_status_failed_frame = json.loads(self.server.frames_queue.get().body) initializer_module.stop_event.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'})) self.server.topic_manager.send(f) heartbeat_thread.join() @@ -133,6 +135,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module) heartbeat_thread.start() + action_queue = initializer_module.action_queue action_queue.start() @@ -168,7 +171,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): initializer_module.stop_event.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'})) self.server.topic_manager.send(f) heartbeat_thread.join() @@ -240,7 +243,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): initializer_module.stop_event.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'})) self.server.topic_manager.send(f) - heartbeat_thread.join() \ No newline at end of file + heartbeat_thread.join() http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json index c415c7d..e8e1ab5 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json @@ -17,6 +17,27 @@ "initLimit":"10", "dataDir":"/hadoop/zookeeper", "tickTime":"2000" + }, + "hadoop-env": { + "proxyuser_group": "users", + "hdfs_user_nproc_limit": "65536", + "hdfs_log_dir_prefix": "/var/log/hadoop", + "keyserver_host": " ", + "namenode_opt_maxnewsize": "200m", + "nfsgateway_heapsize": "1024", + "dtnode_heapsize": "1024m", + "namenode_heapsize": "1024m", + "namenode_opt_maxpermsize": "256m", + "namenode_opt_permsize": "128m", + "hdfs_tmp_dir": "/tmp", + "hdfs_user": "hdfs", + "hdfs_user_nofile_limit": "128000", + "namenode_opt_newsize": "200m", + "keyserver_port": "", + "namenode_backup_dir": "/tmp/upgrades", + "hadoop_root_logger": "INFO,RFA", + "hadoop_heapsize": "1024", + "hadoop_pid_dir_prefix": "/var/run/hadoop" } }, "configurationAttributes":{ http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json index 536233d..6e84319 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json @@ -11,7 +11,7 @@ "commandType":"EXECUTION_COMMAND", "roleCommand":"START", "clusterName": "c1", - "clusterId": 0, + "clusterId": "0", "configuration_credentials":{ }, http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json index 0dc5aff..f60b49a 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json @@ -1,185 +1,43 @@ { - "hash": "c2bea6695221368416b2412fec2ba0d7", - "clusters": { - "0": { - "serviceSpecifics": { - "GANGLIA": { - "version": "3.5.0", - "credentialStoreEnabled": false, - "status_commands_timeout": null - }, - "DRUID": { - "version": "0.9.2", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "TEZ": { - "version": "0.7.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "SPARK": { - "version": "1.6.x", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "HBASE": { - "version": "1.1.2", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "RANGER_KMS": { - "version": "0.7.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "ATLAS": { - "version": "0.8.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "HIVE": { - "version": "1.2.1000", - "credentialStoreEnabled": true, - "status_commands_timeout": null - }, - "SLIDER": { - "version": "0.92.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "AMBARI_INFRA": { - "version": "0.1.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "FLUME": { - "version": "1.5.2", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "MAHOUT": { - "version": "0.9.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "SQOOP": { - "version": "1.4.6", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "OOZIE": { - "version": "4.2.0", - "credentialStoreEnabled": true, - "status_commands_timeout": 300 - }, - "HDFS": { - "version": "2.7.3", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "MAPREDUCE2": { - "version": "2.7.3", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "ACCUMULO": { - "version": "1.7.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "ZOOKEEPER": { - "version": "3.4.6", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "YARN": { - "version": "2.7.3", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "KERBEROS": { - "version": "1.10.3-10", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "KNOX": { - "version": "0.12.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "PIG": { - "version": "0.16.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "STORM": { - "version": "1.1.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "RANGER": { - "version": "0.7.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "AMBARI_METRICS": { - "version": "0.1.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 600 - }, - "ZEPPELIN": { - "version": "0.7.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "KAFKA": { - "version": "0.10.1", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "LOGSEARCH": { - "version": "0.5.0", - "credentialStoreEnabled": true, - "status_commands_timeout": 300 - }, - "FALCON": { - "version": "0.10.0", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - }, - "SPARK2": { - "version": "2.x", - "credentialStoreEnabled": false, - "status_commands_timeout": 300 - } - }, - "clusterLevelParams": { - "host_sys_prepped": "false", - "java_home": null, - "agent_stack_retry_count": "5", - "jdk_location": "http://c6401.ambari.apache.org:8080/resources/", - "jdk_name": null, - "stack_version": "2.6", - "user_list": "[\"accumulo\",\"zookeeper\",\"ams\",\"ambari-qa\",\"hdfs\",\"yarn\",\"mapred\"]", - "mysql_jdbc_url": "http://dvitiiuk-System-Product-Name:8080/resources//mysql-connector-java.jar", - "oracle_jdbc_url": "http://dvitiiuk-System-Product-Name:8080/resources//ojdbc6.jar", - "ambari_db_rca_password": "mapred", - "jce_name": null, - "group_list": "[\"hadoop\",\"users\"]", - "db_name": "ambari", - "ambari_db_rca_driver": "org.postgresql.Driver", - "ambari_db_rca_username": "mapred", - "java_version": "8", - "not_managed_hdfs_path_list": "[\"/mr-history/done\",\"/app-logs\",\"/tmp\"]", - "db_driver_filename": "mysql-connector-java.jar", - "stack_name": "HDP", - "ambari_db_rca_url": "jdbc:postgresql://c6401.ambari.apache.org/ambarirca", - "agent_stack_retry_on_unavailability": "false", - "user_groups": "{}" - }, - "status_commands_to_run": ["STATUS"], - "hooks_folder": "HDP/2.0.6/hooks" + "hash": "c2bea6695221368416b2412fec2ba0d7", + "clusters": { + "0": { + "clusterLevelParams": { + "jdk_location": "http://gc6401:8080/resources/", + "not_managed_hdfs_path_list": "[\"/mr-history/done\",\"/app-logs\",\"/tmp\"]", + "agent_stack_retry_on_unavailability": "false", + "ambari_db_rca_url": "jdbc:postgresql://gc6401/ambarirca", + "stack_name": "HDP", + "java_version": "8", + "ambari_db_rca_password": "mapred", + "group_list": "[\"hadoop\",\"users\"]", + "host_sys_prepped": "false", + "oracle_jdbc_url": "http://gc6401:8080/resources//ojdbc6.jar", + "jdk_name": "jdk-8u112-linux-x64.tar.gz", + "ambari_db_rca_username": "mapred", + "mysql_jdbc_url": "http://gc6401:8080/resources//mysql-connector-java.jar", + "agent_stack_retry_count": "5", + "db_driver_filename": "mysql-connector-java.jar", + "jce_name": "jce_policy-8.zip", + "user_groups": "{}", + "stack_version": "2.6", + "db_name": "ambari", + "ambari_db_rca_driver": "org.postgresql.Driver", + "java_home": "/usr/jdk64/jdk1.8.0_112", + "user_list": "[\"zookeeper\",\"ambari-qa\",\"hdfs\",\"yarn\",\"mapred\"]", + "hooks_folder": "HDP/2.0.6/hooks" + }, + "serviceLevelParams": { + "HDFS": { + "credentialStoreEnabled": false, + "status_commands_timeout": 300, + "version": "2.7.3", + "service_package_folder": "common-services/HDFS/2.1.0.2.0/package" } + }, + "status_commands_to_run": [ + "STATUS" + ] } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json index 2c37111..1514516 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json @@ -12,9 +12,10 @@ 0, 1 ], - "statusCommandParams":{ - "script":"scripts/snamenode.py", - "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package" + "componentLevelParams": { + "unlimited_key_jce_required": "false", + "clientsToUpdateConfigs": "[\"*\"]", + "script":"scripts/snamenode.py" } } ] http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json index a9407c3..2458f08 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json @@ -8,7 +8,10 @@ "hostId":2, "hostName":"c6403.ambari.apache.org", "rackName":"/default-rack", - "ipv4":"192.168.64.103" + "ipv4":"192.168.64.103", + "hostLevelParams": { + "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" + } } ] } http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json index 9894420..53d0e0d 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json @@ -1,57 +1,66 @@ { "0": { - "hosts": [ - { - "rackName": "/default-rack", - "hostName": "c6402.ambari.apache.org", - "ipv4": "192.168.64.102", - "hostId": 1 - }, - { - "rackName": "/default-rack", - "hostName": "c6403.ambari.apache.org", - "ipv4": "192.168.64.103", - "hostId": 2 - } - ], "components": [ { - "statusCommandParams": { - "script": "scripts/datanode.py", - "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package" + "componentLevelParams": { + "clientsToUpdateConfigs": "[\"*\"]", + "script": "scripts/namenode.py", + "unlimited_key_jce_required": "false" }, "componentName": "DATANODE", - "serviceName": "HDFS", - "version": "2.6.0.3-8", "hostIds": [ 1 - ] + ], + "serviceName": "HDFS", + "version": "2.6.0.3-8" }, { - "statusCommandParams": { + "componentLevelParams": { + "clientsToUpdateConfigs": "[\"*\"]", "script": "scripts/hdfs_client.py", - "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package" + "unlimited_key_jce_required": "false" }, "componentName": "HDFS_CLIENT", - "version": "2.6.0.3-8", - "serviceName": "HDFS", "hostIds": [ 0, 1 - ] + ], + "serviceName": "HDFS", + "version": "2.6.0.3-8" }, { - "statusCommandParams": { + "componentLevelParams": { + "clientsToUpdateConfigs": "[\"*\"]", "script": "scripts/snamenode.py", - "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package" + "unlimited_key_jce_required": "false" }, "componentName": "SECONDARY_NAMENODE", - "serviceName": "HDFS", - "version": "2.6.0.3-8", "hostIds": [ 0, 1 - ] + ], + "serviceName": "HDFS", + "version": "2.6.0.3-8" + } + ], + "hosts": [ + { + "hostId": 1, + "hostLevelParams": { + "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" + }, + "hostName": "c6402.ambari.apache.org", + "ipv4": "192.168.64.102", + "rackName": "/default-rack" + }, + { + "hostId": 2, + "hostLevelParams": { + "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" + }, + "hostName": "c6403.ambari.apache.org", + "ipv4": "192.168.64.103", + "rackName": "/default-rack" } ] } http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json index cf1afa7..dfe17b9 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json @@ -11,9 +11,10 @@ "hostIds":[ 0 ], - "statusCommandParams":{ - "script":"scripts/namenode.py", - "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package" + "componentLevelParams": { + "unlimited_key_jce_required": "false", + "clientsToUpdateConfigs": "[\"*\"]", + "script": "scripts/namenode.py" } }, { @@ -24,9 +25,10 @@ 0, 1 ], - "statusCommandParams":{ - "script":"scripts/datanode.py", - "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package" + "componentLevelParams": { + "unlimited_key_jce_required": "false", + "clientsToUpdateConfigs": "[\"*\"]", + "script": "scripts/namenode.py" } }, { @@ -36,9 +38,10 @@ "hostIds":[ 0 ], - "statusCommandParams":{ - "script":"scripts/hdfs_client.py", - "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package" + "componentLevelParams": { + "unlimited_key_jce_required": "false", + "clientsToUpdateConfigs": "[\"*\"]", + "script": "scripts/hdfs_client.py" } } ], @@ -47,13 +50,19 @@ "hostId":0, "hostName":"c6401.ambari.apache.org", "rackName":"/default-rack", - "ipv4":"192.168.64.101" + "ipv4":"192.168.64.101", + "hostLevelParams": { + "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" + } }, { "hostId":1, "hostName":"c6402.ambari.apache.org", "rackName":"/default-rack", - "ipv4":"192.168.64.102" + "ipv4":"192.168.64.102", + "hostLevelParams": { + "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" + } } ] }, @@ -66,9 +75,10 @@ "hostIds":[ 0 ], - "statusCommandParams":{ - "script":"scripts/namenode.py", - "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package" + "componentLevelParams": { + "unlimited_key_jce_required": "false", + "clientsToUpdateConfigs": "[\"*\"]", + "script": "scripts/namenode.py" } } ], @@ -77,7 +87,10 @@ "hostId":0, "hostName":"c6401.ambari.apache.org", "rackName":"/default-rack", - "ipv4":"192.168.64.101" + "ipv4":"192.168.64.101", + "hostLevelParams": { + "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" + } } ] } http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java index 68b7f3b..60bd197 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java @@ -66,11 +66,7 @@ public class AgentReportsController { componentStatus.setClusterName(clusters.getCluster(report.getClusterId()).getClusterName()); componentStatus.setComponentName(report.getComponentName()); componentStatus.setServiceName(report.getServiceName()); - if (report.getCommand().equals(ComponentStatusReport.CommandStatusCommand.STATUS)) { - componentStatus.setStatus(report.getStatus().toString()); - } else { - componentStatus.setSecurityState(report.getStatus().toString()); - } + componentStatus.setStatus(report.getStatus().toString()); statuses.add(componentStatus); } }