[GitHub] [kafka] junrao commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-14 Thread via GitHub


junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1294012031


##
core/src/test/scala/integration/kafka/admin/RemoteTopicCRUDTest.scala:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.{Collections, Optional, Properties}
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.Seq
+import scala.util.Random
+
+@Tag("integration")
+class RemoteTopicCRUDTest extends IntegrationTestHarness with Logging {
+
+  private var testTopicName: String = _
+
+  override protected def brokerCount: Int = 2
+
+  override protected def modifyConfigs(props: Seq[Properties]): Unit = {
+    props.foreach(p => p.putAll(overrideProps()))
+  }
+
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+    Seq(overrideProps())
+  }
+
+  @BeforeEach
+  override def setUp(info: TestInfo): Unit = {
+    super.setUp(info)
+    testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTopicDeletion(quorum: String): Unit = {

Review Comment:
   Should we force some data into the remote storage and make sure that both 
the remote and local data are deleted after topic deletion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-11 Thread via GitHub


junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1291773207


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2418,7 +2449,15 @@ class ReplicaManager(val config: KafkaConfig,
       // Handle deleted partitions. We need to do this first because we might subsequently
       // create new partitions with the same names as the ones we are deleting here.
       if (!localChanges.deletes.isEmpty) {
-        val deletes = localChanges.deletes.asScala.map(tp => (tp, true)).toMap
+        val deletes = localChanges.deletes.asScala
+          .map(tp => {
+            def isCurrentLeader = Option(delta.image().getTopic(tp.topic()))

Review Comment:
   Could `isCurrentLeader` just be a val?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2418,7 +2449,15 @@ class ReplicaManager(val config: KafkaConfig,
       // Handle deleted partitions. We need to do this first because we might subsequently
       // create new partitions with the same names as the ones we are deleting here.
       if (!localChanges.deletes.isEmpty) {
-        val deletes = localChanges.deletes.asScala.map(tp => (tp, true)).toMap
+        val deletes = localChanges.deletes.asScala
+          .map(tp => {

Review Comment:
   It's simpler to do `.map{ tp => ...}` instead.






[GitHub] [kafka] junrao commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-09 Thread via GitHub


junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1289212842


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -526,25 +530,37 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop A map from a topic partition to a boolean indicating
+   *                         whether the partition should be deleted.
+   * @return A map from partitions to exceptions which occurred.
+   *         If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {

Review Comment:
   All usage of this method is within this class. Could this be private?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
+      if (stopPartition.deleteRemoteLog)
+        remotePartitionsToDelete += topicPartition
+
       // If we were the leader, we may have some operations still waiting for completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
     }
 
     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {
+      override def accept(tp: TopicPartition, e: Throwable): Unit = {
+        error(s"Error while stopping/deleting the remote log partition: $tp", e)
+        errorMap.put(tp, e)
+      }
+    }
+
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
       logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
     }
+    remoteLogManager.foreach(rlm => {

Review Comment:
   It's simpler to do `remoteLogManager.foreach{rlm => ...}` instead.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
+      if (stopPartition.deleteRemoteLog)
+        remotePartitionsToDelete += topicPartition
+
       // If we were the leader, we may have some operations still waiting for completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
     }
 
     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {

Review Comment:
   Do we need this handler since it only does logging?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -526,25 +530,37 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop A map from a topic partition to a boolean indicating
+   *                         whether the partition should be deleted.
+   * @return A map from partitions to exceptions which occurred.
+   *         If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {
+    stopPartitions(partitionsToStop.map(e => StopPartition(e._1, e._2)).toSet)

Review Comment:
   Could we use `map { case (tp, deleteLocal) => ... }` to avoid the unnamed 
references to `._1` and `._2`?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
+      if (stopPartition.deleteRemoteLog)
+        remotePartitionsToDelete += topicPartition
+
       // If we were the leader, we may have some operations still waiting for completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
     }
 
     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {
+      override def accept(tp: TopicPartition, e: Throwable): Unit = {
+        error(s"Error while stopping/deleting the remote log partition: $tp", e)
+        errorMap.put(tp, e)
+      }
+    }
+
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
   

[GitHub] [kafka] junrao commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-07-21 Thread via GitHub


junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1270993960


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -119,6 +119,8 @@ public class RemoteLogManager implements Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
     private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
 
+    private static final Set<Uuid> deletedTopicIds = ConcurrentHashMap.newKeySet();

Review Comment:
   It's weird to have this field as a static member. Could we make it an 
instance member?
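
   To illustrate the concern, here is a minimal, self-contained sketch. It is 
   not the PR's code: the class names are hypothetical and `java.util.UUID` 
   stands in for Kafka's `Uuid`. With a static set, every manager in the same 
   JVM (for example, the two brokers an integration test starts) shares the 
   deletion marks; with an instance field each manager keeps its own.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class DeletedTopicIdsSketch {

    // Hypothetical manager that keeps the set in a static field:
    // all instances in the JVM read and write the same set.
    static class StaticFieldManager {
        private static final Set<UUID> DELETED_TOPIC_IDS = ConcurrentHashMap.newKeySet();

        void markDeleted(UUID topicId) {
            DELETED_TOPIC_IDS.add(topicId);
        }

        boolean isMarkedDeleted(UUID topicId) {
            return DELETED_TOPIC_IDS.contains(topicId);
        }
    }

    // Hypothetical manager that keeps the set in an instance field:
    // each instance tracks only the topics it was told about.
    static class InstanceFieldManager {
        private final Set<UUID> deletedTopicIds = ConcurrentHashMap.newKeySet();

        void markDeleted(UUID topicId) {
            deletedTopicIds.add(topicId);
        }

        boolean isMarkedDeleted(UUID topicId) {
            return deletedTopicIds.contains(topicId);
        }
    }

    public static void main(String[] args) {
        UUID topicId = UUID.randomUUID();

        StaticFieldManager staticA = new StaticFieldManager();
        StaticFieldManager staticB = new StaticFieldManager();
        staticA.markDeleted(topicId);
        // The mark made through staticA is visible through staticB.
        System.out.println(staticB.isMarkedDeleted(topicId)); // prints "true"

        InstanceFieldManager instanceA = new InstanceFieldManager();
        InstanceFieldManager instanceB = new InstanceFieldManager();
        instanceA.markDeleted(topicId);
        // The mark stays local to instanceA.
        System.out.println(instanceB.isMarkedDeleted(topicId)); // prints "false"
    }
}
```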



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {

Review Comment:
   We could, but it adds its own overhead.



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {
+            if (!isLeader())

Review Comment:
   This would still be a bit weird. It's possible for a partition to be already 
leaderless (for example, no replica in ISR) when a topic is deleted. How is 
that case handled?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop                A map from a topic partition to a boolean indicating
+   *                                        whether the partition should be deleted.
+   * @param partitionsMaybeToDeleteRemote   A set of topic partitions that may need to delete
+   *                                        remote segments.
    *
-   * @return                    A map from partitions to exceptions which occurred.
-   *                            If no errors occurred, the map will be empty.
+   * @return                                A map from partitions to exceptions which occurred.
+   *                                        If no errors occurred, the map will be empty.
    */
   protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
   Could you file a jira to track that?






[GitHub] [kafka] junrao commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-07-14 Thread via GitHub


junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1263928100


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {
+            if (!isLeader())

Review Comment:
   Hmm, after a topic is deleted, there is no leader.



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1159,6 +1161,9 @@ class LogManager(logDirs: Seq[File],
           checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
         }
         addLogToBeDeleted(removedLog)
+        if (deleteRemote && removedLog.remoteLogEnabled())

Review Comment:
   It's still weird to reference remoteLogEnabled in LogManager since it only 
manages local data.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop                A map from a topic partition to a boolean indicating
+   *                                        whether the partition should be deleted.
+   * @param partitionsMaybeToDeleteRemote   A set of topic partitions that may need to delete
+   *                                        remote segments.
    *
-   * @return                    A map from partitions to exceptions which occurred.
-   *                            If no errors occurred, the map will be empty.
+   * @return                                A map from partitions to exceptions which occurred.
+   *                                        If no errors occurred, the map will be empty.
    */
   protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
   Will the KRaft support be added in 3.6.0?



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {

Review Comment:
   This is the case that I am thinking about. A topic is being deleted and the 
controller marks the topic as deleted. The remote segment deletion is in 
progress but not completed. There is a power outage and the whole cluster goes 
down. After the cluster is restarted, since topic deletion won't be triggered 
again, the remaining remote segments for the deleted topic won't be cleaned up. 
The local storage seems to have a similar issue right now since it's also 
deleted asynchronously. I am just wondering how we plan to address this new 
issue.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -500,11 +504,13 @@ class ReplicaManager(val config: KafkaConfig,
               // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is down and recovers.
               stoppedPartitions += topicPartition -> deletePartition
+              if (remoteLogManager.isDefined)
+                partitionsMaybeToDeleteRemote += topicPartition

Review Comment:
   Hmm, `allPartitions` only stores the partitions with a replica in this 
broker. So, `HostedPartition.None` only means that the partition doesn't reside 
in this broker, but the partition could still exist in other brokers in the 
cluster.
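
   A small sketch of that distinction, using a hypothetical model rather than 
   ReplicaManager's actual data structures: each broker's view is a set of 
   hosted partition names keyed by broker id. A miss in one broker's view says 
   nothing about whether the partition exists elsewhere, so by itself it cannot 
   justify deleting remote data.

```java
import java.util.Map;
import java.util.Set;

public class HostedPartitionSketch {

    // Hypothetical model: broker id -> names of the partitions that broker hosts.
    // Answers "does this broker hold a replica?", which is all a local lookup can tell.
    static boolean hostedOn(Map<Integer, Set<String>> hostedByBroker, int brokerId, String partition) {
        return hostedByBroker.getOrDefault(brokerId, Set.of()).contains(partition);
    }

    // Answers "does any broker hold a replica?", which needs the cluster-wide view.
    static boolean existsInCluster(Map<Integer, Set<String>> hostedByBroker, String partition) {
        return hostedByBroker.values().stream().anyMatch(hosted -> hosted.contains(partition));
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> hostedByBroker = Map.of(
            0, Set.of("orders-0"),
            1, Set.of("orders-0", "orders-1"));

        // Broker 0 has no replica of orders-1 ...
        System.out.println(hostedOn(hostedByBroker, 0, "orders-1"));     // prints "false"
        // ... but the partition still exists on broker 1.
        System.out.println(existsInCluster(hostedByBroker, "orders-1")); // prints "true"
    }
}
```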






[GitHub] [kafka] junrao commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-07-12 Thread via GitHub


junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1261801130


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -500,11 +504,13 @@ class ReplicaManager(val config: KafkaConfig,
               // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is down and recovers.
               stoppedPartitions += topicPartition -> deletePartition
+              if (remoteLogManager.isDefined)
+                partitionsMaybeToDeleteRemote += topicPartition

Review Comment:
   Hmm, it seems this case can occur during partition reassignment. In that 
case, we don't want to delete the remote data, right?



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {

Review Comment:
   This process runs on every replica. So, will we be deleting the same remote 
segment multiple times?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop                A map from a topic partition to a boolean indicating
+   *                                        whether the partition should be deleted.
+   * @param partitionsMaybeToDeleteRemote   A set of topic partitions that may need to delete
+   *                                        remote segments.
    *
-   * @return                    A map from partitions to exceptions which occurred.
-   *                            If no errors occurred, the map will be empty.
+   * @return                                A map from partitions to exceptions which occurred.
+   *                                        If no errors occurred, the map will be empty.
    */
   protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
   It seems that the implementation doesn't support the KRaft controller. Do we 
plan to support that for the 3.6.0 release?



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1159,6 +1161,9 @@ class LogManager(logDirs: Seq[File],
           checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
         }
         addLogToBeDeleted(removedLog)
+        if (deleteRemote && removedLog.remoteLogEnabled())
+          RemoteLogManager.addTopicIdToBeDeleted(removedLog.topicIdAsJava)

Review Comment:
   LogManager only manages local data. So, it's a bit weird to have it call 
RemoteLogManager.



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }
 
+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {

Review Comment:
   Since this runs asynchronously after topic deletion completes, if every 
replica is restarted before all remote segments are deleted, we will never be 
able to remove the remaining remote segments for the deleted topics?


