kamalcph commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1290329055


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
+      if (stopPartition.deleteRemoteLog)
+        remotePartitionsToDelete += topicPartition
+
       // If we were the leader, we may have some operations still waiting for 
completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
     }
 
     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {
+      override def accept(tp: TopicPartition, e: Throwable): Unit = {
+        error(s"Error while stopping/deleting the remote log partition: $tp", 
e)
+        errorMap.put(tp, e)
+      }
+    }
+
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
       logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, 
e))
     }
+    remoteLogManager.foreach(rlm => {
+      // exclude the partitions with offline/error state
+      errorMap.keySet.foreach(remotePartitionsToDelete.remove)

Review Comment:
   We are deleting the local logs first. If any error encountered while 
deleting the local log segments, we are removing that partition from 
`remotePartitionsToDelete` set. The assumption made here is if there is an 
error, then we will retry the operation again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to