This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 52a4937  bugfix: agent cannot reconnect after server is down (#79)
52a4937 is described below

commit 52a4937ee77ab494ee74a933b6bf9defad9802fe
Author: kezhenxu94 <kezhenx...@apache.org>
AuthorDate: Sun Nov 8 21:52:23 2020 +0800

    bugfix: agent cannot reconnect after server is down (#79)
---
 skywalking/agent/__init__.py      |  7 +++----
 skywalking/agent/protocol/grpc.py | 33 ++++++++++++++++++++++++---------
 2 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index c809679..a58a26f 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -40,10 +40,9 @@ def __heartbeat():
 def __report():
     while not __finished.is_set():
         if connected():
-            __protocol.report(__queue)
-            break
-        else:
-            __finished.wait(1)
+            __protocol.report(__queue)  # is blocking actually
+
+        __finished.wait(1)
 
 
 __heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py
index c026f99..b8a9cf4 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -16,6 +16,7 @@
 #
 
 import logging
+import traceback
 from queue import Queue
 
 import grpc
@@ -40,22 +41,33 @@ class GrpcProtocol(Protocol):
                 self.channel, header_adder_interceptor('authentication', config.authentication)
             )
 
-        def cb(state):
-            logger.debug('grpc channel connectivity changed, [%s -> %s]', self.state, state)
-            self.state = state
-            if self.connected():
-                self.service_management.send_instance_props()
-
-        self.channel.subscribe(cb, try_to_connect=True)
+        self.channel.subscribe(self._cb, try_to_connect=True)
         self.service_management = GrpcServiceManagementClient(self.channel)
         self.traces_reporter = GrpcTraceSegmentReportService(self.channel)
 
+    def _cb(self, state):
+        logger.debug('grpc channel connectivity changed, [%s -> %s]', self.state, state)
+        self.state = state
+        if self.connected():
+            try:
+                self.service_management.send_instance_props()
+            except grpc.RpcError:
+                self.on_error()
+
     def heartbeat(self):
-        self.service_management.send_heart_beat()
+        try:
+            self.service_management.send_heart_beat()
+        except grpc.RpcError:
+            self.on_error()
 
     def connected(self):
         return self.state == grpc.ChannelConnectivity.READY
 
+    def on_error(self):
+        traceback.print_exc()
+        self.channel.unsubscribe(self._cb)
+        self.channel.subscribe(self._cb, try_to_connect=True)
+
     def report(self, queue: Queue):
         def generator():
             while True:
@@ -104,4 +116,7 @@ class GrpcProtocol(Protocol):
 
                 queue.task_done()
 
-        self.traces_reporter.report(generator())
+        try:
+            self.traces_reporter.report(generator())
+        except grpc.RpcError:
+            self.on_error()

Reply via email to