This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2c82ecd67f2 KAFKA-16807 DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions (#16042) 2c82ecd67f2 is described below commit 2c82ecd67f2f6b412f625e8efc1457e7fb7f74dd Author: Ken Huang <100591800+m1a...@users.noreply.github.com> AuthorDate: Sun Jun 2 18:33:02 2024 +0900 KAFKA-16807 DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions (#16042) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../main/scala/kafka/server/ReplicaManager.scala | 25 ++++++++-------- .../unit/kafka/server/ReplicaManagerTest.scala | 33 ++++++++++++++++++++++ 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index aa56269a2f4..a2a070bcd03 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -33,6 +33,7 @@ import kafka.zk.KafkaZkClient import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult +import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaderAndIsrResponseData.{LeaderAndIsrPartitionError, LeaderAndIsrTopicError} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic @@ -67,7 +68,7 @@ import java.util import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.Lock import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit} -import java.util.{Optional, OptionalInt, OptionalLong} +import java.util.{Collections, Optional, OptionalInt, OptionalLong} import scala.collection.{Map, Seq, Set, mutable} import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ @@ -1249,9 +1250,9 @@ class ReplicaManager(val config: KafkaConfig, val fileStore = Files.getFileStore(file) val totalBytes = adjustForLargeFileSystems(fileStore.getTotalSpace) val usableBytes = adjustForLargeFileSystems(fileStore.getUsableSpace) - logsByDir.get(absolutePath) match { + val topicInfos = logsByDir.get(absolutePath) match { case Some(logs) => - val topicInfos = logs.groupBy(_.topicPartition.topic).map{case (topic, logs) => + logs.groupBy(_.topicPartition.topic).map { case (topic, logs) => new DescribeLogDirsResponseData.DescribeLogDirsTopic().setName(topic).setPartitions( logs.filter { log => partitions.contains(log.topicPartition) @@ -1262,17 +1263,19 @@ class ReplicaManager(val config: KafkaConfig, .setOffsetLag(getLogEndOffsetLag(log.topicPartition, log.logEndOffset, log.isFuture)) .setIsFutureKey(log.isFuture) }.toList.asJava) - }.toList.asJava - - new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath) - .setErrorCode(Errors.NONE.code).setTopics(topicInfos) - .setTotalBytes(totalBytes).setUsableBytes(usableBytes) + }.filterNot(_.partitions().isEmpty).toList.asJava case None => - new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath) - .setErrorCode(Errors.NONE.code) - .setTotalBytes(totalBytes).setUsableBytes(usableBytes) + Collections.emptyList[DescribeLogDirsTopic]() } + val describeLogDirsResult = new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setLogDir(absolutePath).setTopics(topicInfos) + .setErrorCode(Errors.NONE.code) + .setTotalBytes(totalBytes).setUsableBytes(usableBytes) + if (!topicInfos.isEmpty) + describeLogDirsResult.setTopics(topicInfos) + describeLogDirsResult + } catch { case e: KafkaStorageException => warn("Unable to describe replica dirs for %s".format(absolutePath), e) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 6b655ea7837..97ba10d8bef 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -6450,6 +6450,39 @@ class ReplicaManagerTest { assertEquals(Errors.NONE.code, response.errorCode) assertTrue(response.totalBytes > 0) assertTrue(response.usableBytes >= 0) + assertFalse(response.topics().isEmpty) + response.topics().forEach(t => assertFalse(t.partitions().isEmpty)) + } + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testDescribeLogDirsWithoutAnyPartitionTopic(): Unit = { + val noneTopic = "none-topic" + val topicPartition = 0 + val topicId = Uuid.randomUuid() + val followerBrokerId = 0 + val leaderBrokerId = 1 + val leaderEpoch = 1 + val leaderEpochIncrement = 2 + val countDownLatch = new CountDownLatch(1) + val offsetFromLeader = 5 + + // Prepare the mocked components for the test + val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time), + topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, + expectTruncation = false, localLogOffset = Some(10), offsetFromLeader = offsetFromLeader, topicId = Some(topicId)) + + try { + val responses = replicaManager.describeLogDirs(Set(new TopicPartition(noneTopic, topicPartition))) + assertEquals(mockLogMgr.liveLogDirs.size, responses.size) + responses.foreach { response => + assertEquals(Errors.NONE.code, response.errorCode) + assertTrue(response.totalBytes > 0) + assertTrue(response.usableBytes >= 0) + assertTrue(response.topics().isEmpty) } } finally { replicaManager.shutdown(checkpointHW = false)