This is an automated email from the ASF dual-hosted git repository. aonishuk pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new a41262e AMBARI-24583. Ambari agent status could be reported stale just after execution command thread has finished execution (aonishuk) a41262e is described below commit a41262e2fa1728f665bc16a4aaf00e389b000f11 Author: Andrew Onishuk <aonis...@hortonworks.com> AuthorDate: Sun Sep 2 12:37:48 2018 +0300 AMBARI-24583. Ambari agent status could be reported stale just after execution command thread has finished execution (aonishuk) --- .../python/ambari_agent/ComponentStatusExecutor.py | 35 ++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index 64e6ae7..df72c88 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -21,10 +21,12 @@ import threading from ambari_agent import Constants from ambari_agent.LiveStatus import LiveStatus +from ambari_agent.Utils import Utils from collections import defaultdict from ambari_agent.models.commands import AgentCommand from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed +from resource_management.libraries.functions.default import default class ComponentStatusExecutor(threading.Thread): @@ -39,6 +41,8 @@ class ComponentStatusExecutor(threading.Thread): self.reported_component_status = defaultdict(lambda:defaultdict(lambda:defaultdict(lambda:None))) # component statuses which were received by server self.server_responses_listener = initializer_module.server_responses_listener self.logger = logging.getLogger(__name__) + self.reports_to_discard = [] + self.reports_to_discard_lock = threading.RLock() threading.Thread.__init__(self) def run(self): @@ -54,6 +58,9 @@ class ComponentStatusExecutor(threading.Thread): self.clean_not_existing_clusters_info() cluster_reports = defaultdict(lambda:[]) + with self.reports_to_discard_lock: + self.reports_to_discard = [] + for cluster_id in self.topology_cache.get_cluster_ids(): # TODO: check if we can make clusters immutable too try: @@ -104,6 +111,8 @@ class ComponentStatusExecutor(threading.Thread): if result: cluster_reports[cluster_id].append(result) + + cluster_reports = self.discard_stale_reports(cluster_reports) self.send_updates_to_server(cluster_reports) except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue pass @@ -113,6 +122,29 @@ class ComponentStatusExecutor(threading.Thread): self.stop_event.wait(self.status_commands_run_interval) self.logger.info("ComponentStatusExecutor has successfully finished") + def discard_stale_reports(self, cluster_reports): + """ + Remove reports which are already stale (meaning other process has already updated status to something different) + """ + with self.reports_to_discard_lock: + # nothing to discard + if not self.reports_to_discard: + return cluster_reports + + reports_to_discard = self.reports_to_discard[:] + + new_cluster_reports = defaultdict(lambda:[]) + for cluster_id, cluster_reports in cluster_reports.iteritems(): + for cluster_report in cluster_reports: + for discarded_report in reports_to_discard: + if Utils.are_dicts_equal(cluster_report, discarded_report, keys_to_skip=['status']): + self.logger.info("Discarding outdated status {0} before sending".format(cluster_report)) + break + else: + new_cluster_reports[cluster_id].append(cluster_report) + + return new_cluster_reports + def check_component_status(self, cluster_id, service_name, component_name, command_name, report=False): """ Returns components status if it has changed, otherwise None. @@ -151,6 +183,9 @@ class ComponentStatusExecutor(threading.Thread): self.recovery_manager.handle_status_change(component_name, status) if report: + with self.reports_to_discard_lock: + self.reports_to_discard.append(result) + self.send_updates_to_server({cluster_id: [result]}) return result