This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push: new 52a4937 bugfix: agent cannot reconnect after server is down (#79) 52a4937 is described below commit 52a4937ee77ab494ee74a933b6bf9defad9802fe Author: kezhenxu94 <kezhenx...@apache.org> AuthorDate: Sun Nov 8 21:52:23 2020 +0800 bugfix: agent cannot reconnect after server is down (#79) --- skywalking/agent/__init__.py | 7 +++---- skywalking/agent/protocol/grpc.py | 33 ++++++++++++++++++++++++--------- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index c809679..a58a26f 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -40,10 +40,9 @@ def __heartbeat(): def __report(): while not __finished.is_set(): if connected(): - __protocol.report(__queue) - break - else: - __finished.wait(1) + __protocol.report(__queue) # is blocking actually + + __finished.wait(1) __heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True) diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py index c026f99..b8a9cf4 100644 --- a/skywalking/agent/protocol/grpc.py +++ b/skywalking/agent/protocol/grpc.py @@ -16,6 +16,7 @@ # import logging +import traceback from queue import Queue import grpc @@ -40,22 +41,33 @@ class GrpcProtocol(Protocol): self.channel, header_adder_interceptor('authentication', config.authentication) ) - def cb(state): - logger.debug('grpc channel connectivity changed, [%s -> %s]', self.state, state) - self.state = state - if self.connected(): - self.service_management.send_instance_props() - - self.channel.subscribe(cb, try_to_connect=True) + self.channel.subscribe(self._cb, try_to_connect=True) self.service_management = GrpcServiceManagementClient(self.channel) self.traces_reporter = GrpcTraceSegmentReportService(self.channel) + def _cb(self, state): + logger.debug('grpc channel connectivity changed, [%s -> %s]', self.state, state) + self.state = state + if self.connected(): + try: + self.service_management.send_instance_props() + except grpc.RpcError: + self.on_error() + def heartbeat(self): - self.service_management.send_heart_beat() + try: + self.service_management.send_heart_beat() + except grpc.RpcError: + self.on_error() def connected(self): return self.state == grpc.ChannelConnectivity.READY + def on_error(self): + traceback.print_exc() + self.channel.unsubscribe(self._cb) + self.channel.subscribe(self._cb, try_to_connect=True) + def report(self, queue: Queue): def generator(): while True: @@ -104,4 +116,7 @@ class GrpcProtocol(Protocol): queue.task_done() - self.traces_reporter.report(generator()) + try: + self.traces_reporter.report(generator()) + except grpc.RpcError: + self.on_error()