[ https://issues.apache.org/jira/browse/KAFKA-19523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-19523:
-----------------------------------------
    Description: 
Improve the error handling while building the remote-log-auxiliary state when a
follower node with an empty disk begins to synchronise with the leader. If the
topic has remote storage enabled, the ReplicaFetcherThread attempts to build the
remote-log-auxiliary state. Note that the remote-log-auxiliary state is built
only when the leader-log-start-offset is non-zero and differs from the
leader-local-log-start-offset.
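The condition above can be expressed as a small predicate. This is a minimal sketch for illustration only: the class and method names (AuxStateCondition, shouldBuildRemoteLogAuxState) are hypothetical and do not come from Kafka's TierStateMachine; only the two checks are taken from the description.

```java
// Hypothetical helper (not Kafka code) encoding the two checks from the description.
final class AuxStateCondition {
    private AuxStateCondition() {}

    /**
     * @param leaderLogStartOffset      leader-log-start-offset (earliest offset, local or remote)
     * @param leaderLocalLogStartOffset leader-local-log-start-offset (earliest offset on local disk)
     * @return true if the follower has to build the remote-log-auxiliary state
     */
    static boolean shouldBuildRemoteLogAuxState(long leaderLogStartOffset,
                                                long leaderLocalLogStartOffset) {
        // Both conditions from the description must hold.
        return leaderLogStartOffset != 0
                && leaderLogStartOffset != leaderLocalLogStartOffset;
    }

    public static void main(String[] args) {
        // Older segments exist only in remote storage: aux state is needed.
        System.out.println(shouldBuildRemoteLogAuxState(50, 100)); // true
        // Log start offset is still zero: no aux state is built.
        System.out.println(shouldBuildRemoteLogAuxState(0, 100)); // false
        // Local and overall log start offsets match: no aux state is built.
        System.out.println(shouldBuildRemoteLogAuxState(100, 100)); // false
    }
}
```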

When a LeaderAndISR request is received, ReplicaManager#becomeLeaderOrFollower
first invokes 'makeFollowers' and only then calls
RemoteLogManager#onLeadershipChange. As a result, when the ReplicaFetcherThread
calls RemoteLogManager#fetchRemoteLogSegmentMetadata, the partition may not have
been initialized yet.
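The ordering problem can be reproduced in a single-threaded model. This sketch is hypothetical: MetadataManagerStub, markInitialized and the exception message are stand-ins chosen for illustration and only loosely mirror RemoteLogMetadataManager. It shows that a lookup issued before initialization fails, while the same lookup after the initialization step succeeds.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical single-threaded model of the ordering problem; not Kafka code.
final class LeadershipOrderingDemo {
    // Stand-in for a remote log metadata manager that tracks initialized partitions.
    static final class MetadataManagerStub {
        private final Set<String> initialized = new HashSet<>();

        // Stands in for the initialization triggered by RemoteLogManager#onLeadershipChange.
        void markInitialized(String partition) {
            initialized.add(partition);
        }

        boolean isReady(String partition) {
            return initialized.contains(partition);
        }

        String remoteLogSegmentMetadata(String partition) {
            if (!isReady(partition)) {
                throw new IllegalStateException("Partition " + partition + " is not initialized yet");
            }
            return "segment-metadata-for-" + partition;
        }
    }

    /** Replays the two steps in the order described and returns the first lookup's outcome. */
    static String simulate(MetadataManagerStub rlmm, String partition) {
        String firstAttempt;
        // Step 1: 'makeFollowers' starts fetching, which may need remote metadata right away.
        try {
            firstAttempt = rlmm.remoteLogSegmentMetadata(partition);
        } catch (IllegalStateException e) {
            firstAttempt = "failed: " + e.getMessage();
        }
        // Step 2: only afterwards is the partition initialized.
        rlmm.markInitialized(partition);
        return firstAttempt;
    }

    public static void main(String[] args) {
        MetadataManagerStub rlmm = new MetadataManagerStub();
        System.out.println(simulate(rlmm, "orange-0"));
        System.out.println(rlmm.remoteLogSegmentMetadata("orange-0"));
    }
}
```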

Introduce a RetriableRemoteStorageException so that this error can be handled gracefully.
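A sketch of the proposed exception, assuming it extends RemoteStorageException so that existing catch blocks keep working while callers that care can detect the retriable case. Both exception classes below are local stand-ins rather than Kafka's classes, and ensureReady is a hypothetical guard, not an existing RemoteLogManager method.

```java
// Local stand-ins for illustration; not Kafka's classes.
final class RetriableExceptionSketch {
    static class RemoteStorageException extends Exception {
        RemoteStorageException(String message) {
            super(message);
        }
    }

    // Proposed subclass: still a RemoteStorageException, but marked as retriable.
    static class RetriableRemoteStorageException extends RemoteStorageException {
        RetriableRemoteStorageException(String message) {
            super(message);
        }
    }

    // Hypothetical guard: signal "not ready yet" as retriable instead of IllegalStateException.
    static void ensureReady(boolean partitionReady, String partition) throws RemoteStorageException {
        if (!partitionReady) {
            throw new RetriableRemoteStorageException(
                    "Remote log metadata for " + partition + " is not ready yet");
        }
    }

    static boolean isRetriable(Throwable t) {
        return t instanceof RetriableRemoteStorageException;
    }

    public static void main(String[] args) {
        try {
            ensureReady(false, "orange-0");
        } catch (RemoteStorageException e) {
            // Existing handlers that catch RemoteStorageException still see the error.
            System.out.println("retriable=" + isRetriable(e) + ", " + e.getMessage());
        }
    }
}
```

Using a subclass rather than an unrelated exception type is the design choice here: code that only knows about RemoteStorageException is unaffected, and the fetcher can opt in to special handling.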

stacktrace: 
{code}
[2025-07-19 19:15:47,915] ERROR [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Error building remote log auxiliary state for orange-0 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: This instance is in invalid state, initialized: false close: false
        at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed(TopicBasedRemoteLogMetadataManager.java:569) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
        at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(TopicBasedRemoteLogMetadataManager.java:221) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
        at org.apache.kafka.server.log.remote.storage.RemoteLogManager.fetchRemoteLogSegmentMetadata(RemoteLogManager.java:606) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.TierStateMachine.buildRemoteLogAuxState(TierStateMachine.java:233) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.TierStateMachine.start(TierStateMachine.java:114) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:785) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:434) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at scala.Option.foreach(Option.scala:437) ~[scala-library-2.13.16.jar:?]
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:342) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:430) ~[scala-library-2.13.16.jar:?]
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:426) ~[scala-library-2.13.16.jar:?]
        at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:344) ~[scala-library-2.13.16.jar:?]
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at java.base/java.util.Optional.ifPresent(Optional.java:178) [?:?]
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:117) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:96) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) [kafka-server-common-4.2.0-SNAPSHOT.jar:?]
{code}

The exception is thrown repeatedly until
RemoteLogMetadataManager#isReady(topicIdPartition) returns true. Since this is a
retriable error, it should be handled gracefully and retried instead of being
logged as an ERROR on every attempt.
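On the fetcher side, graceful handling could mean catching the retriable exception and trying again later instead of failing. The sketch below is a simplified, hypothetical retry loop: the stand-in exceptions, AuxStateBuilder, buildWithRetries, maxAttempts and backoffMs are all assumptions. The bound on attempts exists only so the demo terminates; the description only states that the error recurs until isReady(topicIdPartition) returns true.

```java
// Hypothetical retry sketch with local stand-in exceptions; not Kafka code.
final class RetryUntilReadySketch {
    static class RemoteStorageException extends Exception {
        RemoteStorageException(String message) {
            super(message);
        }
    }

    static class RetriableRemoteStorageException extends RemoteStorageException {
        RetriableRemoteStorageException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for the step that builds the remote-log-auxiliary state.
    interface AuxStateBuilder {
        void build() throws RemoteStorageException;
    }

    /**
     * Retries the build while it fails with a retriable error; any other
     * RemoteStorageException propagates immediately. Returns the attempt that succeeded.
     */
    static int buildWithRetries(AuxStateBuilder builder, int maxAttempts, long backoffMs)
            throws RemoteStorageException, InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                builder.build();
                return attempt;
            } catch (RetriableRemoteStorageException e) {
                // Expected while metadata is not ready: report without failing, then back off.
                System.out.println("attempt " + attempt + " not ready: " + e.getMessage());
                Thread.sleep(backoffMs);
            }
        }
        throw new RemoteStorageException("still not ready after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) throws Exception {
        int[] lookups = {0};
        // Simulated metadata that becomes ready on the third lookup.
        int attempts = buildWithRetries(() -> {
            if (++lookups[0] < 3) {
                throw new RetriableRemoteStorageException("orange-0 is not ready");
            }
        }, 5, 10);
        System.out.println("built aux state on attempt " + attempts);
    }
}
```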

  was:
Improve the error handling while building the remote-log-auxiliary state when a
follower node with an empty disk begins to synchronise with the leader. If the
topic has remote storage enabled, the ReplicaFetcherThread attempts to build the
remote-log-auxiliary state. Note that the remote-log-auxiliary state is built
only when the leader-log-start-offset is non-zero and differs from the
leader-local-log-start-offset.

When a LeaderAndISR request is received, ReplicaManager#becomeLeaderOrFollower
first invokes 'makeFollowers' and only then calls
RemoteLogManager#onLeadershipChange. As a result, when the ReplicaFetcherThread
calls RemoteLogManager#fetchRemoteLogSegmentMetadata, the partition may not have
been initialized yet.

Introducing a new RetriableRemoteStorageException would require a KIP, since it
is a public API change. Instead, wrap the IllegalStateException in a
RemoteStorageException so that the error is handled gracefully.
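The wrapping approach from this earlier version of the description could look like the following minimal sketch. RemoteStorageException is a local stand-in and wrapNotReady is a hypothetical helper; the original IllegalStateException is kept as the cause so the root reason stays visible in logs.

```java
import java.util.function.Supplier;

// Hypothetical sketch with a local stand-in exception; not Kafka code.
final class WrapIllegalStateSketch {
    static class RemoteStorageException extends Exception {
        RemoteStorageException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Runs a metadata lookup and converts IllegalStateException into RemoteStorageException.
    static <T> T wrapNotReady(Supplier<T> lookup) throws RemoteStorageException {
        try {
            return lookup.get();
        } catch (IllegalStateException e) {
            // Keep the original exception as the cause so the root reason is not lost.
            throw new RemoteStorageException("Remote log metadata is not available yet", e);
        }
    }

    public static void main(String[] args) {
        try {
            wrapNotReady(() -> {
                throw new IllegalStateException("This instance is in invalid state");
            });
        } catch (RemoteStorageException e) {
            System.out.println(e.getMessage() + " (cause: "
                    + e.getCause().getClass().getSimpleName() + ")");
        }
    }
}
```

Compared with a dedicated retriable type, this keeps the public API unchanged, but callers can only tell the "not ready" case apart by inspecting the cause.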

stacktrace: 
{code}
[2025-07-19 19:15:47,915] ERROR [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Error building remote log auxiliary state for orange-0 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: This instance is in invalid state, initialized: false close: false
        at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed(TopicBasedRemoteLogMetadataManager.java:569) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
        at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(TopicBasedRemoteLogMetadataManager.java:221) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
        at org.apache.kafka.server.log.remote.storage.RemoteLogManager.fetchRemoteLogSegmentMetadata(RemoteLogManager.java:606) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.TierStateMachine.buildRemoteLogAuxState(TierStateMachine.java:233) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.TierStateMachine.start(TierStateMachine.java:114) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:785) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:434) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at scala.Option.foreach(Option.scala:437) ~[scala-library-2.13.16.jar:?]
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:342) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:430) ~[scala-library-2.13.16.jar:?]
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:426) ~[scala-library-2.13.16.jar:?]
        at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:344) ~[scala-library-2.13.16.jar:?]
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at java.base/java.util.Optional.ifPresent(Optional.java:178) [?:?]
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:117) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:96) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
        at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) [kafka-server-common-4.2.0-SNAPSHOT.jar:?]
{code}

The exception is thrown repeatedly until
RemoteLogMetadataManager#isReady(topicIdPartition) returns true. Since this is a
retriable error, it should be handled gracefully and retried instead of being
logged as an ERROR on every attempt.


> Gracefully handle error while building remoteLogAuxState
> --------------------------------------------------------
>
>                 Key: KAFKA-19523
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19523
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Kamal Chandraprakash
>            Assignee: Kamal Chandraprakash
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
