junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1263928100
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {
+            if (!isLeader())

Review Comment:
   Hmm, after a topic is deleted, there is no leader.

##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -1159,6 +1161,9 @@ class LogManager(logDirs: Seq[File],
           checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
         }
         addLogToBeDeleted(removedLog)
+        if (deleteRemote && removedLog.remoteLogEnabled())

Review Comment:
   It's still weird to reference `remoteLogEnabled` in `LogManager` since it only manages local data.

##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop               A map from a topic partition to a boolean indicating
+   *                                       whether the partition should be deleted.
+   * @param partitionsMaybeToDeleteRemote  A set of topic partitions that may need to delete
+   *                                       remote segments.
    *
-   * @return                    A map from partitions to exceptions which occurred.
-   *                            If no errors occurred, the map will be empty.
+   * @return                               A map from partitions to exceptions which occurred.
+   *                                       If no errors occurred, the map will be empty.
    */
   protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
   Will the KRaft support be added in 3.6.0?
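To make the first review comment concrete: `cleanupAllRemoteLogSegments()` starts with `if (!isLeader())`, and a deleted topic has no leader, so (assuming the guard returns early, as the truncated diff suggests) the remote cleanup would never run. The sketch below is a simplified model of that guard only, under that assumption; `LeaderGuardSketch`, `PartitionView`, and `remoteCleanupProceeds` are hypothetical names, not Kafka classes, and no remote-storage calls are modeled.

```java
import java.util.Optional;

// Hypothetical model of the leader guard discussed in the review comment.
// None of these types exist in Kafka; they only illustrate the control flow.
public final class LeaderGuardSketch {

    // Minimal stand-in for one broker's view of a partition.
    // An empty leaderId means the partition currently has no leader.
    public record PartitionView(boolean topicDeleted, Optional<Integer> leaderId) {}

    // Mirrors the shape of the diff: cleanup is only attempted for a deleted topic,
    // and cleanupAllRemoteLogSegments() returns early unless this broker is the leader.
    public static boolean remoteCleanupProceeds(PartitionView view, int localBrokerId) {
        boolean isLeader = view.leaderId().map(id -> id == localBrokerId).orElse(false);
        if (!isLeader) {
            return false; // early return: nothing is removed from remote storage
        }
        return view.topicDeleted();
    }

    public static void main(String[] args) {
        int localBrokerId = 1;
        // Live topic led by this broker: there is nothing to clean up yet.
        PartitionView live = new PartitionView(false, Optional.of(1));
        // Deleted topic: per the review comment, it no longer has any leader.
        PartitionView deleted = new PartitionView(true, Optional.empty());

        System.out.println("live topic, cleanup proceeds: "
            + remoteCleanupProceeds(live, localBrokerId));
        System.out.println("deleted topic, cleanup proceeds: "
            + remoteCleanupProceeds(deleted, localBrokerId));
    }
}
```

In this model the only input that reaches the cleanup is a deleted topic that still has a local leader, which is exactly the combination the comment says cannot occur.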
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {

Review Comment:
   This is the case I am thinking about. A topic is being deleted and the controller marks it as deleted. The remote segment deletion is in progress but not yet complete when a power outage takes down the whole cluster. After the cluster restarts, topic deletion won't be triggered again, so the remaining remote segments for the deleted topic will never be cleaned up. Local storage seems to have a similar issue today, since it is also deleted asynchronously. I am just wondering how we plan to address this new issue.

##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -500,11 +504,13 @@ class ReplicaManager(val config: KafkaConfig,
           // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
           // This could happen when topic is being deleted while broker is down and recovers.
           stoppedPartitions += topicPartition -> deletePartition
+          if (remoteLogManager.isDefined)
+            partitionsMaybeToDeleteRemote += topicPartition

Review Comment:
   Hmm, `allPartitions` only stores the partitions with a replica on this broker. So `HostedPartition.None` only means that the partition doesn't reside on this broker; the partition could still exist on other brokers in the cluster.
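The last review comment can be illustrated with a small model: a broker's `allPartitions` only tracks partitions that have a local replica, so a missing entry (`HostedPartition.None`) on one broker says nothing about whether the partition still exists elsewhere. This is a minimal sketch assuming a hypothetical cluster-wide assignment map; `HostedPartitionSketch`, `hostedLocally`, and `deletedClusterWide` are illustrative names, not Kafka APIs.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: these types are illustrative, not Kafka's ReplicaManager or HostedPartition.
public final class HostedPartitionSketch {

    // Assumed cluster-wide view: partition name -> brokers holding a replica.
    private final Map<String, Set<Integer>> clusterAssignment;

    public HostedPartitionSketch(Map<String, Set<Integer>> clusterAssignment) {
        this.clusterAssignment = clusterAssignment;
    }

    // Analogue of a lookup in one broker's allPartitions:
    // only partitions with a replica on that broker are "hosted".
    public boolean hostedLocally(String partition, int brokerId) {
        return clusterAssignment.getOrDefault(partition, Set.of()).contains(brokerId);
    }

    // A partition is gone cluster-wide only if no broker holds a replica of it.
    public boolean deletedClusterWide(String partition) {
        return clusterAssignment.getOrDefault(partition, Set.of()).isEmpty();
    }

    public static void main(String[] args) {
        HostedPartitionSketch cluster = new HostedPartitionSketch(Map.of(
            "orders-0", Set.of(1, 2),   // no replica on broker 3
            "orders-1", Set.of(2, 3)));

        int brokerId = 3;
        for (String p : List.of("orders-0", "orders-1")) {
            System.out.printf("%s: hosted on broker %d = %b, deleted cluster-wide = %b%n",
                p, brokerId, cluster.hostedLocally(p, brokerId), cluster.deletedClusterWide(p));
        }
    }
}
```

Running `main` reports `orders-0` as not hosted on broker 3 yet not deleted cluster-wide. In this model, that is the case where treating a local "not hosted" result as a reason to delete remote segments would remove data the partition still needs.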