This is an automated email from the ASF dual-hosted git repository. aonishuk pushed a commit to branch branch-3.0-perf in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 51b7443756a58d8306ecfbd8572d82e8ed186df4 Author: Andrew Onishuk <aonis...@hortonworks.com> AuthorDate: Mon Jan 15 15:30:47 2018 +0200 AMBARI-22784. Fix stack unit tests on branch-3.0-perf (aonishuk) --- .../src/main/python/ambari_stomp/adapter/websocket.py | 9 ++++++++- .../src/main/python/ambari_stomp/transport.py | 19 +++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py index 6cf19db..421fd88 100644 --- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py +++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py @@ -54,13 +54,16 @@ class QueuedWebSocketClient(WebSocketClient): # left in the queue, return None immediately otherwise the client # will block forever if self.terminated and self.messages.empty(): + logger.info("!!! RETURNING NONE") return None message = self.messages.get() if message is StopIteration: + logger.info("!!! RETURNING NONE DUE TO STOP_ITERATION") return None return message def closed(self, code, reason=None): + logger.info("!!!CLOSED IS CALLED {0} {1}", code, reason) self.messages.put(StopIteration) class WsTransport(Transport): @@ -91,6 +94,7 @@ class WsTransport(Transport): def send(self, encoded_frame): logger.debug("Outgoing STOMP message:\n>>> " + encoded_frame) if self.ws.terminated: + logger.info("!!!SEND ERROR") raise ConnectionIsAlreadyClosed("Connection is already closed cannot send data") self.ws.send(encoded_frame) @@ -99,7 +103,8 @@ class WsTransport(Transport): try: msg = self.ws.receive() msg = str(msg) if msg is not None else msg - logger.debug("Incoming STOMP message:\n<<< {0}".format(msg)) + if not msg: + logger.info("Incoming STOMP message:\n<<< {0}".format(msg)) return msg except: # exceptions from this method are hidden by the framework so implementing logging by ourselves @@ -107,6 +112,7 @@ class WsTransport(Transport): return None def stop(self): + logger.info("!!!WsTransport.stop()") self.running = False try: self.ws.terminate() @@ -131,6 +137,7 @@ class WsConnection(BaseConnection, Protocol12): Protocol12.__init__(self, self.transport, (0, 0)) def disconnect(self, receipt=None, headers=None, **keyword_headers): + logger.info("!!!WsConnection.disconnect()") try: Protocol12.disconnect(self, receipt, headers, **keyword_headers) except: diff --git a/ambari-common/src/main/python/ambari_stomp/transport.py b/ambari-common/src/main/python/ambari_stomp/transport.py index 32604fc..8d85dcc 100644 --- a/ambari-common/src/main/python/ambari_stomp/transport.py +++ b/ambari-common/src/main/python/ambari_stomp/transport.py @@ -326,6 +326,7 @@ class BaseTransport(ambari_stomp.listener.Publisher): """ Main loop listening for incoming data. """ + log.info("Starting receiver loop") try: while self.running: @@ -340,8 +341,12 @@ class BaseTransport(ambari_stomp.listener.Publisher): if self.__auto_decode: f.body = decode(f.body) self.process_frame(f, frame) + + log.info("!!NOT RUNNING") except exception.ConnectionClosedException: + log.exception("!!except exception.ConnectionClosedException:") if self.running: + log.info("!!except exception.ConnectionClosedException if self.running") self.notify('disconnected') # # Clear out any half-received messages after losing connection @@ -349,8 +354,16 @@ class BaseTransport(ambari_stomp.listener.Publisher): self.__recvbuf = b'' self.running = False break + except: + log.exception("!!!EXCEPTION at loop") + raise finally: + log.info("!!!CLEANUP") self.cleanup() + log.info("!!NOT RUNNING BIG LOOP") + except: + log.exception("!!!EXCEPTION at big loop") + raise finally: with self.__receiver_thread_exit_condition: self.__receiver_thread_exited = True @@ -370,12 +383,13 @@ class BaseTransport(ambari_stomp.listener.Publisher): try: c = self.receive() except exception.InterruptedException: - log.debug("socket read interrupted, restarting") + log.info("!!!socket read interrupted, restarting") continue except Exception: - log.debug("socket read error", exc_info=True) + log.info("!!!socket read error", exc_info=True) c = b'' if c is None or len(c) == 0: + log.error("!!ConnectionClosedException!!! {0}".format(c)) raise exception.ConnectionClosedException() if c == b'\x0a' and not self.__recvbuf and not fastbuf.tell(): # @@ -576,6 +590,7 @@ class Transport(BaseTransport): """ Disconnect the underlying socket connection """ + log.info("!!!disconnect_socket") self.running = False if self.socket is not None: if self.__need_ssl(): -- To stop receiving notification emails like this one, please contact aonis...@apache.org.