wuguowei1994 opened a new pull request, #18750:
URL: https://github.com/apache/druid/pull/18750

   # Motivation
   
   We operate a Druid deployment with more than 500 nodes.
   
   In real-time ingestion scenarios, a monitoring process queries the cluster 
every minute to retrieve the `ingest/kafka/partitionLag` metric. If the lag 
remains unhealthy for more than five minutes, alerts are triggered.
   
   In our production environment, this metric periodically becomes 
**negative**, even when the cluster is fully healthy. These false alerts create 
unnecessary operational load and frequently wake the on-call team during 
off-hours. On the other hand, we cannot suppress negative-lag alerts entirely, 
since in some situations negative lag can indicate real ingestion problems.
   
   For a large-scale, 24×7 real-time ingestion pipeline, **accurate and 
consistent lag metrics are essential to avoid unnecessary nighttime wake-ups 
while still ensuring that real issues are detected promptly**.
   
   
   ---
   
   # Problem Description
   
   <img width="1255" height="839" alt="negative_lag" src="https://github.com/user-attachments/assets/3b193544-984d-4d62-9387-06d0100b79f7" />
   
   
   In the current implementation, the Druid supervisor maintains two volatile 
data structures:
   
   * The latest Kafka `end_offset` for each partition
   * The latest task-reported `current_offset` for each partition
   
   The supervisor periodically updates these values (every 30 seconds):
   
   1. Query all tasks in parallel to update `current_offset`.
      This step waits for all HTTP requests to complete, and each request has a two-minute timeout.
   2. Query the Kafka cluster to refresh `end_offset`.
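The two-step refresh can be sketched as follows. This is a minimal sketch under stated assumptions, not Druid's supervisor code: the class `NonAtomicOffsetTracker`, its methods, and the use of `Supplier<Long>` to stand in for one HTTP request per task are all hypothetical. It shows the property the rest of this description depends on: each task's offset becomes visible as soon as that task answers, while `end_offset` is replaced only after the slowest task has answered or timed out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of the current, non-atomic refresh cycle (not Druid's actual code).
class NonAtomicOffsetTracker {
  // partition -> latest offset reported by a task; written as each task responds
  final Map<Integer, Long> currentOffsets = new ConcurrentHashMap<>();
  // partition -> latest Kafka end offset; replaced only in Step 2
  volatile Map<Integer, Long> endOffsets = Map.of();

  // Step 1: query every task in parallel; each Supplier stands in for one HTTP request.
  void refreshCurrentOffsets(Map<Integer, Supplier<Long>> taskQueries, long timeoutSeconds) {
    CompletableFuture<?>[] pending = taskQueries.entrySet().stream()
        .map(e -> CompletableFuture.supplyAsync(e.getValue())
            .orTimeout(timeoutSeconds, TimeUnit.SECONDS)                  // per-request timeout
            .thenAccept(offset -> currentOffsets.put(e.getKey(), offset)) // visible immediately
            .exceptionally(error -> null))                                // timed-out task keeps its old offset
        .toArray(CompletableFuture[]::new);
    // Blocks until the slowest task has answered or timed out.
    CompletableFuture.allOf(pending).join();
  }

  // Step 2: runs only after Step 1 has returned.
  void refreshEndOffsets(Map<Integer, Long> latestFromKafka) {
    endOffsets = Map.copyOf(latestFromKafka);
  }
}
```

With `timeoutSeconds = 120`, a single slow task can hold Step 1 open for up to two minutes, and for that whole time `currentOffsets` is already partly refreshed while `endOffsets` is not.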
   
   Meanwhile, a separate periodic task (every minute) computes:
   
   ```
   lag = end_offset - current_offset
   ```
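A per-partition version of this formula might look like the sketch below. `LagCalculator` and `computeLag` are hypothetical names, and skipping partitions that no task has reported yet is an assumption for illustration. The point is that the subtraction has no notion of *when* each operand was captured, so values taken from different refresh cycles pass straight through, including negative results.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper mirroring lag = end_offset - current_offset per partition.
class LagCalculator {
  static Map<Integer, Long> computeLag(Map<Integer, Long> endOffsets, Map<Integer, Long> currentOffsets) {
    Map<Integer, Long> lag = new TreeMap<>();
    for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
      Long current = currentOffsets.get(entry.getKey());
      if (current != null) {                                 // skip partitions no task has reported yet
        lag.put(entry.getKey(), entry.getValue() - current); // no clamping: can go negative
      }
    }
    return lag;
  }
}
```

Given `end_offset = 30000` and `current_offset = 20000`, partition 0 reports `10000`; given a stale `end_offset = 10000` with the same `current_offset`, it reports `-10000`.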
   
   Because the two updates are not applied atomically, the lag computation can observe an inconsistent intermediate state.
   
   ### Intermediate State Leading to Negative Lag
   
   If one task becomes heavily loaded or experiences other delays during Step 1, it may take significantly longer to return its offset. In this situation, the supervisor keeps waiting for that slow task even though the other tasks have already responded.
   
   During this waiting period:
   
   * Many `current_offset` values have already been updated to new values.
   * The `end_offset` values remain stale because Step 2 has not executed yet.
   
   If a monitoring request arrives in this intermediate window, the supervisor 
computes lag using:
   
   * **Partially updated `current_offset`**
   * **Stale `end_offset`**
   
   For any partition whose updated `current_offset` has moved past its stale `end_offset`, the computed lag is negative.
   
   This issue repeats as long as at least one task remains slow. Large clusters 
with many partitions and many Kafka-indexing tasks are more likely to 
experience this scenario.
   
   ---
   
   # Example Scenario
   
   1. Initial state: `end_offset = 10000`, `current_offset = 0`.
   2. After consumption: latest Kafka `end_offset = 30000`, and all tasks have 
consumed up to `20000`.
   3. During Step 1, 49 of the 50 tasks respond quickly, and their `current_offset` values are updated to `20000`.
      One task is slow, so Step 1 keeps waiting.
   4. The in-memory `end_offset` stays at the old value `10000`.
   5. If a metric query occurs at this point, the supervisor calculates:
   
      ```
      10000 - 20000 = -10000
      ```
   6. Because the periodic update logic repeats, this situation can persist 
across multiple cycles.
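The arithmetic of this scenario can be worked through per partition. The sketch assumes, beyond what the scenario states, that each of the 50 tasks reads exactly one partition and that the slow task's partition still holds its initial `current_offset = 0`; `ExampleScenario` is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical worked example: 50 tasks, one partition each (assumption).
class ExampleScenario {
  static final int TASKS = 50;
  static final int SLOW_TASK = 49; // partition of the single slow task (arbitrary choice)

  static Map<Integer, Long> lagDuringSlowCycle() {
    long staleEndOffset = 10000L;  // Step 2 has not run yet
    Map<Integer, Long> lag = new TreeMap<>();
    for (int p = 0; p < TASKS; p++) {
      // Fast tasks have reported 20000; the slow task's partition still holds the initial 0.
      long currentOffset = (p == SLOW_TASK) ? 0L : 20000L;
      lag.put(p, staleEndOffset - currentOffset);
    }
    return lag;
  }

  public static void main(String[] args) {
    Map<Integer, Long> lag = lagDuringSlowCycle();
    System.out.println("partition 0 lag: " + lag.get(0));          // -10000
    System.out.println("partition 49 lag: " + lag.get(SLOW_TASK)); // 10000
    System.out.println("total lag: "
        + lag.values().stream().mapToLong(Long::longValue).sum()); // -480000
  }
}
```

Under these assumptions, 49 partitions report `-10000` and the slow task's partition reports `10000`, so a summed lag would be `49 × (-10000) + 10000 = -480000`. Computed from a consistent snapshot (`end_offset = 30000`, `current_offset = 20000`), every partition would instead report `10000`.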
   
   ---
   
   # Proposed Changes
   
   Replace the two volatile structures storing `current_offset` and 
`end_offset` with `AtomicReference` containers that hold both values as a 
single immutable state object. The supervisor will update these references as 
atomic units, ensuring that lag computation always observes a consistent 
snapshot.
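A minimal sketch of the idea, assuming a single writer (the supervisor thread) and that a new snapshot is published only after both Step 1 and Step 2 have finished. `OffsetSnapshot`, `AtomicOffsetTracker`, `publish`, and `lag` are hypothetical names, not the identifiers used in the PR. Readers take one `AtomicReference.get()` and compute lag from two maps that are guaranteed to come from the same cycle.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not the PR's code: both offset maps live in one immutable
// snapshot that the supervisor swaps in with a single atomic write.
final class OffsetSnapshot {
  final Map<Integer, Long> endOffsets;
  final Map<Integer, Long> currentOffsets;

  OffsetSnapshot(Map<Integer, Long> endOffsets, Map<Integer, Long> currentOffsets) {
    this.endOffsets = Map.copyOf(endOffsets);         // immutable copies
    this.currentOffsets = Map.copyOf(currentOffsets);
  }
}

class AtomicOffsetTracker {
  private final AtomicReference<OffsetSnapshot> state =
      new AtomicReference<>(new OffsetSnapshot(Map.of(), Map.of()));

  // Called once per cycle, after BOTH Step 1 and Step 2 have finished.
  void publish(Map<Integer, Long> endOffsets, Map<Integer, Long> currentOffsets) {
    state.set(new OffsetSnapshot(endOffsets, currentOffsets));
  }

  // Returns null if the partition is missing from the current snapshot.
  Long lag(int partition) {
    OffsetSnapshot s = state.get();                   // one read: both maps from the same cycle
    Long end = s.endOffsets.get(partition);
    Long current = s.currentOffsets.get(partition);
    return (end == null || current == null) ? null : end - current;
  }
}
```

While a slow Step 1 is in progress, readers keep seeing the previous snapshot: slightly stale, but internally consistent. Because Step 2 runs after Step 1 completes, each published snapshot pairs end offsets fetched no earlier than the task offsets, so in normal operation (end offsets only grow and tasks cannot read past them) per-partition lag within a snapshot is non-negative. With a single writer, a plain `set` suffices; `compareAndSet` would only matter if several threads could publish.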
   
   This eliminates inconsistent intermediate states and prevents negative lag 
due to partial updates.
   
   ---
   
   # Rationale
   
   * Ensures consistent reads between related fields.
   * Avoids breaking existing APIs or supervisor behavior.
   * The change is localized to offset bookkeeping logic.
   * No behavioral changes other than removing negative lag caused by 
inconsistent state.
   
   ---
   
   # Operational Impact
   
   * No configuration changes required.
   * No backward-incompatible changes.
   * Improved accuracy of Kafka lag metrics in large clusters.
   * Reduces false alerts in monitoring systems.
   
   ---
   
   # Test Plan
   
   * This change does not add a new feature, so the goal is to ensure that existing tests still pass.
   * All existing tests pass.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.