[ 
https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anatolii Popov updated KAFKA-16105:
-----------------------------------
    Labels: tiered-storage  (was: )

> Reassignment of tiered topics is failing due to RemoteStorageException
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-16105
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16105
>             Project: Kafka
>          Issue Type: Bug
>          Components: Tiered-Storage
>            Reporter: Anatolii Popov
>            Priority: Critical
>              Labels: tiered-storage
>
> When partition reassignment happens for a tiered topic, in most cases it gets 
> stuck with RemoteStorageExceptions on follower nodes saying that the remote 
> log auxiliary state cannot be constructed:
>  
> {code:java}
> [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, fetcherId=2] Error building remote log auxiliary state for test-24 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: test-24, currentLeaderEpoch: 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the previous remote log segment metadata was not found
>     at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259)
>     at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106)
>     at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
>     at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>     at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
>     at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
>     at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
>     at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
>     at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
>     at scala.Option.foreach(Option.scala:437)
>     at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
>     at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>     at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
>  {code}
>  
> Scenario:
> A cluster of 3 nodes with a single topic of 30 partitions, all of which have 
> tiered segments. 3 more nodes are added to the cluster and a reassignment is 
> started to move all the data to the new nodes.
> Behavior:
> For most partitions the reassignment completes smoothly.
> For some partitions, when a new node starts to receive assignments it reads 
> the __remote_log_metadata topic and tries to initialize the metadata cache 
> from COPY_SEGMENT_STARTED records. If it reads such a message for a partition 
> before that partition has been assigned to this specific node, it ignores the 
> message, thereby skipping the cache initialization, yet still marks the 
> partition as assigned. The reassignment is then stuck, since the 
> COPY_SEGMENT_STARTED record is never properly processed.
> Expected behavior:
> A partition should not be marked as assigned before the cache is initialized, 
> so that the COPY_SEGMENT_STARTED message can be re-read and the cache 
> initialized.
>  
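The ordering race described above can be sketched as follows. This is a minimal, hypothetical simulation; the class and variable names are stand-ins for illustration, not the actual Kafka internals.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the race: the COPY_SEGMENT_STARTED record is read
// before the partition is assigned on this node, so it is skipped, yet the
// partition still ends up marked as assigned.
public class SkipRaceDemo {

    // Returns {partitionAssigned, cacheInitialized} after replaying the ordering.
    static boolean[] run() {
        Set<String> assignedPartitions = new HashSet<>();
        Set<String> cacheInitialized = new HashSet<>();
        String partition = "test-24";

        // The COPY_SEGMENT_STARTED record for test-24 arrives BEFORE the
        // partition is assigned on this node, so it is silently skipped ...
        if (assignedPartitions.contains(partition)) {
            cacheInitialized.add(partition); // never reached in this ordering
        }

        // ... and only afterwards is the partition marked assigned, with the
        // consumer's offset already past the skipped record.
        assignedPartitions.add(partition);

        return new boolean[] {
                assignedPartitions.contains(partition),
                cacheInitialized.contains(partition)
        };
    }

    public static void main(String[] args) {
        boolean[] state = run();
        // Assigned but never cache-initialized: the replica fetcher is stuck.
        System.out.println("assigned=" + state[0] + " cacheInitialized=" + state[1]);
    }
}
```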
> Some notes:
> This most probably happens when messages for several user partitions land in 
> a single metadata partition and the order of the messages does not match the 
> order of assignment. The follower reads the COPY_SEGMENT_STARTED message, 
> sees that the user partition is not yet assigned to this node, skips the 
> message, and marks the user partition as assigned. On the next iteration it 
> resets to the beginning ONLY the metadata partitions for user partitions that 
> were not yet assigned; the rest of the metadata partitions it reads from the 
> offset remembered from the previous step, which for some reason does not 
> help. So the skipped COPY_SEGMENT_STARTED record is apparently never re-read, 
> and the metadata cache is never initialized.
> One possible solution is not to track the last read offset for the metadata 
> partitions but instead to always reset them to the beginning whenever the 
> assignment has changed. This seems to do no harm, since messages that were 
> already processed are simply skipped again, and it appears to allow the 
> reassignment to finish properly.
> Draft PR https://github.com/apache/kafka/pull/15165
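The proposed fix can be sketched like this (assumed names and a toy in-memory log, not the actual code from the PR): when the assignment changes, seek the affected metadata partitions back to the beginning instead of resuming from the remembered offset; re-delivered records that were already applied are idempotently skipped.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the proposed fix: re-consume a metadata partition from the
// beginning after an assignment change, rather than from the last read offset.
public class ResetOnReassignDemo {
    // Records of one metadata partition, oldest first (hypothetical contents).
    static final List<String> METADATA_LOG =
            List.of("COPY_SEGMENT_STARTED:test-24", "COPY_SEGMENT_FINISHED:test-24");

    // Consume from startOffset; initialize the cache only for assigned partitions.
    static Set<String> consume(Set<String> assigned, int startOffset) {
        Set<String> cacheInitialized = new HashSet<>();
        for (int offset = startOffset; offset < METADATA_LOG.size(); offset++) {
            String[] event = METADATA_LOG.get(offset).split(":");
            if ("COPY_SEGMENT_STARTED".equals(event[0]) && assigned.contains(event[1])) {
                cacheInitialized.add(event[1]); // idempotent: re-adding is harmless
            }
        }
        return cacheInitialized;
    }

    public static void main(String[] args) {
        // First pass: partition not yet assigned, records skipped, offset at end.
        Set<String> assigned = new HashSet<>();
        consume(assigned, 0);
        assigned.add("test-24");

        // Buggy behavior: resume from the remembered offset -> cache stays empty.
        Set<String> resumed = consume(assigned, METADATA_LOG.size());
        // Proposed fix: assignment changed, so reset to the beginning.
        Set<String> reset = consume(assigned, 0);

        System.out.println("resume-from-offset initialized: " + resumed.contains("test-24"));
        System.out.println("reset-to-beginning initialized: " + reset.contains("test-24"));
    }
}
```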



--
This message was sent by Atlassian Jira
(v8.20.10#820010)