[ https://issues.apache.org/jira/browse/KAFKA-19523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kamal Chandraprakash updated KAFKA-19523:
-----------------------------------------
Description:
Improve the error handling while building the remote-log-auxiliary state when a follower node with an empty disk begins to synchronise with the leader. If the topic has remote storage enabled, then the ReplicaFetcherThread attempts to build the remote-log-auxiliary state. Note that the remote-log-auxiliary state build gets invoked only when the leader-log-start-offset is non-zero and is not equal to the leader-local-log-start-offset.

When the LeaderAndISR request is received, ReplicaManager#becomeLeaderOrFollower invokes 'makeFollowers' first, followed by the RemoteLogManager#onLeadershipChange call. As a result, when ReplicaFetcherThread calls RemoteLogManager#fetchRemoteLogSegmentMetadata, the partition may not have been initialized yet.

Introduce RetriableRemoteStorageException to gracefully handle the error.

stacktrace:
{code}
[2025-07-19 19:15:47,915] ERROR [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Error building remote log auxiliary state for orange-0 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: This instance is in invalid state, initialized: false close: false
    at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed(TopicBasedRemoteLogMetadataManager.java:569) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(TopicBasedRemoteLogMetadataManager.java:221) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.server.log.remote.storage.RemoteLogManager.fetchRemoteLogSegmentMetadata(RemoteLogManager.java:606) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.TierStateMachine.buildRemoteLogAuxState(TierStateMachine.java:233) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.TierStateMachine.start(TierStateMachine.java:114) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:785) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:434) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at scala.Option.foreach(Option.scala:437) ~[scala-library-2.13.16.jar:?]
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:342) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:430) ~[scala-library-2.13.16.jar:?]
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:426) ~[scala-library-2.13.16.jar:?]
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:344) ~[scala-library-2.13.16.jar:?]
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at java.base/java.util.Optional.ifPresent(Optional.java:178) [?:?]
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:117) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:96) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) [kafka-server-common-4.2.0-SNAPSHOT.jar:?]
{code}

The exception gets thrown repeatedly until RemoteLogMetadataManager#isReady(topicIdPartition) becomes true. This is a retriable error, so it should be handled gracefully.

was:
Improve the error handling while building the remote-log-auxiliary state when a follower node with an empty disk begins to synchronise with the leader. If the topic has remote storage enabled, then the ReplicaFetcherThread attempts to build the remote-log-auxiliary state. Note that the remote-log-auxiliary state build gets invoked only when the leader-log-start-offset is non-zero and is not equal to the leader-local-log-start-offset.

When the LeaderAndISR request is received, ReplicaManager#becomeLeaderOrFollower invokes 'makeFollowers' first, followed by the RemoteLogManager#onLeadershipChange call. As a result, when ReplicaFetcherThread calls RemoteLogManager#fetchRemoteLogSegmentMetadata, the partition may not have been initialized yet.

Introducing a new RetriableRemoteStorageException requires a KIP, as it is a public API change, so wrap the IllegalStateException in RemoteStorageException to gracefully handle the error.

stacktrace:
{code}
[2025-07-19 19:15:47,915] ERROR [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Error building remote log auxiliary state for orange-0 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: This instance is in invalid state, initialized: false close: false
    at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed(TopicBasedRemoteLogMetadataManager.java:569) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(TopicBasedRemoteLogMetadataManager.java:221) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.server.log.remote.storage.RemoteLogManager.fetchRemoteLogSegmentMetadata(RemoteLogManager.java:606) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.TierStateMachine.buildRemoteLogAuxState(TierStateMachine.java:233) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.TierStateMachine.start(TierStateMachine.java:114) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:785) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:434) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at scala.Option.foreach(Option.scala:437) ~[scala-library-2.13.16.jar:?]
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:342) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:430) ~[scala-library-2.13.16.jar:?]
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:426) ~[scala-library-2.13.16.jar:?]
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:344) ~[scala-library-2.13.16.jar:?]
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at java.base/java.util.Optional.ifPresent(Optional.java:178) [?:?]
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:117) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:96) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) [kafka-server-common-4.2.0-SNAPSHOT.jar:?]
{code}

The exception gets thrown repeatedly until RemoteLogMetadataManager#isReady(topicIdPartition) becomes true. This is a retriable error, so it should be handled gracefully.

> Gracefully handle error while building remoteLogAuxState
> --------------------------------------------------------
>
> Key: KAFKA-19523
> URL: https://issues.apache.org/jira/browse/KAFKA-19523
> Project: Kafka
> Issue Type: Task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Major
>
> Improve the error handling while building the remote-log-auxiliary state when a follower node with an empty disk begins to synchronise with the leader. If the topic has remote storage enabled, then the ReplicaFetcherThread attempts to build the remote-log-auxiliary state. Note that the remote-log-auxiliary state build gets invoked only when the leader-log-start-offset is non-zero and is not equal to the leader-local-log-start-offset.
>
> When the LeaderAndISR request is received, ReplicaManager#becomeLeaderOrFollower invokes 'makeFollowers' first, followed by the RemoteLogManager#onLeadershipChange call. As a result, when ReplicaFetcherThread calls RemoteLogManager#fetchRemoteLogSegmentMetadata, the partition may not have been initialized yet.
>
> Introduce RetriableRemoteStorageException to gracefully handle the error.
>
> stacktrace:
> {code}
> [2025-07-19 19:15:47,915] ERROR [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Error building remote log auxiliary state for orange-0 (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalStateException: This instance is in invalid state, initialized: false close: false
>     at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed(TopicBasedRemoteLogMetadataManager.java:569) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(TopicBasedRemoteLogMetadataManager.java:221) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.server.log.remote.storage.RemoteLogManager.fetchRemoteLogSegmentMetadata(RemoteLogManager.java:606) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
>     at kafka.server.TierStateMachine.buildRemoteLogAuxState(TierStateMachine.java:233) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
>     at kafka.server.TierStateMachine.start(TierStateMachine.java:114) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
>     at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:785) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:434) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
>     at scala.Option.foreach(Option.scala:437) ~[scala-library-2.13.16.jar:?]
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:342) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
>     at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:430) ~[scala-library-2.13.16.jar:?]
>     at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:426) ~[scala-library-2.13.16.jar:?]
>     at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:344) ~[scala-library-2.13.16.jar:?]
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
>     at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
>     at java.base/java.util.Optional.ifPresent(Optional.java:178) [?:?]
>     at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:117) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
>     at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:96) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) [kafka-server-common-4.2.0-SNAPSHOT.jar:?]
> {code}
>
> The exception gets thrown repeatedly until RemoteLogMetadataManager#isReady(topicIdPartition) becomes true. This is a retriable error, so it should be handled gracefully.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
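
For illustration only, the retriable-error handling described in this issue might look roughly like the sketch below. It is not the actual Kafka change: every class here is a local stand-in declared so the example compiles on its own, the shape of RetriableRemoteStorageException (a subclass of RemoteStorageException) is an assumption, and the names MetadataManager, Outcome, and tryBuildAuxState are hypothetical. The idea is that a metadata manager that is not yet ready signals a retriable condition, which the fetcher treats as "try again on the next iteration" rather than as a hard failure.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

public class RemoteAuxStateRetrySketch {

    // Local stand-in for Kafka's RemoteStorageException, declared here so the sketch is self-contained.
    static class RemoteStorageException extends Exception {
        RemoteStorageException(String message) { super(message); }
    }

    // Assumed shape of the retriable exception named in the issue; the real class may differ.
    static class RetriableRemoteStorageException extends RemoteStorageException {
        RetriableRemoteStorageException(String message) { super(message); }
    }

    // Hypothetical stand-in for the metadata manager: not ready until the partition is initialized.
    static class MetadataManager {
        private final AtomicBoolean ready = new AtomicBoolean(false);

        void markReady() { ready.set(true); }

        boolean isReady() { return ready.get(); }

        Optional<String> remoteLogSegmentMetadata(long offset) throws RemoteStorageException {
            if (!isReady()) {
                // Signal a retriable condition instead of throwing IllegalStateException.
                throw new RetriableRemoteStorageException("Metadata manager is not ready for the partition yet");
            }
            return Optional.of("segment-for-offset-" + offset);
        }
    }

    enum Outcome { BUILT, RETRY_LATER, FAILED }

    // Hypothetical fetcher-side handling: a retriable error means "try again on the next fetch iteration".
    static Outcome tryBuildAuxState(MetadataManager manager, long offset) {
        try {
            manager.remoteLogSegmentMetadata(offset);
            return Outcome.BUILT;
        } catch (RetriableRemoteStorageException e) {
            // Expected while the partition is still initializing; no ERROR-level stack trace needed.
            return Outcome.RETRY_LATER;
        } catch (RemoteStorageException e) {
            return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        MetadataManager manager = new MetadataManager();
        System.out.println(tryBuildAuxState(manager, 100L)); // prints RETRY_LATER
        manager.markReady();                                  // partition initialization has completed
        System.out.println(tryBuildAuxState(manager, 100L)); // prints BUILT
    }
}
```

Catching the retriable subclass before its parent keeps transient "not ready" conditions separate from genuine remote-storage failures, which is the distinction the description asks for.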