kamalcph commented on code in PR #13947: URL: https://github.com/apache/kafka/pull/13947#discussion_r1290329055
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
           }
           partitionsToDelete += topicPartition
         }
+        if (stopPartition.deleteRemoteLog)
+          remotePartitionsToDelete += topicPartition
+
         // If we were the leader, we may have some operations still waiting for completion.
         // We force completion to prevent them from timing out.
         completeDelayedFetchOrProduceRequests(topicPartition)
       }

     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {
+      override def accept(tp: TopicPartition, e: Throwable): Unit = {
+        error(s"Error while stopping/deleting the remote log partition: $tp", e)
+        errorMap.put(tp, e)
+      }
+    }
+
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
       logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
     }
+    remoteLogManager.foreach(rlm => {
+      // exclude the partitions with offline/error state
+      errorMap.keySet.foreach(remotePartitionsToDelete.remove)

Review Comment:
We delete the local logs first. If any error is encountered while deleting the local log segments, we remove that partition from the `remotePartitionsToDelete` set. The assumption here is that if there is an error, the operation will be retried.
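
To illustrate the ordering described in the comment, here is a minimal, self-contained sketch. It is not the code from the PR: `TopicPartition` below is a hypothetical stand-in for the Kafka class, and `RemoteDeleteFilterSketch` and `eligibleForRemoteDelete` are names invented for this example. It only shows that partitions whose local deletion failed are dropped from the remote deletion set.

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

object RemoteDeleteFilterSketch {

  // Hypothetical helper: returns the partitions still eligible for remote log
  // deletion after removing every partition recorded in errorMap.
  def eligibleForRemoteDelete(
      remotePartitionsToDelete: Set[TopicPartition],
      errorMap: collection.Map[TopicPartition, Throwable]): Set[TopicPartition] =
    remotePartitionsToDelete.filterNot(errorMap.contains)

  def main(args: Array[String]): Unit = {
    val tp0 = TopicPartition("orders", 0)
    val tp1 = TopicPartition("orders", 1)

    val remotePartitionsToDelete = mutable.Set(tp0, tp1)
    val errorMap = new mutable.HashMap[TopicPartition, Throwable]()

    // Simulate a local deletion failure for tp1 (assumption for this example).
    errorMap.put(tp1, new java.io.IOException("simulated local delete failure"))

    // Same shape as the line under review: drop failed partitions in place.
    errorMap.keySet.foreach(remotePartitionsToDelete.remove)

    // Only tp0 remains, so only tp0 would be passed on for remote deletion.
    println(remotePartitionsToDelete)
  }
}
```

In this sketch `tp1` stays out of the remote deletion set until a later retry deletes its local log without error, which matches the retry assumption stated above.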