dajac commented on a change in pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#discussion_r446192217



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -433,29 +425,34 @@ class ReplicaManager(val config: KafkaConfig,
             case HostedPartition.None =>
               // Delete log and corresponding folders in case replica manager 
doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is 
down and recovers.
-              stoppedPartitions += topicPartition -> partitionState
+              stoppedPartitions += topicPartition
+              if (deletePartition)
+                deletedPartitions += topicPartition
+              responseMap.put(topicPartition, Errors.NONE)
           }
         }
 
         // First stop fetchers for all partitions, then stop the corresponding 
replicas
-        val partitions = stoppedPartitions.keySet
-        replicaFetcherManager.removeFetcherForPartitions(partitions)
-        replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+        replicaFetcherManager.removeFetcherForPartitions(stoppedPartitions)
+        
replicaAlterLogDirsManager.removeFetcherForPartitions(stoppedPartitions)
 
-        stoppedPartitions.foreach { case (topicPartition, partitionState) =>
-          val deletePartition = partitionState.deletePartition
-          try {
-            stopReplica(topicPartition, deletePartition)
-            responseMap.put(topicPartition, Errors.NONE)
-          } catch {
+        // Delete the logs and checkpoint
+        logManager.asyncDelete(deletedPartitions, (topicPartition, exception) 
=> {
+          exception match {
             case e: KafkaStorageException =>
-              stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=$deletePartition) from " +
+              stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId 
" +
                 s"epoch $controllerEpoch for partition $topicPartition as the 
local replica for the " +
-                "partition is in an offline log directory", e)
+                "partition is in an offline log directory")
               responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+
+            case e =>
+              stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=true) from " +
+                s"controller $controllerId with correlation id $correlationId 
" +
+                s"epoch $controllerEpoch for partition $topicPartition due to 
${e.getMessage}")

Review comment:
       That makes sense.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to