[GitHub] [kafka] junrao commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic
junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1294012031

## core/src/test/scala/integration/kafka/admin/RemoteTopicCRUDTest.scala: ##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util
+import java.util.{Collections, Optional, Properties}
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.Seq
+import scala.util.Random
+
+@Tag("integration")
+class RemoteTopicCRUDTest extends IntegrationTestHarness with Logging {
+
+  private var testTopicName: String = _
+
+  override protected def brokerCount: Int = 2
+
+  override protected def modifyConfigs(props: Seq[Properties]): Unit = {
+    props.foreach(p => p.putAll(overrideProps()))
+  }
+
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+    Seq(overrideProps())
+  }
+
+  @BeforeEach
+  override def setUp(info: TestInfo): Unit = {
+    super.setUp(info)
+    testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTopicDeletion(quorum: String): Unit = {

Review Comment:
Should we force some data into the remote storage and make sure that both the remote and local data are deleted after topic deletion?
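The check suggested in this comment can be sketched roughly as follows. This is only an in-memory model of the assertion, not the integration test itself: `TopicDeletionCheck` and all of its methods are hypothetical names and do not correspond to any Kafka API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of the two storage tiers; not a Kafka API. */
public class TopicDeletionCheck {
    // topic -> segment names held on the broker's local disk
    private final Map<String, Set<String>> localSegments = new HashMap<>();
    // topic -> segment names already copied to remote storage
    private final Map<String, Set<String>> remoteSegments = new HashMap<>();

    public void appendLocal(String topic, String segment) {
        localSegments.computeIfAbsent(topic, t -> new HashSet<>()).add(segment);
    }

    // Models tiering: the segment now also exists in remote storage.
    public void copyToRemote(String topic, String segment) {
        remoteSegments.computeIfAbsent(topic, t -> new HashSet<>()).add(segment);
    }

    // Topic deletion is expected to clear both tiers.
    public void deleteTopic(String topic) {
        localSegments.remove(topic);
        remoteSegments.remove(topic);
    }

    // True if either tier still holds data for the topic.
    public boolean hasAnyData(String topic) {
        return localSegments.containsKey(topic) || remoteSegments.containsKey(topic);
    }
}
```

A real test would presumably produce records, wait until at least one segment has been uploaded, delete the topic, and then assert on both the log directories and the remote store; the model above only captures the final assertion.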
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1291773207

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -2418,7 +2449,15 @@ class ReplicaManager(val config: KafkaConfig,
       // Handle deleted partitions. We need to do this first because we might subsequently
       // create new partitions with the same names as the ones we are deleting here.
       if (!localChanges.deletes.isEmpty) {
-        val deletes = localChanges.deletes.asScala.map(tp => (tp, true)).toMap
+        val deletes = localChanges.deletes.asScala
+          .map(tp => {
+            def isCurrentLeader = Option(delta.image().getTopic(tp.topic()))

Review Comment:
Could `isCurrentLeader` just be a val?

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -2418,7 +2449,15 @@ class ReplicaManager(val config: KafkaConfig,
       // Handle deleted partitions. We need to do this first because we might subsequently
       // create new partitions with the same names as the ones we are deleting here.
       if (!localChanges.deletes.isEmpty) {
-        val deletes = localChanges.deletes.asScala.map(tp => (tp, true)).toMap
+        val deletes = localChanges.deletes.asScala
+          .map(tp => {

Review Comment:
It's simpler to do `.map{ tp => ...}` instead.
junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1289212842

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -526,25 +530,37 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop A map from a topic partition to a boolean indicating
+   *                         whether the partition should be deleted.
+   * @return A map from partitions to exceptions which occurred.
+   *         If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {

Review Comment:
All usage of this method is within this class. Could this be private?

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
+      if (stopPartition.deleteRemoteLog)
+        remotePartitionsToDelete += topicPartition
+
       // If we were the leader, we may have some operations still waiting for completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
     }

     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {
+      override def accept(tp: TopicPartition, e: Throwable): Unit = {
+        error(s"Error while stopping/deleting the remote log partition: $tp", e)
+        errorMap.put(tp, e)
+      }
+    }
+
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
       logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
     }
+    remoteLogManager.foreach(rlm => {

Review Comment:
It's simpler to do `remoteLogManager.foreach{rlm => ...}` instead.

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
+      if (stopPartition.deleteRemoteLog)
+        remotePartitionsToDelete += topicPartition
+
       // If we were the leader, we may have some operations still waiting for completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
     }

     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {

Review Comment:
Do we need this handler since it only does logging?

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -526,25 +530,37 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop A map from a topic partition to a boolean indicating
+   *                         whether the partition should be deleted.
+   * @return A map from partitions to exceptions which occurred.
+   *         If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = {
+    stopPartitions(partitionsToStop.map(e => StopPartition(e._1, e._2)).toSet)

Review Comment:
Could we use map `{ case(tp, deleteLocal) ... }` to avoid unnamed references of `._1` and `._2`?

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -558,17 +574,38 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
+      if (stopPartition.deleteRemoteLog)
+        remotePartitionsToDelete += topicPartition
+
       // If we were the leader, we may have some operations still waiting for completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
     }

     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remoteStorageErrorHandler = new BiConsumer[TopicPartition, Throwable] {
+      override def accept(tp: TopicPartition, e: Throwable): Unit = {
+        error(s"Error while stopping/deleting the remote log partition: $tp", e)
+        errorMap.put(tp, e)
+      }
+    }
+
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
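For the question about whether a dedicated error handler is needed, here is a minimal Java sketch of the lambda form that the `logManager.asyncDelete` call in the hunk already uses. The class `ErrorCollectingDelete`, the method `deleteAll`, and the "bad" prefix used to trigger a failure are all hypothetical; strings stand in for `TopicPartition`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class ErrorCollectingDelete {
    /** Hypothetical stand-in for a delete call that reports per-partition failures. */
    public static void deleteAll(List<String> partitions, BiConsumer<String, Throwable> onError) {
        for (String tp : partitions) {
            try {
                // Simulated failure: any partition whose name starts with "bad" cannot be deleted.
                if (tp.startsWith("bad")) {
                    throw new IllegalStateException("cannot delete " + tp);
                }
            } catch (RuntimeException e) {
                onError.accept(tp, e);
            }
        }
    }

    /** Returns a map from partition to the error hit while deleting it; empty if none failed. */
    public static Map<String, Throwable> stop(List<String> partitions) {
        Map<String, Throwable> errorMap = new HashMap<>();
        // A lambda records the failure; no named handler object is required.
        deleteAll(partitions, (tp, e) -> errorMap.put(tp, e));
        return errorMap;
    }
}
```

If logging is wanted as well, it can go inside the lambda body, so both delete paths can share one error map without a separate anonymous class.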
junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1270993960

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -119,6 +119,8 @@ public class RemoteLogManager implements Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
     private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
+    private static final Set deletedTopicIds = ConcurrentHashMap.newKeySet();

Review Comment:
It's weird to have this field as a static member. Could we make it an instance member?

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }

+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {

Review Comment:
We could, but it adds its own overhead.

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }

+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {
+            if (!isLeader())

Review Comment:
This would still be a bit weird. It's possible for a partition to be already leaderless (for example, no replica in ISR) when a topic is deleted. How is that case handled?

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop              A map from a topic partition to a boolean indicating
+   *                                      whether the partition should be deleted.
+   * @param partitionsMaybeToDeleteRemote A set of topic partitions that may need to delete
+   *                                      remote segments.
    *
-   * @return    A map from partitions to exceptions which occurred.
-   *            If no errors occurred, the map will be empty.
+   * @return                              A map from partitions to exceptions which occurred.
+   *                                      If no errors occurred, the map will be empty.
    */
   protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
Could you file a jira to track that?
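To illustrate the static-versus-instance point raised about `deletedTopicIds`, here is a small sketch under stated assumptions: `DeletedTopicTracker` is a hypothetical class, and `java.util.UUID` stands in for Kafka's `Uuid`. With an instance field, two managers in the same JVM (for example, two brokers in one test process) do not see each other's deletion markers.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical holder for deleted topic IDs; not the actual RemoteLogManager code. */
public class DeletedTopicTracker {
    // Instance field: each tracker owns its own thread-safe set.
    private final Set<UUID> deletedTopicIds = ConcurrentHashMap.newKeySet();

    public void markDeleted(UUID topicId) {
        deletedTopicIds.add(topicId);
    }

    public boolean isDeleted(UUID topicId) {
        return deletedTopicIds.contains(topicId);
    }

    // Called once cleanup for the topic has finished.
    public void clear(UUID topicId) {
        deletedTopicIds.remove(topicId);
    }
}
```

Had the field been `static`, a marker added through one tracker would be visible through every other tracker in the process, which is the coupling the comment objects to.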
junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1263928100

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }

+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {
+            if (!isLeader())

Review Comment:
Hmm, after a topic is deleted, there is no leader.

## core/src/main/scala/kafka/log/LogManager.scala: ##
@@ -1159,6 +1161,9 @@ class LogManager(logDirs: Seq[File],
         checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
       }
       addLogToBeDeleted(removedLog)
+      if (deleteRemote && removedLog.remoteLogEnabled())

Review Comment:
It's still weird to reference remoteLogEnabled in LogManager since it only manages local data.

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop              A map from a topic partition to a boolean indicating
+   *                                      whether the partition should be deleted.
+   * @param partitionsMaybeToDeleteRemote A set of topic partitions that may need to delete
+   *                                      remote segments.
    *
-   * @return    A map from partitions to exceptions which occurred.
-   *            If no errors occurred, the map will be empty.
+   * @return                              A map from partitions to exceptions which occurred.
+   *                                      If no errors occurred, the map will be empty.
    */
   protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
Will the KRaft support be added in 3.6.0?

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }

+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {

Review Comment:
This is the case that I am thinking about. A topic is being deleted and the controller marks the topic as deleted. The remote segment deletion is in progress but not completed. There is a power outage and the whole cluster goes down. After the cluster is restarted, since topic deletion won't be triggered again, the remaining remote segments for the deleted topic won't be cleaned up. The local storage seems to have a similar issue right now since it's also deleted asynchronously. I am just wondering how we plan to address this new issue.

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -500,11 +504,13 @@ class ReplicaManager(val config: KafkaConfig,
           // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
           // This could happen when topic is being deleted while broker is down and recovers.
           stoppedPartitions += topicPartition -> deletePartition
+          if (remoteLogManager.isDefined)
+            partitionsMaybeToDeleteRemote += topicPartition

Review Comment:
Hmm, `allPartitions` only stores the partitions with a replica in this broker. So, `HostedPartition.None` only means that the partition doesn't reside in this broker, but the partition could still exist in other brokers in the cluster.
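The power-outage scenario described above can be modelled with a small sketch. This is one possible way to make deletion resumable, not what the PR does: it assumes a deletion marker is kept in state that survives a restart, and `ResumableRemoteDeletion`, `requestDeletion`, and `resume` are hypothetical names. The `maxSegments` limit is only there to simulate an interruption partway through.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model: both collections stand in for state that survives a restart. */
public class ResumableRemoteDeletion {
    private final Set<String> pendingTopics;         // durable "deletion in progress" markers
    private final Map<String, List<String>> remote;  // remote segments per topic

    public ResumableRemoteDeletion(Set<String> pendingTopics, Map<String, List<String>> remote) {
        this.pendingTopics = pendingTopics;
        this.remote = remote;
    }

    // Record the intent before any segment is removed.
    public void requestDeletion(String topic) {
        pendingTopics.add(topic);
    }

    /** Deletes at most maxSegments segments; returns true once nothing is left to delete. */
    public boolean resume(String topic, int maxSegments) {
        if (!pendingTopics.contains(topic)) {
            return true;
        }
        List<String> segments = remote.getOrDefault(topic, new ArrayList<>());
        int deleted = 0;
        while (!segments.isEmpty() && deleted < maxSegments) {
            segments.remove(segments.size() - 1);
            deleted++;
        }
        if (segments.isEmpty()) {
            remote.remove(topic);
            // Clear the marker only after the last segment is gone.
            pendingTopics.remove(topic);
            return true;
        }
        return false;
    }
}
```

Because the marker outlives the process, a fresh instance created after a restart can call `resume` again and finish the work, whereas an in-memory set such as `deletedTopicIds` would be empty after the restart.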
junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1261801130

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -500,11 +504,13 @@ class ReplicaManager(val config: KafkaConfig,
           // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
           // This could happen when topic is being deleted while broker is down and recovers.
           stoppedPartitions += topicPartition -> deletePartition
+          if (remoteLogManager.isDefined)
+            partitionsMaybeToDeleteRemote += topicPartition

Review Comment:
Hmm, it seems this case can occur during partition reassignment. In that case, we don't want to delete the remote data, right?

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }

+        public void cleanupDeletedRemoteLogSegments() {

Review Comment:
This process runs on every replica. So, we will be deleting the same remote segment multiple times?

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop              A map from a topic partition to a boolean indicating
+   *                                      whether the partition should be deleted.
+   * @param partitionsMaybeToDeleteRemote A set of topic partitions that may need to delete
+   *                                      remote segments.
    *
-   * @return    A map from partitions to exceptions which occurred.
-   *            If no errors occurred, the map will be empty.
+   * @return                              A map from partitions to exceptions which occurred.
+   *                                      If no errors occurred, the map will be empty.
    */
   protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
+    partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
It seems that the implementation doesn't support the KRaft controller. Do we plan to support that for the 3.6.0 release?

## core/src/main/scala/kafka/log/LogManager.scala: ##
@@ -1159,6 +1161,9 @@ class LogManager(logDirs: Seq[File],
         checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
       }
       addLogToBeDeleted(removedLog)
+      if (deleteRemote && removedLog.remoteLogEnabled())
+        RemoteLogManager.addTopicIdToBeDeleted(removedLog.topicIdAsJava)

Review Comment:
LogManager only manages local data. So, it's a bit weird to have it call RemoteLogManager.

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             }
         }

+        public void cleanupDeletedRemoteLogSegments() {
+            if (isCancelled())
+                return;
+
+            Uuid topicId = topicIdPartition.topicId();
+            if (deletedTopicIds.contains(topicId)) {
+                cleanupAllRemoteLogSegments();
+                cancelRLMtask();
+                deletedTopicIds.remove(topicId);
+            }
+        }
+
+        private void cleanupAllRemoteLogSegments() {

Review Comment:
Since this runs asynchronously after topic deletion completes, if every replica is restarted before all remote segments are deleted, we will never be able to remove the remaining remote segments for the deleted topics?
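The duplicate-deletion concern (the cleanup running on every replica) can be made concrete with a small counting sketch. `DuplicateDeleteDemo` and its methods are hypothetical and only model a remote store whose delete is idempotent; they are not the Kafka remote storage interfaces.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of a remote store that counts delete calls. */
public class DuplicateDeleteDemo {
    private final Set<String> remoteSegments;
    private int deleteCalls = 0;

    public DuplicateDeleteDemo(List<String> segments) {
        this.remoteSegments = new HashSet<>(segments);
    }

    /** Idempotent delete: returns true only for the call that actually removed the segment. */
    public boolean delete(String segment) {
        deleteCalls++;
        return remoteSegments.remove(segment);
    }

    /** Every replica runs the same cleanup over the same segment list. */
    public int cleanupFromAllReplicas(int replicaCount, List<String> segments) {
        int effective = 0;
        for (int r = 0; r < replicaCount; r++) {
            for (String s : segments) {
                if (delete(s)) {
                    effective++;
                }
            }
        }
        return effective;
    }

    public int deleteCalls() {
        return deleteCalls;
    }
}
```

With three replicas and two segments, the store receives six delete calls but only two of them remove anything. The redundant calls are harmless only if the store tolerates deleting a missing segment, and they still cost remote round trips, which is why restricting the cleanup to a single replica comes up in this thread.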