sonatype-lift[bot] commented on code in PR #287: URL: https://github.com/apache/skywalking-python/pull/287#discussion_r1108048061
########## skywalking/agent/__init__.py: ########## @@ -16,269 +16,353 @@ # import atexit +import functools +import os +import sys from queue import Queue, Full from threading import Thread, Event -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from skywalking import config, plugins from skywalking import loggings -from skywalking import profile from skywalking import meter +from skywalking import profile from skywalking.agent.protocol import Protocol from skywalking.command import command_service from skywalking.loggings import logger from skywalking.profile.profile_task import ProfileTask from skywalking.profile.snapshot import TracingThreadSnapshot -from skywalking.protocol.logging.Logging_pb2 import LogData from skywalking.protocol.language_agent.Meter_pb2 import MeterData +from skywalking.protocol.logging.Logging_pb2 import LogData +from skywalking.utils.singleton import Singleton if TYPE_CHECKING: from skywalking.trace.context import Segment -__started = False -__protocol = None # type: Protocol -__heartbeat_thread = __report_thread = __log_report_thread = __query_profile_thread = __command_dispatch_thread \ - = __send_profile_thread = __queue = __log_queue = __snapshot_queue = __meter_queue = __finished = None - - -def __heartbeat(): - wait = base = 30 - - while not __finished.is_set(): - try: - __protocol.heartbeat() - wait = base # reset to base wait time on success - except Exception as exc: - logger.error(str(exc)) - wait = min(60, wait * 2 or 1) # double wait time with each consecutive error up to a maximum - - __finished.wait(wait) - -def __report(): - wait = base = 0 - - while not __finished.is_set(): +def report_with_backoff(reporter_name, init_wait): + """ + An exponential backoff for retrying reporters. 
+ """ + + def backoff_decorator(func): + @functools.wraps(func) + def backoff_wrapper(self, *args, **kwargs): + wait = base = init_wait + while not self._finished.is_set(): + try: + func(self, *args, **kwargs) + wait = base # reset to base wait time on success + except Exception: # noqa + wait = min(60, wait * 2 or 1) # double wait time with each consecutive error up to a maximum + logger.exception(f'Exception in {reporter_name} service in pid {os.getpid()}, ' + f'retry in {wait} seconds') + self._finished.wait(wait) + logger.info('finished reporter thread') + return backoff_wrapper + return backoff_decorator + + +class SkyWalkingAgent(Singleton): + """ + The main singleton class and entrypoint of SkyWalking Python Agent. + Upon fork(), original instance rebuild everything (queues, threads, instrumentation) by + calling the fork handlers in the class instance. + """ + __started: bool = False # shared by all instances + + def __init__(self): + """ + Protocol is one of gRPC, HTTP and Kafka that + provides clients to reporters to communicate with OAP backend. + """ + self.started_pid = None + self.__protocol: Optional[Protocol] = None + self._finished: Optional[Event] = None + + def __bootstrap(self): + # when forking, already instrumented modules must not be instrumented again + # otherwise it will cause double instrumentation! 
(we should provide an un-instrument method) + if config.agent_protocol == 'grpc': + from skywalking.agent.protocol.grpc import GrpcProtocol + self.__protocol = GrpcProtocol() + elif config.agent_protocol == 'http': + from skywalking.agent.protocol.http import HttpProtocol + self.__protocol = HttpProtocol() + elif config.agent_protocol == 'kafka': + from skywalking.agent.protocol.kafka import KafkaProtocol + self.__protocol = KafkaProtocol() + + # Initialize queues for segment, log, meter and profiling snapshots + self.__segment_queue: Optional[Queue] = None + self.__log_queue: Optional[Queue] = None + self.__meter_queue: Optional[Queue] = None + self.__snapshot_queue: Optional[Queue] = None + + # Start reporter threads and register queues + self.__init_threading() + + def __init_threading(self) -> None: + """ + This method initializes all the queues and threads for the agent and reporters. + Upon os.fork(), callback will reinitialize threads and queues by calling this method + + Heartbeat thread is started by default. + Segment reporter thread and segment queue is created by default. + All other queues and threads depends on user configuration. 
+ """ + self._finished = Event() + + __heartbeat_thread = Thread(name='HeartbeatThread', target=self.__heartbeat, daemon=True) + __heartbeat_thread.start() + + self.__segment_queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size) + __segment_report_thread = Thread(name='SegmentReportThread', target=self.__report_segment, daemon=True) + __segment_report_thread.start() + + if config.agent_meter_reporter_active: + self.__meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size) + __meter_report_thread = Thread(name='MeterReportThread', target=self.__report_meter, daemon=True) + __meter_report_thread.start() + + if config.agent_pvm_meter_reporter_active: + from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource + from skywalking.meter.pvm.gc_data import GCDataSource + from skywalking.meter.pvm.mem_usage import MEMUsageDataSource + from skywalking.meter.pvm.thread_data import ThreadDataSource + + MEMUsageDataSource().register() + CPUUsageDataSource().register() + GCDataSource().register() + ThreadDataSource().register() + + if config.agent_log_reporter_active: + self.__log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size) + __log_report_thread = Thread(name='LogReportThread', target=self.__report_log, daemon=True) + __log_report_thread.start() + + if config.agent_profile_active: + # Now only profiler receives commands from OAP + __command_dispatch_thread = Thread(name='CommandDispatchThread', target=self.__command_dispatch, + daemon=True) + __command_dispatch_thread.start() + + self.__snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size) + + __query_profile_thread = Thread(name='QueryProfileCommandThread', target=self.__query_profile_command, + daemon=True) + __query_profile_thread.start() + + __send_profile_thread = Thread(name='SendProfileSnapShotThread', target=self.__send_profile_snapshot, + daemon=True) + __send_profile_thread.start() + + @staticmethod # for now + def __fork_before() -> 
None: + """ + This handles explicit fork() calls. The child process will not have a running thread, so we need to + revive all of them. The parent process will continue to run as normal. + + This does not affect pre-forking server support, which are handled separately. + """ + # possible deadlock would be introduced if some queue is in use when fork() is called and + # therefore child process will inherit a locked queue. To avoid this and have side benefit + # of a clean queue in child process (prevent duplicated reporting), we simply restart the agent and + # reinitialize all queues and threads. + logger.warning('SkyWalking Python agent fork support is currently experimental, ' + 'please report issues if you encounter any.') + + @staticmethod # for now + def __fork_after_in_parent() -> None: + """ + Something to do after fork() in parent process + """ + ... + + def __fork_after_in_child(self) -> None: + """ + Simply restart the agent after we detect a fork() call + """ + self.start() + logger.info('SkyWalking Python agent spawned in child after fork() call.') + + def start(self) -> None: + """ + Start would be called by user or os.register_at_fork() callback + Start will proceed if and only if the agent is not started in the + current process. + + When os.fork(), the service instance should be changed to a new one by appending pid. + """ + python_version: tuple = sys.version_info[:2] + if python_version[0] < 3 and python_version[1] < 7: Review Comment: <picture><img alt="21% of developers fix this issue" src="https://lift.sonatype.com/api/commentimage/fixrate/21/display.svg"></picture> <b>*Unsupported operand:*</b> `<` is not supported for operand types `typing.Union[int, str]` and `int`. --- <details><summary>ℹ️ Expand to see all <b>@sonatype-lift</b> commands</summary> You can reply with the following commands. For example, reply with ***@sonatype-lift ignoreall*** to leave out all findings.
| **Command** | **Usage** | | ------------- | ------------- | | `@sonatype-lift ignore` | Leave out the above finding from this PR | | `@sonatype-lift ignoreall` | Leave out all the existing findings from this PR | | `@sonatype-lift exclude <file\|issue\|path\|tool>` | Exclude specified `file\|issue\|path\|tool` from Lift findings by updating your config.toml file | **Note:** When talking to LiftBot, you need to **refresh** the page to see its response. <sub>[Click here](https://github.com/apps/sonatype-lift/installations/new) to add LiftBot to another repo.</sub></details> --- <b>Help us improve LIFT! (<i>Sonatype LiftBot external survey</i>)</b> Was this a good recommendation for you? <sub><small>Answering this survey will not impact your Lift settings.</small></sub> [ [Not relevant](https://www.sonatype.com/lift-comment-rating?comment=392497579&lift_comment_rating=1) ] - [ [Won't fix](https://www.sonatype.com/lift-comment-rating?comment=392497579&lift_comment_rating=2) ] - [ [Not critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=392497579&lift_comment_rating=3) ] - [ [Critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=392497579&lift_comment_rating=4) ] - [ [Critical, fixing now](https://www.sonatype.com/lift-comment-rating?comment=392497579&lift_comment_rating=5) ] ########## skywalking/agent/__init__.py: ########## @@ -16,269 +16,353 @@ # import atexit +import functools +import os +import sys from queue import Queue, Full from threading import Thread, Event -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from skywalking import config, plugins from skywalking import loggings -from skywalking import profile from skywalking import meter +from skywalking import profile from skywalking.agent.protocol import Protocol from skywalking.command import command_service from skywalking.loggings import logger from skywalking.profile.profile_task import ProfileTask from
skywalking.profile.snapshot import TracingThreadSnapshot -from skywalking.protocol.logging.Logging_pb2 import LogData from skywalking.protocol.language_agent.Meter_pb2 import MeterData +from skywalking.protocol.logging.Logging_pb2 import LogData +from skywalking.utils.singleton import Singleton if TYPE_CHECKING: from skywalking.trace.context import Segment -__started = False -__protocol = None # type: Protocol -__heartbeat_thread = __report_thread = __log_report_thread = __query_profile_thread = __command_dispatch_thread \ - = __send_profile_thread = __queue = __log_queue = __snapshot_queue = __meter_queue = __finished = None - - -def __heartbeat(): - wait = base = 30 - - while not __finished.is_set(): - try: - __protocol.heartbeat() - wait = base # reset to base wait time on success - except Exception as exc: - logger.error(str(exc)) - wait = min(60, wait * 2 or 1) # double wait time with each consecutive error up to a maximum - - __finished.wait(wait) - -def __report(): - wait = base = 0 - - while not __finished.is_set(): +def report_with_backoff(reporter_name, init_wait): + """ + An exponential backoff for retrying reporters. + """ + + def backoff_decorator(func): + @functools.wraps(func) + def backoff_wrapper(self, *args, **kwargs): + wait = base = init_wait + while not self._finished.is_set(): + try: + func(self, *args, **kwargs) + wait = base # reset to base wait time on success + except Exception: # noqa + wait = min(60, wait * 2 or 1) # double wait time with each consecutive error up to a maximum + logger.exception(f'Exception in {reporter_name} service in pid {os.getpid()}, ' + f'retry in {wait} seconds') + self._finished.wait(wait) + logger.info('finished reporter thread') + return backoff_wrapper + return backoff_decorator + + +class SkyWalkingAgent(Singleton): + """ + The main singleton class and entrypoint of SkyWalking Python Agent. 
+ Upon fork(), original instance rebuild everything (queues, threads, instrumentation) by + calling the fork handlers in the class instance. + """ + __started: bool = False # shared by all instances + + def __init__(self): + """ + Protocol is one of gRPC, HTTP and Kafka that + provides clients to reporters to communicate with OAP backend. + """ + self.started_pid = None + self.__protocol: Optional[Protocol] = None + self._finished: Optional[Event] = None + + def __bootstrap(self): + # when forking, already instrumented modules must not be instrumented again + # otherwise it will cause double instrumentation! (we should provide an un-instrument method) + if config.agent_protocol == 'grpc': + from skywalking.agent.protocol.grpc import GrpcProtocol + self.__protocol = GrpcProtocol() + elif config.agent_protocol == 'http': + from skywalking.agent.protocol.http import HttpProtocol + self.__protocol = HttpProtocol() + elif config.agent_protocol == 'kafka': + from skywalking.agent.protocol.kafka import KafkaProtocol + self.__protocol = KafkaProtocol() + + # Initialize queues for segment, log, meter and profiling snapshots + self.__segment_queue: Optional[Queue] = None + self.__log_queue: Optional[Queue] = None + self.__meter_queue: Optional[Queue] = None + self.__snapshot_queue: Optional[Queue] = None + + # Start reporter threads and register queues + self.__init_threading() + + def __init_threading(self) -> None: + """ + This method initializes all the queues and threads for the agent and reporters. + Upon os.fork(), callback will reinitialize threads and queues by calling this method + + Heartbeat thread is started by default. + Segment reporter thread and segment queue is created by default. + All other queues and threads depends on user configuration. 
+ """ + self._finished = Event() + + __heartbeat_thread = Thread(name='HeartbeatThread', target=self.__heartbeat, daemon=True) + __heartbeat_thread.start() + + self.__segment_queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size) + __segment_report_thread = Thread(name='SegmentReportThread', target=self.__report_segment, daemon=True) + __segment_report_thread.start() + + if config.agent_meter_reporter_active: + self.__meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size) + __meter_report_thread = Thread(name='MeterReportThread', target=self.__report_meter, daemon=True) + __meter_report_thread.start() + + if config.agent_pvm_meter_reporter_active: + from skywalking.meter.pvm.cpu_usage import CPUUsageDataSource + from skywalking.meter.pvm.gc_data import GCDataSource + from skywalking.meter.pvm.mem_usage import MEMUsageDataSource + from skywalking.meter.pvm.thread_data import ThreadDataSource + + MEMUsageDataSource().register() + CPUUsageDataSource().register() + GCDataSource().register() + ThreadDataSource().register() + + if config.agent_log_reporter_active: + self.__log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size) + __log_report_thread = Thread(name='LogReportThread', target=self.__report_log, daemon=True) + __log_report_thread.start() + + if config.agent_profile_active: + # Now only profiler receives commands from OAP + __command_dispatch_thread = Thread(name='CommandDispatchThread', target=self.__command_dispatch, + daemon=True) + __command_dispatch_thread.start() + + self.__snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size) + + __query_profile_thread = Thread(name='QueryProfileCommandThread', target=self.__query_profile_command, + daemon=True) + __query_profile_thread.start() + + __send_profile_thread = Thread(name='SendProfileSnapShotThread', target=self.__send_profile_snapshot, + daemon=True) + __send_profile_thread.start() + + @staticmethod # for now + def __fork_before() -> 
None: + """ + This handles explicit fork() calls. The child process will not have a running thread, so we need to + revive all of them. The parent process will continue to run as normal. + + This does not affect pre-forking server support, which are handled separately. + """ + # possible deadlock would be introduced if some queue is in use when fork() is called and + # therefore child process will inherit a locked queue. To avoid this and have side benefit + # of a clean queue in child process (prevent duplicated reporting), we simply restart the agent and + # reinitialize all queues and threads. + logger.warning('SkyWalking Python agent fork support is currently experimental, ' + 'please report issues if you encounter any.') + + @staticmethod # for now + def __fork_after_in_parent() -> None: + """ + Something to do after fork() in parent process + """ + ... + + def __fork_after_in_child(self) -> None: + """ + Simply restart the agent after we detect a fork() call + """ + self.start() + logger.info('SkyWalking Python agent spawned in child after fork() call.') + + def start(self) -> None: + """ + Start would be called by user or os.register_at_fork() callback + Start will proceed if and only if the agent is not started in the + current process. + + When os.fork(), the service instance should be changed to a new one by appending pid. + """ + python_version: tuple = sys.version_info[:2] + if python_version[0] < 3 and python_version[1] < 7: Review Comment: <picture><img alt="21% of developers fix this issue" src="https://lift.sonatype.com/api/commentimage/fixrate/21/display.svg"></picture> <b>*Unsupported operand:*</b> `<` is not supported for operand types `typing.Union[int, str]` and `int`. --- <details><summary>ℹ️ Expand to see all <b>@sonatype-lift</b> commands</summary> You can reply with the following commands. For example, reply with ***@sonatype-lift ignoreall*** to leave out all findings.
| **Command** | **Usage** | | ------------- | ------------- | | `@sonatype-lift ignore` | Leave out the above finding from this PR | | `@sonatype-lift ignoreall` | Leave out all the existing findings from this PR | | `@sonatype-lift exclude <file\|issue\|path\|tool>` | Exclude specified `file\|issue\|path\|tool` from Lift findings by updating your config.toml file | **Note:** When talking to LiftBot, you need to **refresh** the page to see its response. <sub>[Click here](https://github.com/apps/sonatype-lift/installations/new) to add LiftBot to another repo.</sub></details> --- <b>Help us improve LIFT! (<i>Sonatype LiftBot external survey</i>)</b> Was this a good recommendation for you? <sub><small>Answering this survey will not impact your Lift settings.</small></sub> [ [Not relevant](https://www.sonatype.com/lift-comment-rating?comment=392497580&lift_comment_rating=1) ] - [ [Won't fix](https://www.sonatype.com/lift-comment-rating?comment=392497580&lift_comment_rating=2) ] - [ [Not critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=392497580&lift_comment_rating=3) ] - [ [Critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=392497580&lift_comment_rating=4) ] - [ [Critical, fixing now](https://www.sonatype.com/lift-comment-rating?comment=392497580&lift_comment_rating=5) ] -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org