Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf cfec01d69 -> 57b79f937


AMBARI-21688. Fix race conditions causing ambari-agent to hang/fail (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/57b79f93
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/57b79f93
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/57b79f93

Branch: refs/heads/branch-3.0-perf
Commit: 57b79f937af5d046ef420b8dc839fad92a24c64a
Parents: cfec01d
Author: Andrew Onishuk <aonis...@hortonworks.com>
Authored: Mon Aug 14 14:24:30 2017 +0300
Committer: Andrew Onishuk <aonis...@hortonworks.com>
Committed: Mon Aug 14 14:24:30 2017 +0300

----------------------------------------------------------------------
 .../python/ambari_agent/CommandStatusDict.py    | 10 +++-
 .../ambari_agent/ComponentStatusExecutor.py     |  3 +
 .../main/python/ambari_agent/HeartbeatThread.py | 63 +++++++++++++-------
 .../python/ambari_agent/HostStatusReporter.py   |  3 +
 .../python/ambari_agent/InitializerModule.py    | 36 +++--------
 .../listeners/ServerResponsesListener.py        |  8 ++-
 .../src/main/python/ambari_agent/security.py    | 27 ++++++++-
 .../python/ambari_stomp/adapter/websocket.py    | 18 ++++--
 .../src/main/python/ambari_ws4py/websocket.py   |  3 +
 9 files changed, 114 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py 
b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index d6cbdcc..e7b7e49 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -25,6 +25,7 @@ from collections import defaultdict
 from Grep import Grep
 
 from ambari_agent import Constants
+from ambari_agent import security
 
 logger = logging.getLogger()
 
@@ -59,11 +60,14 @@ class CommandStatusDict():
     self.force_update_to_server({command['clusterId']: [new_report]})
 
   def force_update_to_server(self, reports_dict):
-    if self.initializer_module.is_registered:
+    if not self.initializer_module.is_registered:
+      return False
+
+    try:
       
self.initializer_module.connection.send(message={'clusters':reports_dict}, 
destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
       return True
-
-    return False
+    except security.ConnectionIsNotEstablished:
+      return False
 
   def report(self):
     report = self.generate_report()

http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py 
b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index be3eb5b..66df15a 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -24,6 +24,7 @@ import threading
 from ambari_agent import Constants
 from ambari_agent.LiveStatus import LiveStatus
 from collections import defaultdict
+from ambari_agent import security
 
 logger = logging.getLogger(__name__)
 
@@ -98,6 +99,8 @@ class ComponentStatusExecutor(threading.Thread):
                 self.recovery_manager.handle_status_change(component_name, 
status)
 
         self.send_updates_to_server(cluster_reports)
+      except security.ConnectionIsNotEstablished: # server and agent 
disconnected during sending data. Not an issue
+        pass
       except:
         logger.exception("Exception in ComponentStatusExecutor. Re-running it")
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py 
b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index e2fe4af..b6e1aaf 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -23,7 +23,6 @@ import ambari_stomp
 import threading
 from socket import error as socket_error
 
-from ambari_agent.security import ConnectionFailed
 from ambari_agent import Constants
 from ambari_agent.Register import Register
 from ambari_agent.Utils import BlockingDictionary
@@ -35,6 +34,8 @@ from ambari_agent.listeners.MetadataEventListener import 
MetadataEventListener
 from ambari_agent.listeners.CommandsEventListener import CommandsEventListener
 from ambari_agent.listeners.HostLevelParamsEventListener import 
HostLevelParamsEventListener
 from ambari_agent.listeners.AlertDefinitionsEventListener import 
AlertDefinitionsEventListener
+from ambari_agent import security
+from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
 
 HEARTBEAT_INTERVAL = 10
 REQUEST_RESPONSE_TIMEOUT = 10
@@ -90,30 +91,22 @@ class HeartbeatThread(threading.Thread):
         logger.debug("Heartbeat response is {0}".format(response))
         self.handle_heartbeat_reponse(response)
       except Exception as ex:
-        if not isinstance(ex, (socket_error, ConnectionFailed)):
+        if not isinstance(ex, (socket_error, ConnectionIsAlreadyClosed)):
           logger.exception("Exception in HeartbeatThread. Re-running the 
registration")
 
-        self.initializer_module.is_registered = False
-
-        if hasattr(self.initializer_module, '_connection'):
-          try:
-            self.initializer_module.connection.disconnect()
-          except:
-            logger.exception("Exception during 
self.initializer_module.connection.disconnect()")
-
-          delattr(self.initializer_module, '_connection')
+        self.unregister()
 
       self.stop_event.wait(self.heartbeat_interval)
 
-    self.initializer_module.is_registered = False
-    self.initializer_module.connection.disconnect()
-    delattr(self.initializer_module, '_connection')
+    self.unregister()
     logger.info("HeartbeatThread has successfully finished")
 
   def register(self):
     """
     Subscribe to topics, register with server, wait for server's response.
     """
+    self.establish_connection()
+
     self.add_listeners()
     self.subscribe_to_topics(Constants.PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE)
 
@@ -140,6 +133,27 @@ class HeartbeatThread(threading.Thread):
     self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
     self.file_cache.reset()
     self.initializer_module.is_registered = True
+    # now when registration is done we can expose connection to other threads.
+    self.initializer_module._connection = self.connection
+
+  def unregister(self):
+    """
+    Disconnect and remove connection object from initializer_module so other 
threads cannot use it
+    """
+    self.initializer_module.is_registered = False
+
+    if hasattr(self, 'connection'):
+      try:
+        self.connection.disconnect()
+      except:
+        logger.exception("Exception during self.connection.disconnect()")
+
+      if hasattr(self.initializer_module, '_connection'):
+        delattr(self.initializer_module, '_connection')
+      delattr(self, 'connection')
+
+      # delete any responses, which were not handled (possibly came during 
disconnect, etc.)
+      self.server_responses_listener.reset_responses()
 
   def handle_registration_response(self, response):
     # exitstatus is a code of error which was raised on server side.
@@ -179,28 +193,35 @@ class HeartbeatThread(threading.Thread):
     """
     return {'id':self.responseId}
 
+  def establish_connection(self):
+    """
+    Create a stomp connection
+    """
+    # TODO STOMP: handle if agent.ssl=false?
+    connection_url = 
'wss://{0}:{1}/agent/stomp/v1'.format(self.initializer_module.server_hostname, 
self.initializer_module.secured_url_port)
+    self.connection = security.establish_connection(connection_url)
+
   def add_listeners(self):
     """
     Subscribe to topics and set listener classes.
     """
     for listener in self.listeners:
-      self.initializer_module.connection.add_listener(listener)
+      self.connection.add_listener(listener)
 
   def subscribe_to_topics(self, topics_list):
     for topic_name in topics_list:
-      self.initializer_module.connection.subscribe(destination=topic_name, 
id='sub', ack='client-individual')
+      self.connection.subscribe(destination=topic_name, id='sub', 
ack='client-individual')
 
   def blocking_request(self, message, destination, 
timeout=REQUEST_RESPONSE_TIMEOUT):
     """
     Send a request to server and waits for the response from it. The response 
it detected by the correspondence of correlation_id.
     """
     try:
-      correlation_id = 
self.initializer_module.connection.send(message=message, 
destination=destination)
-    except AttributeError:
+      correlation_id = self.connection.send(message=message, 
destination=destination)
+    except ConnectionIsAlreadyClosed:
       # this happens when trying to connect to broken connection. Happens if 
ambari-server is restarted.
-      err_msg = "Connection failed while trying to connect to 
{0}".format(destination)
-      logger.warn(err_msg)
-      raise ConnectionFailed(err_msg)
+      logger.warn("Connection failed while trying to connect to 
{0}".format(destination))
+      raise
 
     try:
       return 
self.server_responses_listener.responses.blocking_pop(str(correlation_id), 
timeout=timeout)

http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py 
b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
index c45b64a..c60ea36 100644
--- a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
@@ -23,6 +23,7 @@ from ambari_agent import Constants
 from ambari_agent.HostInfo import HostInfo
 from ambari_agent.Utils import Utils
 from ambari_agent.Hardware import Hardware
+from ambari_agent import security
 
 logger = logging.getLogger(__name__)
 
@@ -52,6 +53,8 @@ class HostStatusReporter(threading.Thread):
         # don't use else to avoid race condition
         if not self.initializer_module.is_registered:
           self.last_report = {}
+      except security.ConnectionIsNotEstablished: # server and agent 
disconnected during sending data. Not an issue
+        pass
       except:
         logger.exception("Exception in HostStatusReporter. Re-running it")
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py 
b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 9b031f7..cbf0780 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -21,7 +21,6 @@ limitations under the License.
 import threading
 import logging
 import os
-from socket import error as socket_error
 
 from ambari_agent.FileCache import FileCache
 from ambari_agent.AmbariConfig import AmbariConfig
@@ -30,13 +29,12 @@ from ambari_agent.ClusterTopologyCache import 
ClusterTopologyCache
 from ambari_agent.ClusterMetadataCache import ClusterMetadataCache
 from ambari_agent.ClusterHostLevelParamsCache import 
ClusterHostLevelParamsCache
 from ambari_agent.ClusterAlertDefinitionsCache import 
ClusterAlertDefinitionsCache
-from ambari_agent.Utils import lazy_property
-from ambari_agent.security import AmbariStompConnection
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.CommandStatusDict import CommandStatusDict
 from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
 from ambari_agent.RecoveryManager import RecoveryManager
 from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
+from ambari_agent import security
 
 logger = logging.getLogger(__name__)
 
@@ -96,29 +94,13 @@ class InitializerModule:
     self.action_queue = ActionQueue(self)
     self.alert_scheduler_handler = AlertSchedulerHandler(self)
 
-  @lazy_property
+  @property
   def connection(self):
-    """
-    Create a stomp connection
-    """
-    # TODO STOMP: handle if agent.ssl=false?
-    connection_url = 
'wss://{0}:{1}/agent/stomp/v1'.format(self.server_hostname, 
self.secured_url_port)
-
-    logging.info("Connecting to {0}".format(connection_url))
-
-    conn = AmbariStompConnection(connection_url)
     try:
-      conn.start()
-      conn.connect(wait=True)
-    except Exception as ex:
-      try:
-        conn.disconnect()
-      except:
-        logger.exception("Exception during conn.disconnect()")
-
-      if isinstance(ex, socket_error):
-        logger.warn("Could not connect to {0}".format(connection_url))
-
-      raise
-
-    return conn
+      return self._connection
+    except AttributeError:
+      """
+      Can be a result of race condition:
+      begin sending X -> got disconnected by HeartbeatThread -> continue 
sending X
+      """
+      raise security.ConnectionIsNotEstablished("Connection to server is not 
established")

http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
 
b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
index 6d23c37..a4a48bc 100644
--- 
a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
+++ 
b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
@@ -32,8 +32,8 @@ class ServerResponsesListener(EventListener):
   Listener of Constants.SERVER_RESPONSES_TOPIC events from server.
   """
   def __init__(self):
-    self.responses = Utils.BlockingDictionary()
     self.listener_functions = {}
+    self.reset_responses()
 
   def on_event(self, headers, message):
     """
@@ -66,4 +66,10 @@ class ServerResponsesListener(EventListener):
       return " (correlation_id={0}): {1}".format(correlation_id, message_json)
     return str(message_json)
 
+  def reset_responses(self):
+    """
+    Clear responses dictionary
+    """
+    self.responses = Utils.BlockingDictionary()
+
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py 
b/ambari-agent/src/main/python/ambari_agent/security.py
index 9509619..a505658 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -32,6 +32,7 @@ import platform
 import ambari_stomp
 import threading
 from ambari_stomp.adapter.websocket import WsConnection
+from socket import error as socket_error
 
 logger = logging.getLogger(__name__)
 
@@ -102,9 +103,33 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
 
     return sock
 
-class ConnectionFailed(Exception):
+
+class ConnectionIsNotEstablished(Exception):
   pass
 
+def establish_connection(connection_url):
+  """
+  Create a stomp connection
+  """
+  logging.info("Connecting to {0}".format(connection_url))
+
+  conn = AmbariStompConnection(connection_url)
+  try:
+    conn.start()
+    conn.connect(wait=True)
+  except Exception as ex:
+    try:
+      conn.disconnect()
+    except:
+      logger.exception("Exception during conn.disconnect()")
+
+    if isinstance(ex, socket_error):
+      logger.warn("Could not connect to {0}".format(connection_url))
+
+    raise
+
+  return conn
+
 class AmbariStompConnection(WsConnection):
   def __init__(self, url):
     self.lock = threading.RLock()

http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py 
b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
index 220f399..6cf19db 100644
--- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
+++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
@@ -90,12 +90,16 @@ class WsTransport(Transport):
 
   def send(self, encoded_frame):
     logger.debug("Outgoing STOMP message:\n>>> " + encoded_frame)
+    if self.ws.terminated:
+      raise ConnectionIsAlreadyClosed("Connection is already closed cannot 
send data")
+
     self.ws.send(encoded_frame)
 
   def receive(self):
     try:
-      msg = str(self.ws.receive())
-      logger.debug("Incoming STOMP message:\n<<< " + msg)
+      msg = self.ws.receive()
+      msg = str(msg) if msg is not None else msg
+      logger.debug("Incoming STOMP message:\n<<< {0}".format(msg))
       return msg
     except:
       # exceptions from this method are hidden by the framework so 
implementing logging by ourselves
@@ -105,9 +109,9 @@ class WsTransport(Transport):
   def stop(self):
     self.running = False
     try:
-      self.ws.close_connection()
+      self.ws.terminate()
     except:
-      logger.exception("Exception during self.ws.close_connection()")
+      logger.exception("Exception during self.ws.terminate()")
 
     try:
       self.disconnect_socket()
@@ -140,3 +144,9 @@ class ConnectionResponseTimeout(StompException):
   """
   Raised when sent 'STOMP' frame and have not received 'CONNECTED' a certain 
timeout.
   """
+
+class ConnectionIsAlreadyClosed(StompException):
+  """
+  Raised when trying to send data on connection which is already closed. 
Usually after it was brought down by server.
+  """
+  pass

http://git-wip-us.apache.org/repos/asf/ambari/blob/57b79f93/ambari-common/src/main/python/ambari_ws4py/websocket.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_ws4py/websocket.py 
b/ambari-common/src/main/python/ambari_ws4py/websocket.py
index b5c1fd3..b353086 100644
--- a/ambari-common/src/main/python/ambari_ws4py/websocket.py
+++ b/ambari-common/src/main/python/ambari_ws4py/websocket.py
@@ -422,6 +422,9 @@ class WebSocket(object):
         """
         s = self.stream
 
+        if s is None:
+          return
+
         try:
             if s.closing is None:
                 self.closed(1006, "Going away")

Reply via email to