junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1289212842


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -526,25 +530,37 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean 
indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop A map from a topic partition to a boolean 
indicating
+   *                         whether the partition should be deleted.
+   * @return A map from partitions to exceptions which occurred.
+   *         If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, 
Boolean]): Map[TopicPartition, Throwable] = {

Review Comment:
   All usage of this method is within this class. Could this be private?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
+      if (stopPartition.deleteRemoteLog)
+        remotePartitionsToDelete += topicPartition
+
       // If we were the leader, we may have some operations still waiting for 
completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
     }
 
     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {
+      override def accept(tp: TopicPartition, e: Throwable): Unit = {
+        error(s"Error while stopping/deleting the remote log partition: $tp", 
e)
+        errorMap.put(tp, e)
+      }
+    }
+
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
       logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, 
e))
     }
+    remoteLogManager.foreach(rlm => {

Review Comment:
   It's simpler to do `remoteLogManager.foreach{rlm => ...}` instead.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
+      if (stopPartition.deleteRemoteLog)
+        remotePartitionsToDelete += topicPartition
+
       // If we were the leader, we may have some operations still waiting for 
completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
     }
 
     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {

Review Comment:
   Do we need this handler since it only does logging?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -526,25 +530,37 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean 
indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop A map from a topic partition to a boolean 
indicating
+   *                         whether the partition should be deleted.
+   * @return A map from partitions to exceptions which occurred.
+   *         If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, 
Boolean]): Map[TopicPartition, Throwable] = {
+    stopPartitions(partitionsToStop.map(e => StopPartition(e._1, e._2)).toSet)

Review Comment:
   Could we use map `{ case(tp, deleteLocal) ... }` to avoid unnamed references 
of `._1 `and `._2`?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
+      if (stopPartition.deleteRemoteLog)
+        remotePartitionsToDelete += topicPartition
+
       // If we were the leader, we may have some operations still waiting for 
completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
     }
 
     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {
+      override def accept(tp: TopicPartition, e: Throwable): Unit = {
+        error(s"Error while stopping/deleting the remote log partition: $tp", 
e)
+        errorMap.put(tp, e)
+      }
+    }
+
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
       logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, 
e))
     }
+    remoteLogManager.foreach(rlm => {
+      // exclude the partitions with offline/error state
+      errorMap.keySet.foreach(remotePartitionsToDelete.remove)

Review Comment:
   Is this necessary since errorMap starts as empty?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -343,21 +345,78 @@ public void onLeadershipChange(Set<Partition> 
partitionsBecomeLeader,
     /**
      * Deletes the internal topic partition info if delete flag is set as true.
      *
-     * @param topicPartition topic partition to be stopped.
+     * @param topicPartitions topic partitions that needs to be stopped.

Review Comment:
   Could we add the missing javadoc for errorHandler?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -478,7 +481,8 @@ class ReplicaManager(val config: KafkaConfig,
               if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
                   requestLeaderEpoch >= currentLeaderEpoch) {
-                stoppedPartitions += topicPartition -> deletePartition
+                stoppedPartitions += StopPartition(topicPartition, 
deletePartition,
+                  deletePartition && partition.isLeader && requestLeaderEpoch 
== LeaderAndIsr.EpochDuringDelete)

Review Comment:
   This may be fine for now. However, the edge case is that it's possible for a 
partition to be already leaderless (for example, no replica in ISR) when a 
topic is deleted. Could we file a jira to track this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to