junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1263928100


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {
+            if (!isLeader())

Review Comment:
   Hmm, after a topic is deleted, there is no leader.
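
   To make the concern concrete, here is a minimal sketch. It assumes the guard returns early when the task is not the leader (the hunk above is cut off before that point). The class and method names are hypothetical stand-ins, not the real `RemoteLogManager` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the RLM task; not the actual RemoteLogManager code.
class LeaderGatedCleanupSketch {
    private final boolean leader;
    private final List<String> remoteSegments;

    LeaderGatedCleanupSketch(boolean leader, List<String> remoteSegments) {
        this.leader = leader;
        this.remoteSegments = new ArrayList<>(remoteSegments);
    }

    // Assumed shape of the guard: skip the cleanup unless this task is the leader.
    void cleanupAllRemoteLogSegments() {
        if (!leader)
            return;
        remoteSegments.clear();
    }

    int remainingSegments() {
        return remoteSegments.size();
    }
}
```

   If no replica is the leader once the topic is deleted, every task takes the early return and the remote segments stay where they are.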



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -1159,6 +1161,9 @@ class LogManager(logDirs: Seq[File],
           checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
         }
         addLogToBeDeleted(removedLog)
+        if (deleteRemote && removedLog.remoteLogEnabled())

Review Comment:
   It's still weird to reference remoteLogEnabled in LogManager since it only 
manages local data.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop                A map from a topic partition to a boolean indicating
+   *                                        whether the partition should be deleted.
+   * @param partitionsMaybeToDeleteRemote   A set of topic partitions that may need to delete
+   *                                        remote segments.
    *
-   * @return                    A map from partitions to exceptions which occurred.
-   *                            If no errors occurred, the map will be empty.
+   * @return                                A map from partitions to exceptions which occurred.
+   *                                        If no errors occurred, the map will be empty.
    */
   protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
   Will the KRaft support be added in 3.6.0?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {

Review Comment:
   This is the case that I am thinking about. A topic is being deleted and the 
controller marks the topic as deleted. The remote segment deletion is in 
progress but not completed. There is a power outage and the whole cluster goes 
down. After the cluster is restarted, topic deletion won't be triggered again, 
so the remaining remote segments for the deleted topic won't be cleaned up. 
Local storage seems to have a similar issue right now, since local logs are 
also deleted asynchronously. I am just wondering how we plan to address this 
new issue.
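
   A rough sketch of the scenario, assuming the set of deleted topic IDs lives only in broker memory. All names here are hypothetical, and the remote store is modelled as a simple map from topic ID to remaining segment count.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class RestartLosesDeletionStateSketch {
    // Stand-in for remote storage, which survives a broker restart:
    // topic id -> number of remote segments still present.
    static final Map<String, Integer> remoteStore = new HashMap<>();

    // Stand-in for a broker whose deletion bookkeeping is in memory only.
    static class Broker {
        final Set<String> deletedTopicIds = new HashSet<>();

        void markDeleted(String topicId) {
            deletedTopicIds.add(topicId);
        }

        // Deletes up to maxSegments segments for every topic marked as deleted.
        void cleanupStep(int maxSegments) {
            for (String topicId : deletedTopicIds)
                remoteStore.computeIfPresent(topicId, (id, n) -> Math.max(0, n - maxSegments));
        }
    }
}
```

   A broker that marks a topic deleted and then crashes mid-cleanup comes back as a fresh `Broker` with an empty `deletedTopicIds`, so later cleanup steps never touch the leftover segments.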



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -500,11 +504,13 @@ class ReplicaManager(val config: KafkaConfig,
               // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is down and recovers.
               stoppedPartitions += topicPartition -> deletePartition
+              if (remoteLogManager.isDefined)
+                partitionsMaybeToDeleteRemote += topicPartition

Review Comment:
   Hmm, `allPartitions` only stores the partitions with a replica in this 
broker. So, `HostedPartition.None` only means that the partition doesn't reside 
in this broker, but the partition could still exist in other brokers in the 
cluster.
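
   A small illustration of the point, with hypothetical types: an empty `Optional` plays the role of `HostedPartition.None`, and each broker keeps its own `allPartitions` map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class HostedPartitionSketch {
    // Hypothetical per-broker view; only partitions with a local replica are stored.
    static class Broker {
        final Map<String, String> allPartitions = new HashMap<>();

        // An empty result stands in for HostedPartition.None on this broker.
        Optional<String> getPartition(String topicPartition) {
            return Optional.ofNullable(allPartitions.get(topicPartition));
        }
    }

    // True if any broker in the (hypothetical) cluster hosts the partition.
    static boolean existsInCluster(String topicPartition, Broker... brokers) {
        for (Broker broker : brokers) {
            if (broker.getPartition(topicPartition).isPresent())
                return true;
        }
        return false;
    }
}
```

   A lookup miss on one broker therefore says nothing about whether the partition, or its remote segments, are still in use elsewhere.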


