sonatype-lift[bot] commented on code in PR #287:
URL: https://github.com/apache/skywalking-python/pull/287#discussion_r1108048061


##########
skywalking/agent/__init__.py:
##########
@@ -16,269 +16,353 @@
 #
 
 import atexit
+import functools
+import os
+import sys
 from queue import Queue, Full
 from threading import Thread, Event
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
 
 from skywalking import config, plugins
 from skywalking import loggings
-from skywalking import profile
 from skywalking import meter
+from skywalking import profile
 from skywalking.agent.protocol import Protocol
 from skywalking.command import command_service
 from skywalking.loggings import logger
 from skywalking.profile.profile_task import ProfileTask
 from skywalking.profile.snapshot import TracingThreadSnapshot
-from skywalking.protocol.logging.Logging_pb2 import LogData
 from skywalking.protocol.language_agent.Meter_pb2 import MeterData
+from skywalking.protocol.logging.Logging_pb2 import LogData
+from skywalking.utils.singleton import Singleton
 
 if TYPE_CHECKING:
     from skywalking.trace.context import Segment
 
-__started = False
-__protocol = None  # type: Protocol
-__heartbeat_thread = __report_thread = __log_report_thread = 
__query_profile_thread = __command_dispatch_thread \
-    = __send_profile_thread = __queue = __log_queue = __snapshot_queue = 
__meter_queue = __finished = None
-
-
-def __heartbeat():
-    wait = base = 30
-
-    while not __finished.is_set():
-        try:
-            __protocol.heartbeat()
-            wait = base  # reset to base wait time on success
-        except Exception as exc:
-            logger.error(str(exc))
-            wait = min(60, wait * 2 or 1)  # double wait time with each 
consecutive error up to a maximum
-
-        __finished.wait(wait)
-
 
-def __report():
-    wait = base = 0
-
-    while not __finished.is_set():
+def report_with_backoff(reporter_name, init_wait):
+    """
+    An exponential backoff for retrying reporters.
+    """
+
+    def backoff_decorator(func):
+        @functools.wraps(func)
+        def backoff_wrapper(self, *args, **kwargs):
+            wait = base = init_wait
+            while not self._finished.is_set():
+                try:
+                    func(self, *args, **kwargs)
+                    wait = base  # reset to base wait time on success
+                except Exception:  # noqa
+                    wait = min(60, wait * 2 or 1)  # double wait time with 
each consecutive error up to a maximum
+                    logger.exception(f'Exception in {reporter_name} service in 
pid {os.getpid()}, '
+                                     f'retry in {wait} seconds')
+                self._finished.wait(wait)
+            logger.info('finished reporter thread')
+        return backoff_wrapper
+    return backoff_decorator
+
+
+class SkyWalkingAgent(Singleton):
+    """
+    The main singleton class and entrypoint of SkyWalking Python Agent.
+    Upon fork(), original instance rebuild everything (queues, threads, 
instrumentation) by
+    calling the fork handlers in the class instance.
+    """
+    __started: bool = False  # shared by all instances
+
+    def __init__(self):
+        """
+        Protocol is one of gRPC, HTTP and Kafka that
+        provides clients to reporters to communicate with OAP backend.
+        """
+        self.started_pid = None
+        self.__protocol: Optional[Protocol] = None
+        self._finished: Optional[Event] = None
+
+    def __bootstrap(self):
+        # when forking, already instrumented modules must not be instrumented 
again
+        # otherwise it will cause double instrumentation! (we should provide 
an un-instrument method)
+        if config.agent_protocol == 'grpc':
+            from skywalking.agent.protocol.grpc import GrpcProtocol
+            self.__protocol = GrpcProtocol()
+        elif config.agent_protocol == 'http':
+            from skywalking.agent.protocol.http import HttpProtocol
+            self.__protocol = HttpProtocol()
+        elif config.agent_protocol == 'kafka':
+            from skywalking.agent.protocol.kafka import KafkaProtocol
+            self.__protocol = KafkaProtocol()
+
+        # Initialize queues for segment, log, meter and profiling snapshots
+        self.__segment_queue: Optional[Queue] = None
+        self.__log_queue: Optional[Queue] = None
+        self.__meter_queue: Optional[Queue] = None
+        self.__snapshot_queue: Optional[Queue] = None
+
+        # Start reporter threads and register queues
+        self.__init_threading()
+
+    def __init_threading(self) -> None:
+        """
+        This method initializes all the queues and threads for the agent and 
reporters.
+        Upon os.fork(), callback will reinitialize threads and queues by 
calling this method
+
+        Heartbeat thread is started by default.
+        Segment reporter thread and segment queue is created by default.
+        All other queues and threads depends on user configuration.
+        """
+        self._finished = Event()
+
+        __heartbeat_thread = Thread(name='HeartbeatThread', 
target=self.__heartbeat, daemon=True)
+        __heartbeat_thread.start()
+
+        self.__segment_queue = 
Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
+        __segment_report_thread = Thread(name='SegmentReportThread', 
target=self.__report_segment, daemon=True)
+        __segment_report_thread.start()
+
+        if config.agent_meter_reporter_active:
+            self.__meter_queue = 
Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
+            __meter_report_thread = Thread(name='MeterReportThread', 
target=self.__report_meter, daemon=True)
+            __meter_report_thread.start()
+
+            if config.agent_pvm_meter_reporter_active:
+                from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource
+                from skywalking.meter.pvm.gc_data import GCDataSource
+                from skywalking.meter.pvm.mem_usage import MEMUsageDataSource
+                from skywalking.meter.pvm.thread_data import ThreadDataSource
+
+                MEMUsageDataSource().register()
+                CPUUsageDataSource().register()
+                GCDataSource().register()
+                ThreadDataSource().register()
+
+        if config.agent_log_reporter_active:
+            self.__log_queue = 
Queue(maxsize=config.agent_log_reporter_max_buffer_size)
+            __log_report_thread = Thread(name='LogReportThread', 
target=self.__report_log, daemon=True)
+            __log_report_thread.start()
+
+        if config.agent_profile_active:
+            # Now only profiler receives commands from OAP
+            __command_dispatch_thread = Thread(name='CommandDispatchThread', 
target=self.__command_dispatch,
+                                               daemon=True)
+            __command_dispatch_thread.start()
+
+            self.__snapshot_queue = 
Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)
+
+            __query_profile_thread = Thread(name='QueryProfileCommandThread', 
target=self.__query_profile_command,
+                                            daemon=True)
+            __query_profile_thread.start()
+
+            __send_profile_thread = Thread(name='SendProfileSnapShotThread', 
target=self.__send_profile_snapshot,
+                                           daemon=True)
+            __send_profile_thread.start()
+
+    @staticmethod  # for now
+    def __fork_before() -> None:
+        """
+        This handles explicit fork() calls. The child process will not have a 
running thread, so we need to
+        revive all of them. The parent process will continue to run as normal.
+
+        This does not affect pre-forking server support, which are handled 
separately.
+        """
+        # possible deadlock would be introduced if some queue is in use when 
fork() is called and
+        # therefore child process will inherit a locked queue. To avoid this 
and have side benefit
+        # of a clean queue in child process (prevent duplicated reporting), we 
simply restart the agent and
+        # reinitialize all queues and threads.
+        logger.warning('SkyWalking Python agent fork support is currently 
experimental, '
+                       'please report issues if you encounter any.')
+
+    @staticmethod  # for now
+    def __fork_after_in_parent() -> None:
+        """
+        Something to do after fork() in parent process
+        """
+        ...
+
+    def __fork_after_in_child(self) -> None:
+        """
+        Simply restart the agent after we detect a fork() call
+        """
+        self.start()
+        logger.info('SkyWalking Python agent spawned in child after fork() 
call.')
+
+    def start(self) -> None:
+        """
+        Start would be called by user or os.register_at_fork() callback
+        Start will proceed if and only if the agent is not started in the
+        current process.
+
+        When os.fork(), the service instance should be changed to a new one by 
appending pid.
+        """
+        python_version: tuple = sys.version_info[:2]
+        if python_version[0] < 3 and python_version[1] < 7:

Review Comment:
   <picture><img alt="21% of developers fix this issue" 
src="https://lift.sonatype.com/api/commentimage/fixrate/21/display.svg"></picture>
   
   <b>*Unsupported operand:*</b>  `<` is not supported for operand types 
`typing.Union[int, str]` and `int`.
   
   ---
   
   <details><summary>ℹ️ Expand to see all <b>@sonatype-lift</b> 
commands</summary>
   
   You can reply with the following commands. For example, reply with 
***@sonatype-lift ignoreall*** to leave out all findings.
   | **Command** | **Usage** |
   | ------------- | ------------- |
   | `@sonatype-lift ignore` | Leave out the above finding from this PR |
   | `@sonatype-lift ignoreall` | Leave out all the existing findings from this 
PR |
   | `@sonatype-lift exclude <file\|issue\|path\|tool>` | Exclude specified 
`file\|issue\|path\|tool` from Lift findings by updating your config.toml file |
   
   **Note:** When talking to LiftBot, you need to **refresh** the page to see 
its response.
   <sub>[Click here](https://github.com/apps/sonatype-lift/installations/new) 
to add LiftBot to another repo.</sub></details>
   
   
   
   ---
   
   <b>Help us improve LIFT! (<i>Sonatype LiftBot external survey</i>)</b>
   
   Was this a good recommendation for you? <sub><small>Answering this survey 
will not impact your Lift settings.</small></sub>
   
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=392497579&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=392497579&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=392497579&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=392497579&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=392497579&lift_comment_rating=5)
 ]



##########
skywalking/agent/__init__.py:
##########
@@ -16,269 +16,353 @@
 #
 
 import atexit
+import functools
+import os
+import sys
 from queue import Queue, Full
 from threading import Thread, Event
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
 
 from skywalking import config, plugins
 from skywalking import loggings
-from skywalking import profile
 from skywalking import meter
+from skywalking import profile
 from skywalking.agent.protocol import Protocol
 from skywalking.command import command_service
 from skywalking.loggings import logger
 from skywalking.profile.profile_task import ProfileTask
 from skywalking.profile.snapshot import TracingThreadSnapshot
-from skywalking.protocol.logging.Logging_pb2 import LogData
 from skywalking.protocol.language_agent.Meter_pb2 import MeterData
+from skywalking.protocol.logging.Logging_pb2 import LogData
+from skywalking.utils.singleton import Singleton
 
 if TYPE_CHECKING:
     from skywalking.trace.context import Segment
 
-__started = False
-__protocol = None  # type: Protocol
-__heartbeat_thread = __report_thread = __log_report_thread = 
__query_profile_thread = __command_dispatch_thread \
-    = __send_profile_thread = __queue = __log_queue = __snapshot_queue = 
__meter_queue = __finished = None
-
-
-def __heartbeat():
-    wait = base = 30
-
-    while not __finished.is_set():
-        try:
-            __protocol.heartbeat()
-            wait = base  # reset to base wait time on success
-        except Exception as exc:
-            logger.error(str(exc))
-            wait = min(60, wait * 2 or 1)  # double wait time with each 
consecutive error up to a maximum
-
-        __finished.wait(wait)
-
 
-def __report():
-    wait = base = 0
-
-    while not __finished.is_set():
+def report_with_backoff(reporter_name, init_wait):
+    """
+    An exponential backoff for retrying reporters.
+    """
+
+    def backoff_decorator(func):
+        @functools.wraps(func)
+        def backoff_wrapper(self, *args, **kwargs):
+            wait = base = init_wait
+            while not self._finished.is_set():
+                try:
+                    func(self, *args, **kwargs)
+                    wait = base  # reset to base wait time on success
+                except Exception:  # noqa
+                    wait = min(60, wait * 2 or 1)  # double wait time with 
each consecutive error up to a maximum
+                    logger.exception(f'Exception in {reporter_name} service in 
pid {os.getpid()}, '
+                                     f'retry in {wait} seconds')
+                self._finished.wait(wait)
+            logger.info('finished reporter thread')
+        return backoff_wrapper
+    return backoff_decorator
+
+
+class SkyWalkingAgent(Singleton):
+    """
+    The main singleton class and entrypoint of SkyWalking Python Agent.
+    Upon fork(), original instance rebuild everything (queues, threads, 
instrumentation) by
+    calling the fork handlers in the class instance.
+    """
+    __started: bool = False  # shared by all instances
+
+    def __init__(self):
+        """
+        Protocol is one of gRPC, HTTP and Kafka that
+        provides clients to reporters to communicate with OAP backend.
+        """
+        self.started_pid = None
+        self.__protocol: Optional[Protocol] = None
+        self._finished: Optional[Event] = None
+
+    def __bootstrap(self):
+        # when forking, already instrumented modules must not be instrumented 
again
+        # otherwise it will cause double instrumentation! (we should provide 
an un-instrument method)
+        if config.agent_protocol == 'grpc':
+            from skywalking.agent.protocol.grpc import GrpcProtocol
+            self.__protocol = GrpcProtocol()
+        elif config.agent_protocol == 'http':
+            from skywalking.agent.protocol.http import HttpProtocol
+            self.__protocol = HttpProtocol()
+        elif config.agent_protocol == 'kafka':
+            from skywalking.agent.protocol.kafka import KafkaProtocol
+            self.__protocol = KafkaProtocol()
+
+        # Initialize queues for segment, log, meter and profiling snapshots
+        self.__segment_queue: Optional[Queue] = None
+        self.__log_queue: Optional[Queue] = None
+        self.__meter_queue: Optional[Queue] = None
+        self.__snapshot_queue: Optional[Queue] = None
+
+        # Start reporter threads and register queues
+        self.__init_threading()
+
+    def __init_threading(self) -> None:
+        """
+        This method initializes all the queues and threads for the agent and 
reporters.
+        Upon os.fork(), callback will reinitialize threads and queues by 
calling this method
+
+        Heartbeat thread is started by default.
+        Segment reporter thread and segment queue is created by default.
+        All other queues and threads depends on user configuration.
+        """
+        self._finished = Event()
+
+        __heartbeat_thread = Thread(name='HeartbeatThread', 
target=self.__heartbeat, daemon=True)
+        __heartbeat_thread.start()
+
+        self.__segment_queue = 
Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
+        __segment_report_thread = Thread(name='SegmentReportThread', 
target=self.__report_segment, daemon=True)
+        __segment_report_thread.start()
+
+        if config.agent_meter_reporter_active:
+            self.__meter_queue = 
Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
+            __meter_report_thread = Thread(name='MeterReportThread', 
target=self.__report_meter, daemon=True)
+            __meter_report_thread.start()
+
+            if config.agent_pvm_meter_reporter_active:
+                from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource
+                from skywalking.meter.pvm.gc_data import GCDataSource
+                from skywalking.meter.pvm.mem_usage import MEMUsageDataSource
+                from skywalking.meter.pvm.thread_data import ThreadDataSource
+
+                MEMUsageDataSource().register()
+                CPUUsageDataSource().register()
+                GCDataSource().register()
+                ThreadDataSource().register()
+
+        if config.agent_log_reporter_active:
+            self.__log_queue = 
Queue(maxsize=config.agent_log_reporter_max_buffer_size)
+            __log_report_thread = Thread(name='LogReportThread', 
target=self.__report_log, daemon=True)
+            __log_report_thread.start()
+
+        if config.agent_profile_active:
+            # Now only profiler receives commands from OAP
+            __command_dispatch_thread = Thread(name='CommandDispatchThread', 
target=self.__command_dispatch,
+                                               daemon=True)
+            __command_dispatch_thread.start()
+
+            self.__snapshot_queue = 
Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)
+
+            __query_profile_thread = Thread(name='QueryProfileCommandThread', 
target=self.__query_profile_command,
+                                            daemon=True)
+            __query_profile_thread.start()
+
+            __send_profile_thread = Thread(name='SendProfileSnapShotThread', 
target=self.__send_profile_snapshot,
+                                           daemon=True)
+            __send_profile_thread.start()
+
+    @staticmethod  # for now
+    def __fork_before() -> None:
+        """
+        This handles explicit fork() calls. The child process will not have a 
running thread, so we need to
+        revive all of them. The parent process will continue to run as normal.
+
+        This does not affect pre-forking server support, which are handled 
separately.
+        """
+        # possible deadlock would be introduced if some queue is in use when 
fork() is called and
+        # therefore child process will inherit a locked queue. To avoid this 
and have side benefit
+        # of a clean queue in child process (prevent duplicated reporting), we 
simply restart the agent and
+        # reinitialize all queues and threads.
+        logger.warning('SkyWalking Python agent fork support is currently 
experimental, '
+                       'please report issues if you encounter any.')
+
+    @staticmethod  # for now
+    def __fork_after_in_parent() -> None:
+        """
+        Something to do after fork() in parent process
+        """
+        ...
+
+    def __fork_after_in_child(self) -> None:
+        """
+        Simply restart the agent after we detect a fork() call
+        """
+        self.start()
+        logger.info('SkyWalking Python agent spawned in child after fork() 
call.')
+
+    def start(self) -> None:
+        """
+        Start would be called by user or os.register_at_fork() callback
+        Start will proceed if and only if the agent is not started in the
+        current process.
+
+        When os.fork(), the service instance should be changed to a new one by 
appending pid.
+        """
+        python_version: tuple = sys.version_info[:2]
+        if python_version[0] < 3 and python_version[1] < 7:

Review Comment:
   <picture><img alt="21% of developers fix this issue" 
src="https://lift.sonatype.com/api/commentimage/fixrate/21/display.svg"></picture>
   
   <b>*Unsupported operand:*</b>  `<` is not supported for operand types 
`typing.Union[int, str]` and `int`.
   
   ---
   
   <details><summary>ℹ️ Expand to see all <b>@sonatype-lift</b> 
commands</summary>
   
   You can reply with the following commands. For example, reply with 
***@sonatype-lift ignoreall*** to leave out all findings.
   | **Command** | **Usage** |
   | ------------- | ------------- |
   | `@sonatype-lift ignore` | Leave out the above finding from this PR |
   | `@sonatype-lift ignoreall` | Leave out all the existing findings from this 
PR |
   | `@sonatype-lift exclude <file\|issue\|path\|tool>` | Exclude specified 
`file\|issue\|path\|tool` from Lift findings by updating your config.toml file |
   
   **Note:** When talking to LiftBot, you need to **refresh** the page to see 
its response.
   <sub>[Click here](https://github.com/apps/sonatype-lift/installations/new) 
to add LiftBot to another repo.</sub></details>
   
   
   
   ---
   
   <b>Help us improve LIFT! (<i>Sonatype LiftBot external survey</i>)</b>
   
   Was this a good recommendation for you? <sub><small>Answering this survey 
will not impact your Lift settings.</small></sub>
   
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=392497580&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=392497580&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=392497580&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=392497580&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=392497580&lift_comment_rating=5)
 ]



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to