dajac commented on a change in pull request #8672: URL: https://github.com/apache/kafka/pull/8672#discussion_r446192217
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -433,29 +425,34 @@ class ReplicaManager(val config: KafkaConfig, case HostedPartition.None => // Delete log and corresponding folders in case replica manager doesn't hold them anymore. // This could happen when topic is being deleted while broker is down and recovers. - stoppedPartitions += topicPartition -> partitionState + stoppedPartitions += topicPartition + if (deletePartition) + deletedPartitions += topicPartition + responseMap.put(topicPartition, Errors.NONE) } } // First stop fetchers for all partitions, then stop the corresponding replicas - val partitions = stoppedPartitions.keySet - replicaFetcherManager.removeFetcherForPartitions(partitions) - replicaAlterLogDirsManager.removeFetcherForPartitions(partitions) + replicaFetcherManager.removeFetcherForPartitions(stoppedPartitions) + replicaAlterLogDirsManager.removeFetcherForPartitions(stoppedPartitions) - stoppedPartitions.foreach { case (topicPartition, partitionState) => - val deletePartition = partitionState.deletePartition - try { - stopReplica(topicPartition, deletePartition) - responseMap.put(topicPartition, Errors.NONE) - } catch { + // Delete the logs and checkpoint + logManager.asyncDelete(deletedPartitions, (topicPartition, exception) => { + exception match { case e: KafkaStorageException => - stateChangeLogger.error(s"Ignoring StopReplica request (delete=$deletePartition) from " + + stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " + s"controller $controllerId with correlation id $correlationId " + s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " + - "partition is in an offline log directory", e) + "partition is in an offline log directory") responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR) + + case e => + stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " + + s"controller $controllerId with correlation id $correlationId " + + s"epoch $controllerEpoch for partition $topicPartition due to ${e.getMessage}") Review comment: That makes sense. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org