[ 
https://issues.apache.org/jira/browse/KAFKA-19902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18041015#comment-18041015
 ] 

RivenSun commented on KAFKA-19902:
----------------------------------

Hi [~lianetm],
Thank you for your detailed explanation! I understand the points you've raised, 
but I'd like to add more detail about the actual scenario we encountered, since 
some key details may not have come across clearly in my initial report.
h3. What I Understand From Your Response

First, let me confirm what I understand:
 # ✅ OFFSET_OUT_OF_RANGE occurs on the fetch path, not the commit path - Yes, I 
understand this. The error is triggered when the consumer attempts to fetch, 
not when committing offsets.

 # ✅ Resetting based on auto.offset.reset is expected behavior - Yes, I 
understand this is the intended behavior when OUT_OF_RANGE occurs.

 # ✅ allConsumed() should use position epoch for truncation detection - I 
understand the current design intention.

h3. However, Our Actual Scenario Is Different

The scenario we encountered differs from the "segments deleted by retention" 
case you described. Here are the critical observations:

Timeline of Events:
 # Consumer was actively consuming with no lag (not starting fresh, not 
recovering from a long pause)

 # Rebalance occurred at time T

 # Producer logged NotLeaderOrFollowerException around the same time 
(indicating leader change)

 # Consumer immediately hit OFFSET_OUT_OF_RANGE after rebalance completed

 # ❗️Critical: We found NO log segment deletion logs in broker logs for this 
topic

Key Evidence:
 * ❌ NOT a case of aggressive retention: We thoroughly checked the broker logs; 
no segment deletion occurred for this topic in that timeframe

 * ❌ NOT a case of stale consumer position: The consumer had no lag and was 
actively consuming

 * ✅ Correlation with leader change: Producer errors 
(NotLeaderOrFollowerException) occurred in the same timeframe

 * ✅ Timing: Error occurred immediately after rebalance, not during normal 
steady-state consumption

h3. The Root Cause I Suspect

Here's what I believe actually happened (and what I'm trying to confirm/fix):
 
Time T1: Consumer consuming normally from Leader A (epoch=N)
    - Consumer position: offset=1000, offsetEpoch=N, currentLeader.epoch=N
    - Leader: Broker A, epoch=N
    - Consumer continues normal consumption, position updates continuously
 
Time T2: Leader change occurs (Leader A → Leader B)
    - New Leader: Broker B, epoch=N+1
    - Producer detects this change (logs NotLeaderOrFollowerException)
    - Consumer's metadata will eventually update to reflect new leader
    - ⚠️ BUT consumer's position.currentLeader is NOT updated during normal consumption
         (it only updates when errors are detected or explicit validation occurs)
 
Time T3: Rebalance is triggered
    - Consumer commits offset in onPartitionsRevoked()
    - allConsumed() uses: offset=1000, leaderEpoch=N (from offsetEpoch)
    - ⚠️ Problem: Committing with epoch=N, but the actual current leader is epoch=N+1
    - Committed offset persisted to broker: {offset: 1000, leaderEpoch: N}
 
Time T4: Rebalance completes, consumer resumes
    - Fetches committed offset from broker: offset=1000, epoch=N
    - Tries to validate position using OffsetsForLeaderEpoch API
    - Sends validation request: leaderEpoch=N, currentLeaderEpoch=N+1
    - ⚠️ Broker attempts to look up offset for epoch=N
    - ⚠️ Broker may fail to validate epoch=N (not because segments were deleted,
         but because after epoch transition, the offset range info for old epoch
         becomes unavailable or invalid)
 
Time T5: Broker returns OFFSET_OUT_OF_RANGE (or validation failure)
    - Consumer triggers reset to earliest
    - Result: Reprocessing massive amounts of historical data
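 
To make the T3/T4 steps above concrete, the following diagnostic sketch compares the epoch stored with the committed offset against the leader epoch the broker currently reports for the same partition. This is illustrative only; the group id, topic, partition and bootstrap address are placeholders, not values from our incident.
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class CommittedEpochCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 153);

            // Epoch persisted together with the committed offset (what the consumer sent at T3)
            OffsetAndMetadata committed = admin.listConsumerGroupOffsets("my-group")
                    .partitionsToOffsetAndMetadata().get().get(tp);

            // Leader epoch the broker currently reports for the partition
            ListOffsetsResult.ListOffsetsResultInfo latest = admin
                    .listOffsets(Map.of(tp, OffsetSpec.latest())).all().get().get(tp);

            if (committed == null) {
                System.out.println("No committed offset for " + tp);
            } else {
                System.out.printf("committed offset=%d, committed epoch=%s, current leader epoch=%s%n",
                        committed.offset(), committed.leaderEpoch(), latest.leaderEpoch());
            }
        }
    }
} {code}
If the committed epoch is consistently behind the leader epoch the broker reports after a leader change, that would support the staleness theory above.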
 
h3. My Core Question

What I'm trying to confirm is: Is this a bug that needs fixing, or is it 
expected edge-case behavior?

Specific Concerns:
 # In a leader change scenario, if the committed offset uses the old 
offsetEpoch (from when the data was consumed) rather than the current leader 
epoch from metadata, can this cause validation to fail? (A sketch for probing 
this empirically follows after this list.)

 # When FetchCollector updates the position, it inherits the currentLeader 
field from the old position and does NOT refresh it during normal consumption. 
This means that even if the metadata already knows about the new leader, 
position.currentLeader may still be stale. The code is here:
{code:java}
// FetchCollector.java
SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
        nextInLineFetch.nextFetchOffset(),
        nextInLineFetch.lastEpoch(),     // offsetEpoch: from batch
        position.currentLeader);         // inherited, not updated! {code}

 # allConsumed() uses offsetEpoch rather than currentLeader.epoch for commit. 
In a leader change scenario, offsetEpoch comes from already-consumed batches 
(old leader's epoch), while currentLeader.epoch should reflect the current 
leader. But due to point #2, currentLeader may also be stale.
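
To probe point #1 above empirically, one can force the consumer to validate a position that carries a deliberately old epoch and watch how the broker responds. A minimal sketch, with the topic, offset, and epoch values as placeholders (this is not our production code):
{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Optional;
import java.util.Set;

public class StaleEpochProbe {
    // Seek to a known offset but attach an old leader epoch (N), simulating a
    // committed offset that was written before the leader moved to epoch N+1.
    static void probe(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("repro-topic", 0);
        consumer.assign(Set.of(tp));
        consumer.seek(tp, new OffsetAndMetadata(1000L, Optional.of(5), ""));

        // The next poll() validates the position via OffsetsForLeaderEpoch using the
        // stale epoch; observe whether this ends in a LogTruncationException or an
        // auto.offset.reset-driven reset.
        consumer.poll(Duration.ofSeconds(5));
    }
} {code}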

h3. My Questions

Please help me understand these points:

A. How does the broker handle OffsetsForLeaderEpoch validation requests with an 
old epoch after a leader change?
 * If the request has leaderEpoch=N, but the current leader is epoch=N+1

 * What response does the broker return? Is it OFFSET_OUT_OF_RANGE or another 
error?

B. Is this expected behavior?
 * In an actively consuming consumer

 * Simply because a leader change happened during rebalance

 * Causing the consumer to reset to earliest and reprocess massive amounts of 
data

 * Is this a reasonable design?

C. If this is NOT expected behavior, what's the fix direction?
 * Should we update currentLeader when FetchCollector updates position?

 * Should we change which epoch allConsumed() uses?

 * Or is there a better solution?

h3. Additional Information

I can provide:
 * Detailed timeline logs

 * Consumer and Producer configurations

 * The exact error stack traces we observed

If needed, I can also attempt to reproduce this scenario in a test environment 
(triggering the leader change + rebalance combination).

Looking forward to your response!
Best regards,
RivenSun

> Consumer triggers OFFSET_OUT_OF_RANGE when committed offset uses stale epoch 
> after leader change
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19902
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.9.0
>            Reporter: RivenSun
>            Priority: Major
>         Attachments: image-2025-11-21-18-03-24-592.png, 
> image-2025-11-21-18-11-24-316.png, image-2025-11-21-20-00-10-862.png
>
>
> h2. Summary
> When a partition leader changes and the consumer commits offsets with a stale 
> epoch, if the log segments containing that epoch are subsequently deleted due 
> to retention policy, the consumer will encounter OFFSET_OUT_OF_RANGE error 
> and reset to earliest (if auto.offset.reset=earliest), causing massive 
> message reprocessing.
> The root cause is that SubscriptionState.allConsumed()
> uses position.offsetEpoch instead of position.currentLeader.epoch when 
> constructing OffsetAndMetadata for commit, which can become stale when leader 
> changes occur.
> If `auto.offset.reset=latest`, *the consequences will be more severe, 
> resulting in message loss.*
> ----
> h2. Environment
> Cluster Configuration:
>  * Kafka Server Version: 3.9.0
>  * Kafka Client Version: 3.9.0
>  * Topic: 200 partitions, 7-day retention, no tiered storage
>  * Consumer Group: 45 consumers (1 KafkaConsumer thread per machine)
>  * No broker/controller restarts occurred
>  * High throughput producer continuously writing messages
> Consumer Configuration:
> {code:java}
> auto.offset.reset=earliest
> enable.auto.commit=true {code}
>  
> Consumer Code:
>  * Registered ConsumerRebalanceListener
>  * Calls kafkaConsumer.commitSync() in onPartitionsRevoked() method
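> A minimal sketch of that listener pattern (assumed shape, not the actual application code; the topic name is a placeholder):
> {code:java}
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> 
> import java.util.Collection;
> import java.util.List;
> 
> public class RevokeCommitListener {
>     static void subscribeWithCommitOnRevoke(KafkaConsumer<String, String> consumer) {
>         consumer.subscribe(List.of("my-topic"), new ConsumerRebalanceListener() {
>             @Override
>             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>                 // Synchronously commit the current positions (built by allConsumed())
>                 // before ownership of the partitions moves to another member.
>                 consumer.commitSync();
>             }
> 
>             @Override
>             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>                 // Positions are later restored from the committed offsets.
>             }
>         });
>     }
> } {code}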
> ----
> h2. Problem Description
> In a scenario where the consumer group has no lag, consumers suddenly 
> consumed a massive amount of messages, far exceeding the recent few minutes 
> of producer writes. Investigation revealed that multiple partitions reset to 
> the earliest offset and reprocessed up to 7 days of historical data.
> ----
> h2. Observed Symptoms (Timeline)
>  # Consumer group rebalance occurred (triggered by normal consumer group 
> management)
>  # Consumer logged OFFSET_OUT_OF_RANGE errors immediately after rebalance
>  # Consumer reset to earliest offset due to auto.offset.reset=earliest 
> configuration
>  # Kafka broker logged NotLeaderOrFollowerException around the same 
> timeframe, indicating partition leader changes
>  # Consumer did not log any NOT_LEADER_OR_FOLLOWER errors (these are DEBUG 
> level and not visible in production logs)
> The image below uses the partition 
> asyncmq_local_us_us_marketplace-ssl-a_aws_us-east-1_2a7e053c-9d90-4efd-af2d-3a8bf9564715-153
>  as an example to trace the problem log chain. {*}See attached log screenshot 
> at the bottom{*}.
> ----
> h2. Root Cause Analysis
> h3. The Problem Chain
> 1. Leader change occurs (epoch changes from N to N+1)
>    ↓
> 2. Consumer continues processing old batches (epoch=N)
>    ↓
> 3. Consumer commits offset during/after rebalance
>    ├─ Committed offset: 1000
>    └─ Committed epoch: N (using position.offsetEpoch from old batch)
>    ↓
> 4. High throughput + retention policy causes old segments (epoch=N) to be 
> deleted
>    ↓
> 5. Consumer restarts/rebalances and fetches committed offset
>    ├─ Tries to validate offset 1000 with epoch=N
>    └─ Broker cannot find epoch=N ({*}segments deleted or tp leader change{*})
>    ↓
> 6. Broker returns OFFSET_OUT_OF_RANGE
>    ↓
> 7. Consumer resets to earliest offset
> h3. Code Analysis
> The problematic code in SubscriptionState.allConsumed():
> {code:java}
> // kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition())
>             allConsumed.put(topicPartition, new OffsetAndMetadata(
>                 partitionState.position.offset,
>                 partitionState.position.offsetEpoch,  // Problem: uses offsetEpoch from consumed batch
>                 ""));
>     });
>     return allConsumed;
> } {code}
>  
> Why this is problematic:
> The FetchPosition class contains two different epoch
> values:
>  * offsetEpoch: The epoch from the last consumed record's batch
>  * currentLeader.epoch: The current partition leader's epoch from metadata
> When committing offsets, we should use currentLeader.epoch instead of 
> offsetEpoch because:
>  # offsetEpoch represents the epoch of already consumed data (historical)
>  # currentLeader.epoch represents the current partition leader (up-to-date)
> h3. Scenarios Where These Epochs Diverge
> Scenario A: Leader changes while consumer is processing old batches
>  * T1: Consumer fetches batch with epoch=5
>  * T2: Leader changes to epoch=6
>  * T3: Metadata updates with new leader epoch=6
>  * T4: Consumer commits offset
>  * offsetEpoch = 5 (from batch being processed)
>  * currentLeader.epoch = 6 (from updated metadata)
>  * Problem: Commits epoch=5, which may soon be deleted
> Scenario B: Recovery from committed offset after leader change
>  * Consumer commits offset with old epoch=N
>  * Leader changes to epoch=N+1
>  * Old segments (epoch=N) are deleted by retention policy
>  * Consumer rebalances and tries to restore from committed offset
>  * offsetEpoch = N (from committed offset)
>  * currentLeader.epoch = N+1 (from current metadata)
>  * Problem: Validation fails because epoch=N no longer exists
> ----
> h2. Steps to Reproduce
> This is a timing-sensitive edge case. The following conditions increase the 
> likelihood:
>  # Setup:
>  * High-throughput topic (to trigger faster log rotation)
>  * Relatively short retention period (e.g., 7 days)
>  * Consumer group with rebalance listener calling commitSync()
>  * enable.auto.commit=true (or any manual commit)
>  # Trigger:
>  * Trigger a partition leader change (broker restart, controller election, 
> etc.)
>  * Simultaneously or shortly after, trigger a consumer group rebalance
>  * Wait for retention policy to delete old log segments
>  # Expected Result:
> Consumer should resume from committed offset
>  # Actual Result:
> Consumer encounters OFFSET_OUT_OF_RANGE and resets to earliest
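> One way to force the leader change in the "Trigger" step above in a test cluster is a preferred-replica election via the Admin client (a sketch; the topic, partition, and bootstrap address are placeholders). It only moves the leader if the current leader is not the preferred replica, e.g. after bouncing that broker:
> {code:java}
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.common.ElectionType;
> import org.apache.kafka.common.TopicPartition;
> 
> import java.util.Properties;
> import java.util.Set;
> 
> public class ForceLeaderElection {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         try (Admin admin = Admin.create(props)) {
>             TopicPartition tp = new TopicPartition("repro-topic", 0);
>             // A successful election moves leadership and bumps the partition's leader epoch
>             admin.electLeaders(ElectionType.PREFERRED, Set.of(tp)).partitions().get();
>         }
>     }
> } {code}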
> ----
> h2. Impact
>  * Data Reprocessing: Consumers may reprocess up to retention.ms worth of data
>  * Service Degradation: Sudden spike in consumer throughput can overwhelm 
> downstream systems
>  * Resource Waste: Unnecessary CPU, memory, and network usage
>  * Potential Duplicates: If using auto.offset.reset=earliest, duplicate 
> message processing is guaranteed
>  * If `auto.offset.reset=latest`, *the consequences will be more severe, 
> resulting in message loss.*
> ----
> h2. Proposed Fix
> h3. Root Cause Analysis
> The issue is more fundamental than a simple field selection problem. The core 
> issue is that both epoch values in FetchPosition can be stale at commit time:
>  # offsetEpoch: Contains the epoch from the last consumed record's batch. If 
> a leader change occurs after consumption but before commit, this epoch 
> becomes stale and may reference log segments that have been deleted.
>  # currentLeader.epoch: Inherited from the previous position during normal 
> consumption and only updated when:
>  * NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH errors are detected
>  * Position is restored from committed offsets (fetches from metadata)
>  * Explicit validation is triggered via 
> maybeValidatePositionForCurrentLeader()
> During normal, error-free consumption, currentLeader is never updated and can 
> also become stale.
> h3. Problem with Current Code
> Location: org.apache.kafka.clients.consumer.internals.FetchCollector 
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
>     SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
>             nextInLineFetch.nextFetchOffset(),
>             nextInLineFetch.lastEpoch(),     // offsetEpoch: from consumed batch
>             position.currentLeader);         // ❌ currentLeader: inherited, NOT updated!
>     log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
>             position, nextPosition, tp, partRecords.size());
>     subscriptions.position(tp, nextPosition);
>     positionAdvanced = true;
> }
> {code}
> The inherited currentLeader means it can be as stale as offsetEpoch in 
> certain scenarios.
> ----
> h3. Recommended Solution: Proactively Update currentLeader During Position 
> Updates
> Option 1: Update currentLeader when advancing position (Primary Recommendation)
> Modify FetchCollector to fetch the latest leader information from metadata every time the position is updated:
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
>     // Fetch the latest leader information from metadata
>     Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(tp);
> 
>     SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
>             nextInLineFetch.nextFetchOffset(),
>             nextInLineFetch.lastEpoch(),
>             currentLeaderAndEpoch);  // ✅ Use fresh leader info from metadata
> 
>     log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
>             position, nextPosition, tp, partRecords.size());
>     subscriptions.position(tp, nextPosition);
>     positionAdvanced = true;
> } {code}
> Advantages:
>  * Ensures currentLeader is always up-to-date
>  * Makes allConsumed() safe to use *currentLeader.epoch* for commits
> Modify SubscriptionState.allConsumed() to {color:#de350b}use 
> currentLeader.epoch instead of offsetEpoch{color}:
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition())
>             allConsumed.put(topicPartition, new OffsetAndMetadata(
>                 partitionState.position.offset,
>                 partitionState.position.currentLeader.epoch,  // ✅ Use current leader epoch
>                 ""));
>     });
>     return allConsumed;
> } {code}
>  
>  * Minimal performance impact (metadata lookup is O(1) from local cache)
>  * Aligns with the existing pattern in refreshCommittedOffsets()
> Potential Concerns:
>  * Adds one metadata lookup per position update
>  * If metadata is stale, currentLeader.epoch could still lag slightly, but 
> this is the same risk as today
> ----
> h3. Alternative Solutions
> Option 2: Fetch fresh leader info during commit
> Modify allConsumed() to fetch the latest leader information at commit time:
> {code:java}
> // Note: This would require passing metadata reference to allConsumed()
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed(ConsumerMetadata metadata) {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition()) {
>             // Fetch the latest leader epoch from metadata at commit time
>             Metadata.LeaderAndEpoch latestLeader = metadata.currentLeader(topicPartition);
>             Optional<Integer> epochToCommit = latestLeader.epoch.isPresent()
>                 ? latestLeader.epoch
>                 : partitionState.position.offsetEpoch;  // Fallback to offsetEpoch
> 
>             allConsumed.put(topicPartition, new OffsetAndMetadata(
>                 partitionState.position.offset,
>                 epochToCommit,
>                 ""));
>         }
>     });
>     return allConsumed;
> } {code}
> Advantages:
>  * Only impacts commit path, not consumption hot path
>  * Directly addresses the commit-time staleness issue
> Disadvantages:
>  * Requires changing the signature of allConsumed() (API change)
>  * May still have a race condition if leader changes between metadata fetch 
> and commit
>  * Metadata could be stale if update hasn't been processed yet
> ----
> Option 3: Use the maximum epoch value
> Use the larger of the two epoch values, assuming newer epochs have higher values:
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition()) {
>             Optional<Integer> epochToCommit;
> 
>             if (partitionState.position.offsetEpoch.isPresent() &&
>                 partitionState.position.currentLeader.epoch.isPresent()) {
>                 // Use the maximum of the two epochs
>                 int maxEpoch = Math.max(
>                     partitionState.position.offsetEpoch.get(),
>                     partitionState.position.currentLeader.epoch.get());
>                 epochToCommit = Optional.of(maxEpoch);
>             } else {
>                 // Fallback to whichever is present
>                 epochToCommit = partitionState.position.currentLeader.epoch.isPresent()
>                     ? partitionState.position.currentLeader.epoch
>                     : partitionState.position.offsetEpoch;
>             }
> 
>             allConsumed.put(topicPartition, new OffsetAndMetadata(
>                 partitionState.position.offset,
>                 epochToCommit,
>                 ""));
>         }
>     });
>     return allConsumed;
> } {code}
> Advantages:
>  * No API changes required
>  * Simple to implement
>  * Provides better protection than using only one epoch
> Disadvantages:
>  * Heuristic-based; assumes epochs are monotonically increasing
>  * Could still use a stale epoch if both values are old
>  * Doesn't solve the root cause of stale currentLeader
> ----
> h3. Recommendation
> Primary recommendation: Implement Option 1 (Update currentLeader during position updates)
> This is the most robust solution because:
>  # It ensures currentLeader is always fresh
>  # It fixes the root cause rather than working around symptoms
>  # It has minimal performance impact
>  # It makes the codebase more consistent and maintainable
> Secondary recommendation: Implement Option 3 as a defense-in-depth measure
> Even with Option 1, using max(offsetEpoch, currentLeader.epoch) in allConsumed() provides additional safety against any edge cases where one epoch might be more up-to-date than the other.
> Combined approach (strongest protection):
> {code:java}
> // In FetchCollector.java
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
>     Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(tp);
>     SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
>             nextInLineFetch.nextFetchOffset(),
>             nextInLineFetch.lastEpoch(),
>             currentLeaderAndEpoch);  // ✅ Keep currentLeader fresh
>     subscriptions.position(tp, nextPosition);
>     positionAdvanced = true;
> }
> 
> // In SubscriptionState.java
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition()) {
>             // Use the maximum epoch as defense-in-depth
>             Optional<Integer> epochToCommit;
>             if (partitionState.position.offsetEpoch.isPresent() &&
>                 partitionState.position.currentLeader.epoch.isPresent()) {
>                 epochToCommit = Optional.of(Math.max(
>                     partitionState.position.offsetEpoch.get(),
>                     partitionState.position.currentLeader.epoch.get()));
>             } else {
>                 epochToCommit = partitionState.position.currentLeader.epoch.isPresent()
>                     ? partitionState.position.currentLeader.epoch
>                     : partitionState.position.offsetEpoch;
>             }
> 
>             allConsumed.put(topicPartition, new OffsetAndMetadata(
>                 partitionState.position.offset,
>                 epochToCommit,
>                 ""));
>         }
>     });
>     return allConsumed;
> } {code}
> This combined approach provides:
>  * Prevention: Keep currentLeader fresh during normal operation
>  * Defense: Use the best available epoch value at commit time
>  * Resilience: Minimize the window where a stale epoch can cause issues
> ----
> h2. Additional Notes
> Why consumers don't log NOT_LEADER_OR_FOLLOWER errors:
> All consumer-side handling of NOT_LEADER_OR_FOLLOWER errors uses DEBUG level logging:
> {code:java}
> // FetchCollector.java line 325
> log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
> 
> // AbstractFetch.java line 207
> log.debug("For {}, received error {}, with leaderIdAndEpoch {}", partition, partitionError, ...);
> 
> // OffsetsForLeaderEpochUtils.java line 102
> LOG.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", ...); {code}
>  
> This makes the issue difficult to diagnose in production environments.
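> As a stopgap for diagnosis, raising these loggers to DEBUG would surface the handling shown above (assuming a log4j 1.x / reload4j style properties file; adapt the syntax to your logging backend):
> {code:java}
> log4j.logger.org.apache.kafka.clients.consumer.internals.FetchCollector=DEBUG
> log4j.logger.org.apache.kafka.clients.consumer.internals.AbstractFetch=DEBUG
> log4j.logger.org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils=DEBUG {code}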
> ----
> h2. Workarounds (Until Fixed)
>  # Increase retention period to reduce likelihood of epoch deletion
>  # Monitor consumer lag to ensure it stays low
>  # Reduce rebalance frequency (increase max.poll.interval.ms, 
> session.timeout.ms)
>  # Use cooperative rebalance strategy to minimize rebalance impact
>  # Consider using auto.offset.reset=latest if reprocessing is more costly 
> than data loss (application-dependent)
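> For the tuning-related workarounds above, an illustrative consumer configuration (values are placeholders; tune for the actual workload):
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
> max.poll.interval.ms=600000
> session.timeout.ms=45000 {code}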
> ----
> h2. Related Code References
> h3. 1. The problematic method: SubscriptionState.allConsumed()
> Location: org.apache.kafka.clients.consumer.internals.SubscriptionState 
> {code:java}
> public synchronized Map<TopicPartition, OffsetAndMetadata> allConsumed() {
>     Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
>     assignment.forEach((topicPartition, partitionState) -> {
>         if (partitionState.hasValidPosition())
>             allConsumed.put(topicPartition, new OffsetAndMetadata(partitionState.position.offset,
>                     partitionState.position.offsetEpoch, ""));  // Uses offsetEpoch instead of currentLeader.epoch
>     });
>     return allConsumed;
> } {code}
> h3. 2. How FetchPosition is updated during normal consumption
> Location: org.apache.kafka.clients.consumer.internals.FetchCollector 
> {code:java}
> if (nextInLineFetch.nextFetchOffset() > position.offset) {
>     SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
>             nextInLineFetch.nextFetchOffset(),
>             nextInLineFetch.lastEpoch(),     // offsetEpoch: from consumed batch
>             position.currentLeader);         // currentLeader: inherited from old position, NOT updated!
>     log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
>             position, nextPosition, tp, partRecords.size());
>     subscriptions.position(tp, nextPosition);
>     positionAdvanced = true;
> } {code}
> Key Issue: The currentLeader field is inherited from the previous position 
> and not automatically updated during normal consumption. It only gets updated 
> when leader change errors are detected.
> h3. 3. How committed offsets are restored after rebalance
> Location: 
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets()
> {code:java}
> public static void refreshCommittedOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata,
>                                            final ConsumerMetadata metadata,
>                                            final SubscriptionState subscriptions) {
>     for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
>         final TopicPartition tp = entry.getKey();
>         final OffsetAndMetadata offsetAndMetadata = entry.getValue();
>         if (offsetAndMetadata != null) {
>             // first update the epoch if necessary
>             entry.getValue().leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
> 
>             // it's possible that the partition is no longer assigned when the response is received,
>             // so we need to ignore seeking if that's the case
>             if (subscriptions.isAssigned(tp)) {
>                 final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
>                 final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
>                         offsetAndMetadata.offset(),
>                         offsetAndMetadata.leaderEpoch(),   // offsetEpoch from committed offset (may be old)
>                         leaderAndEpoch);                   // currentLeader from current metadata (may be new)
> 
>                 subscriptions.seekUnvalidated(tp, position);
> 
>                 log.info("Setting offset for partition {} to the committed offset {}", tp, position);
>             }
>         }
>     }
> } {code}
> The Divergence Point: When restoring from committed offsets, offsetEpoch 
> comes from the stored offset (potentially old), while currentLeader comes 
> from fresh metadata (potentially new after leader change).
> h3. 4. How OffsetsForLeaderEpoch validation request is constructed
> Location: 
> org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils.prepareRequest()
> {code:java}
> static AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(
>         Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
>     OffsetForLeaderTopicCollection topics = new OffsetForLeaderTopicCollection(requestData.size());
>     requestData.forEach((topicPartition, fetchPosition) ->
>             fetchPosition.offsetEpoch.ifPresent(fetchEpoch -> {
>                 OffsetForLeaderTopic topic = topics.find(topicPartition.topic());
>                 if (topic == null) {
>                     topic = new OffsetForLeaderTopic().setTopic(topicPartition.topic());
>                     topics.add(topic);
>                 }
>                 topic.partitions().add(new OffsetForLeaderPartition()
>                         .setPartition(topicPartition.partition())
>                         .setLeaderEpoch(fetchEpoch)              // Uses offsetEpoch for validation
>                         .setCurrentLeaderEpoch(fetchPosition.currentLeader.epoch
>                                 .orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
>                 );
>             })
>     );
>     return OffsetsForLeaderEpochRequest.Builder.forConsumer(topics);
> } {code}
> The Validation Problem: The validation request uses fetchEpoch (which is 
> offsetEpoch) to validate against the broker. If this epoch no longer exists 
> in the broker's log, validation fails and triggers OFFSET_OUT_OF_RANGE.
> h3. 5. FetchPosition class definition
> Location: 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition 
>  
> {code:java}
> /**
>  * Represents the position of a partition subscription.
>  *
>  * This includes the offset and epoch from the last record in
>  * the batch from a FetchResponse. It also includes the leader epoch at the time the batch was consumed.
>  */
> public static class FetchPosition {
>     public final long offset;
>     final Optional<Integer> offsetEpoch;         // Epoch from last consumed record's batch
>     final Metadata.LeaderAndEpoch currentLeader; // Current partition leader info from metadata
> 
>     FetchPosition(long offset) {
>         this(offset, Optional.empty(), Metadata.LeaderAndEpoch.noLeaderOrEpoch());
>     }
> 
>     public FetchPosition(long offset, Optional<Integer> offsetEpoch, Metadata.LeaderAndEpoch currentLeader) {
>         this.offset = offset;
>         this.offsetEpoch = Objects.requireNonNull(offsetEpoch);
>         this.currentLeader = Objects.requireNonNull(currentLeader);
>     }
> 
>     @Override
>     public String toString() {
>         return "FetchPosition{" +
>                 "offset=" + offset +
>                 ", offsetEpoch=" + offsetEpoch +
>                 ", currentLeader=" + currentLeader +
>                 '}';
>     }
> }{code}
> Class Design: The class contains both offsetEpoch (historical data epoch) and 
> currentLeader.epoch (current metadata epoch), but allConsumed() only uses the 
> former when committing.
> h1. Attached logs:
> h2. group rebalance log
> !image-2025-11-21-18-03-24-592.png!
> h2. consumer client log
> !image-2025-11-21-20-00-10-862.png!
> h2. broker server log
> !image-2025-11-21-18-11-24-316.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
