deku0818 opened a new issue, #1191:
URL: https://github.com/apache/rocketmq-clients/issues/1191

   ### Before Creating the Bug Report
   
   - [x] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions).
   
   - [x] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq-clients/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq-clients/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [x] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Programming Language of the Client
   
   Python
   
   ### Runtime Platform Environment
   
   OS: Ubuntu 22.04 (Linux 6.14.0)
   
   ### RocketMQ Version of the Client/Server
   
   - Client: rocketmq-python-client 5.1.0
   - Server: Apache RocketMQ 5.x 
   
   ### Run or Compiler Version
   
   - Python: 3.11
   - protobuf: 6.33.5
   - grpcio: 1.76.0
   
   ### Describe the Bug
   
   
   `PushConsumer.startup()` hangs indefinitely when the broker responds with 
`metric.on=False` in the telemetry settings.
   
   The root cause is that `PushConsumer.reset_metric()` raises an `AttributeError` because `ClientMetrics.__meter_provider` is `None` when metrics are disabled. The exception terminates the telemetry stream read loop in `RpcStreamStreamCall.start_stream_read()` before `reset_setting()` is called. Since `reset_setting()` is the only method that calls `_init_settings_event.set()`, `startup()` blocks forever on `_init_settings_event.wait()`.
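
The hang mechanism can be reproduced in isolation. The following is a minimal sketch using only the standard library; `FakeClient`, its methods, and the timeouts are hypothetical stand-ins, not the RocketMQ client's code. It shows how an exception raised in a reader thread before the event is set leaves the waiter blocked. A timeout is used here only so that the sketch terminates.

```python
import threading


class FakeClient:
    """Hypothetical stand-in for a client whose startup waits on an event."""

    def __init__(self, metric_fails: bool):
        self._init_settings_event = threading.Event()
        self._metric_fails = metric_fails

    def _reset_metric(self) -> None:
        if self._metric_fails:
            # Mirrors calling a method on a provider that is still None.
            raise AttributeError("'NoneType' object has no attribute 'get_meter'")

    def _reset_setting(self) -> None:
        self._init_settings_event.set()

    def _read_loop(self) -> None:
        try:
            self._reset_metric()   # raises first ...
            self._reset_setting()  # ... so the event is never set
        except Exception:
            pass  # the loop ends; nothing sets the event afterwards

    def startup(self, timeout: float) -> bool:
        reader = threading.Thread(target=self._read_loop, daemon=True)
        reader.start()
        # Per the report, the real client waits without a timeout, hence the hang.
        return self._init_settings_event.wait(timeout)


print(FakeClient(metric_fails=False).startup(timeout=1.0))  # True
print(FakeClient(metric_fails=True).startup(timeout=0.2))   # False
```

`Event.wait()` returns `False` only when the timeout expires, so the second call is the bounded equivalent of the indefinite block described above.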
   
   ### Steps to Reproduce
   
   1. Deploy a RocketMQ 5.x broker with metric collection disabled (this is the 
default).
   2. Run the following code:
   
   ```python
   from rocketmq import (
       ClientConfiguration, Credentials, FilterExpression,
       PushConsumer, MessageListener, ConsumeResult, Message,
   )
   
   class MyListener(MessageListener):
       def consume(self, message: Message) -> ConsumeResult:
           return ConsumeResult.SUCCESS
   
   config = ClientConfiguration("broker-ip:port", Credentials())
   consumer = PushConsumer(
       config, "my-consumer-group", MyListener(),
       {"my-topic": FilterExpression()},
   )
   consumer.startup()  # Hangs forever
   ```
   
   ### What Did You Expect to See?
   
   
   `startup()` completes successfully and the consumer begins receiving 
messages.
   
   ### What Did You See Instead?
   
   `startup()` blocks forever. A thread dump shows the calling thread stuck on `_init_settings_event.wait()` at `client.py:79`.
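
For anyone trying to confirm the same symptom, a thread dump can be captured with the standard library alone. The sketch below is one possible way to do it, not necessarily how the dump in this report was produced; `dump_threads` and `watchdog` are hypothetical helper names.

```python
import sys
import threading
import traceback


def dump_threads() -> str:
    """Return the current stack of every live thread as text."""
    names = {t.ident: t.name for t in threading.enumerate()}
    chunks = []
    for ident, frame in sys._current_frames().items():
        chunks.append(f"Thread {names.get(ident, '?')} ({ident}):")
        chunks.append("".join(traceback.format_stack(frame)))
    return "\n".join(chunks)


def watchdog(delay: float) -> threading.Timer:
    """Print a thread dump to stderr after `delay` seconds."""
    timer = threading.Timer(delay, lambda: print(dump_threads(), file=sys.stderr))
    timer.daemon = True
    timer.start()
    return timer
```

Calling `watchdog(30)` just before `consumer.startup()` would print every thread's stack after 30 seconds if startup has not returned. The standard `faulthandler.dump_traceback_later()` function offers similar behavior without a helper.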
   
   
   ### Additional Context
   
   
   ### Detailed call chain analysis
   
   The bug involves three files:
   
   **1. `rpc_channel.py` — `RpcStreamStreamCall.start_stream_read()` (lines 136-140)**
   
   ```python
   if res.settings and res.settings.metric:
       self.__handler.reset_metric(res.settings.metric)   # ← crashes here
       self.__handler.reset_setting(res.settings)          # ← never reached
   ```
   
   `res.settings.metric` is a protobuf message object, which is **always 
truthy** in Python regardless of its field values. So this block is always 
entered.
   
   `reset_metric()` is called **before** `reset_setting()`. When `reset_metric()` raises, the outer `except Exception` clause (line 155) catches the exception and the stream read loop terminates, so `reset_setting()` is never called.
   
   **2. `push_consumer.py` — `PushConsumer.reset_metric()`**
   
   ```python
   def reset_metric(self, metric):
       super().reset_metric(metric)                    # returns early when metric.on=False
       self.__register_process_queues_gauges()          # ← crashes here
   ```
   
   **3. `client_metrics.py` — 
`ClientMetrics.create_push_consumer_process_queue_observable_gauge()`**
   
   ```python
   def create_push_consumer_process_queue_observable_gauge(self, name, callback_func):
       meter = self.__meter_provider.get_meter(ClientMetrics.METRIC_NAME)
       #      ^^^^^^^^^^^^^^^^^^^^ __meter_provider is None!
       #      → AttributeError: 'NoneType' object has no attribute 'get_meter'
   ```
   
   `__meter_provider` is `None` because `ClientMetrics.reset_metrics()` 
returned early — `__satisfy()` returns `True` when `not self.__enabled and not 
metric.on`, so `__meter_provider_start()` was never called.
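
The state this leaves behind can be modeled with a small stand-in. The classes below are a hypothetical simplification, not the library's `ClientMetrics`: only the early-return condition is taken from this report, and `FakeMetric`, `FakeMeterProvider`, `FakeClientMetrics`, `create_gauge`, and the `guarded` flag are illustrative names.

```python
from dataclasses import dataclass


@dataclass
class FakeMetric:
    """Hypothetical stand-in for the metric settings sent by the broker."""
    on: bool


class FakeMeterProvider:
    def get_meter(self, name: str) -> str:
        return f"meter:{name}"


class FakeClientMetrics:
    def __init__(self):
        self._enabled = False
        self._meter_provider = None  # only created once metrics are turned on

    def _satisfy(self, metric: FakeMetric) -> bool:
        # Condition quoted in the report: nothing to do if both sides are off.
        return not self._enabled and not metric.on

    def reset_metrics(self, metric: FakeMetric) -> None:
        if self._satisfy(metric):
            return  # early return: _meter_provider stays None
        self._enabled = metric.on
        self._meter_provider = FakeMeterProvider() if metric.on else None

    def create_gauge(self, name: str, guarded: bool = False):
        if guarded and self._meter_provider is None:
            return None  # skip registration when there is no provider
        return self._meter_provider.get_meter(name)


metrics = FakeClientMetrics()
metrics.reset_metrics(FakeMetric(on=False))
try:
    metrics.create_gauge("process_queue")
except AttributeError as exc:
    print(exc)  # 'NoneType' object has no attribute 'get_meter'
print(metrics.create_gauge("process_queue", guarded=True))  # None
```

The unguarded call fails exactly as in the traceback above, while the guarded variant simply skips gauge registration.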
   
   **Full chain of events:**
   
   ```
   Broker responds with metric.on=False
     → start_stream_read() enters "if res.settings.metric" block (always truthy)
       → PushConsumer.reset_metric(metric)
         → ClientMetrics.reset_metrics(metric)
           → __satisfy(metric) returns True (metrics disabled)
           → returns early, __meter_provider remains None
         → __register_process_queues_gauges()
           → create_push_consumer_process_queue_observable_gauge()
             → self.__meter_provider.get_meter()  →  AttributeError!
       → Exception propagates to start_stream_read()
       → Caught by "except Exception", stream read loop ends
       → reset_setting() is NEVER called
         → _init_settings_event is NEVER set
           → startup() blocks forever on _init_settings_event.wait()
   ```
   
   ### Suggested Fix
   
   Two issues should be addressed:
   
   **Fix 1: Guard `__meter_provider` access in `client_metrics.py`**
   
   `create_push_consumer_process_queue_observable_gauge()` should return early 
when metrics are not enabled:
   
   ```python
   def create_push_consumer_process_queue_observable_gauge(self, name, callback_func):
       if not self.__meter_provider:
           return
       # ... rest of method unchanged
   ```
   
   **Fix 2: Ensure `reset_setting()` always executes in `rpc_channel.py`**
   
   `reset_setting()` must not depend on `reset_metric()` succeeding. It should 
be called first, and `reset_metric()` should be wrapped in try/except:
   
   ```python
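   # A message-typed field is always truthy, so check presence explicitly
   # (assumes `settings` is a message field on the telemetry command).
   if res.HasField("settings"):
       self.__handler.reset_setting(res.settings)
       if res.settings.HasField("metric"):
           try:
               self.__handler.reset_metric(res.settings.metric)
           except Exception:
               logger.warning("reset_metric failed, ignored", exc_info=True)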
   ```
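
The intent of this ordering can be checked without a broker. The following sketch uses hypothetical stand-ins: `FakeHandler`, `handle_settings`, a plain dictionary for the settings, and a `has_metric` flag in place of the protobuf `HasField` call. It only demonstrates that the settings event is set even when `reset_metric()` raises.

```python
import logging
import threading

logger = logging.getLogger("sketch")


class FakeHandler:
    """Hypothetical handler whose reset_metric() always fails."""

    def __init__(self):
        self.init_settings_event = threading.Event()

    def reset_setting(self, settings) -> None:
        self.init_settings_event.set()

    def reset_metric(self, metric) -> None:
        raise AttributeError("'NoneType' object has no attribute 'get_meter'")


def handle_settings(handler, settings: dict, has_metric: bool) -> None:
    # Apply settings first so startup can never be blocked by metrics.
    handler.reset_setting(settings)
    if has_metric:
        try:
            handler.reset_metric(settings.get("metric"))
        except Exception:
            logger.warning("reset_metric failed, ignored", exc_info=True)


handler = FakeHandler()
handle_settings(handler, {"metric": {"on": False}}, has_metric=True)
print(handler.init_settings_event.is_set())  # True
```

Logging with `exc_info=True` keeps the traceback visible, so a metrics failure is downgraded to a warning rather than silently discarded.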
   
   ### Workaround
   
   Subclass `PushConsumer` at the application level:
   
   ```python
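   from rocketmq import PushConsumer

   class SafePushConsumer(PushConsumer):
       def reset_metric(self, metric):
           try:
               super().reset_metric(metric)
           except AttributeError:
               # Metrics are disabled and the meter provider is None; skip gauge registration.
               pass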
   

