Repository: ambari Updated Branches: refs/heads/branch-3.0-perf cfec01d69 -> 57b79f937
AMBARI-21688. Fix race conditions causing ambari-agent to hang/fail (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/57b79f93 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/57b79f93 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/57b79f93 Branch: refs/heads/branch-3.0-perf Commit: 57b79f937af5d046ef420b8dc839fad92a24c64a Parents: cfec01d Author: Andrew Onishuk <aonis...@hortonworks.com> Authored: Mon Aug 14 14:24:30 2017 +0300 Committer: Andrew Onishuk <aonis...@hortonworks.com> Committed: Mon Aug 14 14:24:30 2017 +0300 ---------------------------------------------------------------------- .../python/ambari_agent/CommandStatusDict.py | 10 +++- .../ambari_agent/ComponentStatusExecutor.py | 3 + .../main/python/ambari_agent/HeartbeatThread.py | 63 +++++++++++++------- .../python/ambari_agent/HostStatusReporter.py | 3 + .../python/ambari_agent/InitializerModule.py | 36 +++-------- .../listeners/ServerResponsesListener.py | 8 ++- .../src/main/python/ambari_agent/security.py | 27 ++++++++- .../python/ambari_stomp/adapter/websocket.py | 18 ++++-- .../src/main/python/ambari_ws4py/websocket.py | 3 + 9 files changed, 114 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index d6cbdcc..e7b7e49 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -25,6 +25,7 @@ from collections import defaultdict from Grep import Grep from ambari_agent import Constants +from ambari_agent import security logger = 
logging.getLogger() @@ -59,11 +60,14 @@ class CommandStatusDict(): self.force_update_to_server({command['clusterId']: [new_report]}) def force_update_to_server(self, reports_dict): - if self.initializer_module.is_registered: + if not self.initializer_module.is_registered: + return False + + try: self.initializer_module.connection.send(message={'clusters':reports_dict}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT) return True - - return False + except security.ConnectionIsNotEstablished: + return False def report(self): report = self.generate_report() http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index be3eb5b..66df15a 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -24,6 +24,7 @@ import threading from ambari_agent import Constants from ambari_agent.LiveStatus import LiveStatus from collections import defaultdict +from ambari_agent import security logger = logging.getLogger(__name__) @@ -98,6 +99,8 @@ class ComponentStatusExecutor(threading.Thread): self.recovery_manager.handle_status_change(component_name, status) self.send_updates_to_server(cluster_reports) + except security.ConnectionIsNotEstablished: # server and agent disconnected during sending data. Not an issue + pass except: logger.exception("Exception in ComponentStatusExecutor. 
Re-running it") http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index e2fe4af..b6e1aaf 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -23,7 +23,6 @@ import ambari_stomp import threading from socket import error as socket_error -from ambari_agent.security import ConnectionFailed from ambari_agent import Constants from ambari_agent.Register import Register from ambari_agent.Utils import BlockingDictionary @@ -35,6 +34,8 @@ from ambari_agent.listeners.MetadataEventListener import MetadataEventListener from ambari_agent.listeners.CommandsEventListener import CommandsEventListener from ambari_agent.listeners.HostLevelParamsEventListener import HostLevelParamsEventListener from ambari_agent.listeners.AlertDefinitionsEventListener import AlertDefinitionsEventListener +from ambari_agent import security +from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed HEARTBEAT_INTERVAL = 10 REQUEST_RESPONSE_TIMEOUT = 10 @@ -90,30 +91,22 @@ class HeartbeatThread(threading.Thread): logger.debug("Heartbeat response is {0}".format(response)) self.handle_heartbeat_reponse(response) except Exception as ex: - if not isinstance(ex, (socket_error, ConnectionFailed)): + if not isinstance(ex, (socket_error, ConnectionIsAlreadyClosed)): logger.exception("Exception in HeartbeatThread. 
Re-running the registration") - self.initializer_module.is_registered = False - - if hasattr(self.initializer_module, '_connection'): - try: - self.initializer_module.connection.disconnect() - except: - logger.exception("Exception during self.initializer_module.connection.disconnect()") - - delattr(self.initializer_module, '_connection') + self.unregister() self.stop_event.wait(self.heartbeat_interval) - self.initializer_module.is_registered = False - self.initializer_module.connection.disconnect() - delattr(self.initializer_module, '_connection') + self.unregister() logger.info("HeartbeatThread has successfully finished") def register(self): """ Subscribe to topics, register with server, wait for server's response. """ + self.establish_connection() + self.add_listeners() self.subscribe_to_topics(Constants.PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE) @@ -140,6 +133,27 @@ class HeartbeatThread(threading.Thread): self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE) self.file_cache.reset() self.initializer_module.is_registered = True + # now when registration is done we can expose connection to other threads. + self.initializer_module._connection = self.connection + + def unregister(self): + """ + Disconnect and remove connection object from initializer_module so other threads cannot use it + """ + self.initializer_module.is_registered = False + + if hasattr(self, 'connection'): + try: + self.connection.disconnect() + except: + logger.exception("Exception during self.connection.disconnect()") + + if hasattr(self.initializer_module, '_connection'): + delattr(self.initializer_module, '_connection') + delattr(self, 'connection') + + # delete any responses, which were not handled (possibly came during disconnect, etc.) + self.server_responses_listener.reset_responses() def handle_registration_response(self, response): # exitstatus is a code of error which was raised on server side. 
@@ -179,28 +193,35 @@ class HeartbeatThread(threading.Thread): """ return {'id':self.responseId} + def establish_connection(self): + """ + Create a stomp connection + """ + # TODO STOMP: handle if agent.ssl=false? + connection_url = 'wss://{0}:{1}/agent/stomp/v1'.format(self.initializer_module.server_hostname, self.initializer_module.secured_url_port) + self.connection = security.establish_connection(connection_url) + def add_listeners(self): """ Subscribe to topics and set listener classes. """ for listener in self.listeners: - self.initializer_module.connection.add_listener(listener) + self.connection.add_listener(listener) def subscribe_to_topics(self, topics_list): for topic_name in topics_list: - self.initializer_module.connection.subscribe(destination=topic_name, id='sub', ack='client-individual') + self.connection.subscribe(destination=topic_name, id='sub', ack='client-individual') def blocking_request(self, message, destination, timeout=REQUEST_RESPONSE_TIMEOUT): """ Send a request to server and waits for the response from it. The response it detected by the correspondence of correlation_id. """ try: - correlation_id = self.initializer_module.connection.send(message=message, destination=destination) - except AttributeError: + correlation_id = self.connection.send(message=message, destination=destination) + except ConnectionIsAlreadyClosed: # this happens when trying to connect to broken connection. Happens if ambari-server is restarted. 
- err_msg = "Connection failed while trying to connect to {0}".format(destination) - logger.warn(err_msg) - raise ConnectionFailed(err_msg) + logger.warn("Connection failed while trying to connect to {0}".format(destination)) + raise try: return self.server_responses_listener.responses.blocking_pop(str(correlation_id), timeout=timeout) http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py index c45b64a..c60ea36 100644 --- a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py +++ b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py @@ -23,6 +23,7 @@ from ambari_agent import Constants from ambari_agent.HostInfo import HostInfo from ambari_agent.Utils import Utils from ambari_agent.Hardware import Hardware +from ambari_agent import security logger = logging.getLogger(__name__) @@ -52,6 +53,8 @@ class HostStatusReporter(threading.Thread): # don't use else to avoid race condition if not self.initializer_module.is_registered: self.last_report = {} + except security.ConnectionIsNotEstablished: # server and agent disconnected during sending data. Not an issue + pass except: logger.exception("Exception in HostStatusReporter. 
Re-running it") http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index 9b031f7..cbf0780 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -21,7 +21,6 @@ limitations under the License. import threading import logging import os -from socket import error as socket_error from ambari_agent.FileCache import FileCache from ambari_agent.AmbariConfig import AmbariConfig @@ -30,13 +29,12 @@ from ambari_agent.ClusterTopologyCache import ClusterTopologyCache from ambari_agent.ClusterMetadataCache import ClusterMetadataCache from ambari_agent.ClusterHostLevelParamsCache import ClusterHostLevelParamsCache from ambari_agent.ClusterAlertDefinitionsCache import ClusterAlertDefinitionsCache -from ambari_agent.Utils import lazy_property -from ambari_agent.security import AmbariStompConnection from ambari_agent.ActionQueue import ActionQueue from ambari_agent.CommandStatusDict import CommandStatusDict from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator from ambari_agent.RecoveryManager import RecoveryManager from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler +from ambari_agent import security logger = logging.getLogger(__name__) @@ -96,29 +94,13 @@ class InitializerModule: self.action_queue = ActionQueue(self) self.alert_scheduler_handler = AlertSchedulerHandler(self) - @lazy_property + @property def connection(self): - """ - Create a stomp connection - """ - # TODO STOMP: handle if agent.ssl=false? 
- connection_url = 'wss://{0}:{1}/agent/stomp/v1'.format(self.server_hostname, self.secured_url_port) - - logging.info("Connecting to {0}".format(connection_url)) - - conn = AmbariStompConnection(connection_url) try: - conn.start() - conn.connect(wait=True) - except Exception as ex: - try: - conn.disconnect() - except: - logger.exception("Exception during conn.disconnect()") - - if isinstance(ex, socket_error): - logger.warn("Could not connect to {0}".format(connection_url)) - - raise - - return conn + return self._connection + except AttributeError: + """ + Can be a result of race condition: + begin sending X -> got disconnected by HeartbeatThread -> continue sending X + """ + raise security.ConnectionIsNotEstablished("Connection to server is not established") http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py index 6d23c37..a4a48bc 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py @@ -32,8 +32,8 @@ class ServerResponsesListener(EventListener): Listener of Constants.SERVER_RESPONSES_TOPIC events from server. 
""" def __init__(self): - self.responses = Utils.BlockingDictionary() self.listener_functions = {} + self.reset_responses() def on_event(self, headers, message): """ @@ -66,4 +66,10 @@ class ServerResponsesListener(EventListener): return " (correlation_id={0}): {1}".format(correlation_id, message_json) return str(message_json) + def reset_responses(self): + """ + Clear responses dictionary + """ + self.responses = Utils.BlockingDictionary() + http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/security.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py index 9509619..a505658 100644 --- a/ambari-agent/src/main/python/ambari_agent/security.py +++ b/ambari-agent/src/main/python/ambari_agent/security.py @@ -32,6 +32,7 @@ import platform import ambari_stomp import threading from ambari_stomp.adapter.websocket import WsConnection +from socket import error as socket_error logger = logging.getLogger(__name__) @@ -102,9 +103,33 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection): return sock -class ConnectionFailed(Exception): + +class ConnectionIsNotEstablished(Exception): pass +def establish_connection(connection_url): + """ + Create a stomp connection + """ + logging.info("Connecting to {0}".format(connection_url)) + + conn = AmbariStompConnection(connection_url) + try: + conn.start() + conn.connect(wait=True) + except Exception as ex: + try: + conn.disconnect() + except: + logger.exception("Exception during conn.disconnect()") + + if isinstance(ex, socket_error): + logger.warn("Could not connect to {0}".format(connection_url)) + + raise + + return conn + class AmbariStompConnection(WsConnection): def __init__(self, url): self.lock = threading.RLock() http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py 
---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py index 220f399..6cf19db 100644 --- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py +++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py @@ -90,12 +90,16 @@ class WsTransport(Transport): def send(self, encoded_frame): logger.debug("Outgoing STOMP message:\n>>> " + encoded_frame) + if self.ws.terminated: + raise ConnectionIsAlreadyClosed("Connection is already closed cannot send data") + self.ws.send(encoded_frame) def receive(self): try: - msg = str(self.ws.receive()) - logger.debug("Incoming STOMP message:\n<<< " + msg) + msg = self.ws.receive() + msg = str(msg) if msg is not None else msg + logger.debug("Incoming STOMP message:\n<<< {0}".format(msg)) return msg except: # exceptions from this method are hidden by the framework so implementing logging by ourselves @@ -105,9 +109,9 @@ class WsTransport(Transport): def stop(self): self.running = False try: - self.ws.close_connection() + self.ws.terminate() except: - logger.exception("Exception during self.ws.close_connection()") + logger.exception("Exception during self.ws.terminate()") try: self.disconnect_socket() @@ -140,3 +144,9 @@ class ConnectionResponseTimeout(StompException): """ Raised when sent 'STOMP' frame and have not received 'CONNECTED' a certain timeout. """ + +class ConnectionIsAlreadyClosed(StompException): + """ + Raised when trying to send data on connection which is already closed. Usually after it was brought down by server. 
+ """ + pass http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-common/src/main/python/ambari_ws4py/websocket.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_ws4py/websocket.py b/ambari-common/src/main/python/ambari_ws4py/websocket.py index b5c1fd3..b353086 100644 --- a/ambari-common/src/main/python/ambari_ws4py/websocket.py +++ b/ambari-common/src/main/python/ambari_ws4py/websocket.py @@ -422,6 +422,9 @@ class WebSocket(object): """ s = self.stream + if s is None: + return + try: if s.closing is None: self.closed(1006, "Going away")