This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2c82ecd67f2 KAFKA-16807 DescribeLogDirsResponseData#results#topics 
have unexpected topics having empty partitions (#16042)
2c82ecd67f2 is described below

commit 2c82ecd67f2f6b412f625e8efc1457e7fb7f74dd
Author: Ken Huang <100591800+m1a...@users.noreply.github.com>
AuthorDate: Sun Jun 2 18:33:02 2024 +0900

    KAFKA-16807 DescribeLogDirsResponseData#results#topics have unexpected 
topics having empty partitions (#16042)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../main/scala/kafka/server/ReplicaManager.scala   | 25 ++++++++--------
 .../unit/kafka/server/ReplicaManagerTest.scala     | 33 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index aa56269a2f4..a2a070bcd03 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -33,6 +33,7 @@ import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
+import 
org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic
 import 
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import 
org.apache.kafka.common.message.LeaderAndIsrResponseData.{LeaderAndIsrPartitionError,
 LeaderAndIsrTopicError}
 import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
@@ -67,7 +68,7 @@ import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.Lock
 import java.util.concurrent.{CompletableFuture, Future, 
RejectedExecutionException, TimeUnit}
-import java.util.{Optional, OptionalInt, OptionalLong}
+import java.util.{Collections, Optional, OptionalInt, OptionalLong}
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
@@ -1249,9 +1250,9 @@ class ReplicaManager(val config: KafkaConfig,
         val fileStore = Files.getFileStore(file)
         val totalBytes = adjustForLargeFileSystems(fileStore.getTotalSpace)
         val usableBytes = adjustForLargeFileSystems(fileStore.getUsableSpace)
-        logsByDir.get(absolutePath) match {
+        val topicInfos = logsByDir.get(absolutePath) match {
           case Some(logs) =>
-            val topicInfos = logs.groupBy(_.topicPartition.topic).map{case 
(topic, logs) =>
+            logs.groupBy(_.topicPartition.topic).map { case (topic, logs) =>
               new 
DescribeLogDirsResponseData.DescribeLogDirsTopic().setName(topic).setPartitions(
                 logs.filter { log =>
                   partitions.contains(log.topicPartition)
@@ -1262,17 +1263,19 @@ class ReplicaManager(val config: KafkaConfig,
                     .setOffsetLag(getLogEndOffsetLag(log.topicPartition, 
log.logEndOffset, log.isFuture))
                     .setIsFutureKey(log.isFuture)
                 }.toList.asJava)
-            }.toList.asJava
-
-            new 
DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
-              .setErrorCode(Errors.NONE.code).setTopics(topicInfos)
-              .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
+            }.filterNot(_.partitions().isEmpty).toList.asJava
           case None =>
-            new 
DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
-              .setErrorCode(Errors.NONE.code)
-              .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
+            Collections.emptyList[DescribeLogDirsTopic]()
         }
 
+        val describeLogDirsResult = new 
DescribeLogDirsResponseData.DescribeLogDirsResult()
+          .setLogDir(absolutePath).setTopics(topicInfos)
+          .setErrorCode(Errors.NONE.code)
+          .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
+        if (!topicInfos.isEmpty)
+          describeLogDirsResult.setTopics(topicInfos)
+        describeLogDirsResult
+
       } catch {
         case e: KafkaStorageException =>
           warn("Unable to describe replica dirs for %s".format(absolutePath), 
e)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6b655ea7837..97ba10d8bef 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -6450,6 +6450,39 @@ class ReplicaManagerTest {
         assertEquals(Errors.NONE.code, response.errorCode)
         assertTrue(response.totalBytes > 0)
         assertTrue(response.usableBytes >= 0)
+        assertFalse(response.topics().isEmpty)
+        response.topics().forEach(t => assertFalse(t.partitions().isEmpty))
+      }
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testDescribeLogDirsWithoutAnyPartitionTopic(): Unit = {
+    val noneTopic = "none-topic"
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+    val offsetFromLeader = 5
+
+    // Prepare the mocked components for the test
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, 
leaderBrokerId, countDownLatch,
+      expectTruncation = false, localLogOffset = Some(10), offsetFromLeader = 
offsetFromLeader, topicId = Some(topicId))
+
+    try {
+      val responses = replicaManager.describeLogDirs(Set(new 
TopicPartition(noneTopic, topicPartition)))
+      assertEquals(mockLogMgr.liveLogDirs.size, responses.size)
+      responses.foreach { response =>
+        assertEquals(Errors.NONE.code, response.errorCode)
+        assertTrue(response.totalBytes > 0)
+        assertTrue(response.usableBytes >= 0)
+        assertTrue(response.topics().isEmpty)
       }
     } finally {
       replicaManager.shutdown(checkpointHW = false)

Reply via email to