soarez commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1557442275


##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -1437,7 +1438,9 @@ class KRaftClusterTest {
           assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
         }
 
-        // Modify foo-0 so that it refers to a future replica
+        // Modify foo-0 so that it refers to a future replica.
+        // This has the same effect as the main replica being in an offline
+        // log dir and the broker crashing just at the time of promotion

Review Comment:
   I think I know what you mean, but I don't know if this is easy to understand 
in its current phrasing. Here's a suggestion:
   
   This is equivalent to a failure during the promotion of the future replica, 
followed by a restart with the directory for the main replica offline. 



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -1456,6 +1459,76 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setNumBrokerNodes(3).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(60000) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+        broker0.shutdown()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+        val log = broker0.logManager.getLog(foo0).get
+
+        // Copy foo-0 to another log dir
+        val parentDir = log.parentDir
+        var targetParentDir = parentDir.substring(0, parentDir.length - 1)
+        if (parentDir.endsWith("0")) {
+          targetParentDir += "1"
+        } else {
+          targetParentDir += "0"
+        }

Review Comment:
   This assumes your change to `BrokerNode.build()` and its specific naming 
pattern. We'd get a more robust test if instead you access the directory list 
on the broker, filter out `parentDir`, and choose one of the remaining log dirs.
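A minimal sketch of the suggested approach, written as a standalone Scala helper so it can run outside the test kit. The object and method names (`TargetLogDir`, `chooseTargetLogDir`) are hypothetical. The helper assumes the test passes in the broker's configured log dirs and `log.parentDir`; reading the dirs through something like `broker0.config.logDirs` is an assumption, not code from this PR. Paths are normalized before they are compared, so a trailing slash or a relative path does not defeat the filter.

```scala
import java.nio.file.{Path, Paths}

object TargetLogDir {
  // Normalizes a log dir path so that "/data/d0/" and "/data/d0" compare equal.
  private def normalize(dir: String): Path =
    Paths.get(dir).toAbsolutePath.normalize

  // Hypothetical helper: returns the first configured log dir that differs
  // from the partition's current parent dir, or None if no other dir exists.
  def chooseTargetLogDir(logDirs: Seq[String], parentDir: String): Option[String] = {
    val current = normalize(parentDir)
    logDirs.find(dir => normalize(dir) != current)
  }
}
```

In the test, this could replace the suffix manipulation. For example, `TargetLogDir.chooseTargetLogDir(logDirs, log.parentDir).get` fails fast if the broker has only one log dir, instead of silently building a path that does not exist.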



##########
core/src/test/java/kafka/testkit/BrokerNode.java:
##########
@@ -81,8 +82,7 @@ public BrokerNode build(
                     logDataDirectories = Collections.
                         singletonList(String.format("combined_%d", id));
                 } else {
-                    logDataDirectories = Collections.
-                        singletonList(String.format("broker_%d_data0", id));
+                    logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id)));

Review Comment:
   I'm wondering whether this will break test cases that don't support JBOD. If 
so, there should be some new failing tests.
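One way to limit the impact is sketched below. It assumes the data-dir list is built as in `BrokerNode.build()` above and makes the number of data dirs a parameter that defaults to 1, so only tests that opt in get the JBOD layout. The names `BrokerDataDirs` and `numDataDirs` are hypothetical and are not part of the test kit.

```scala
object BrokerDataDirs {
  // Hypothetical sketch: builds the data-dir names for one broker.
  // numDataDirs defaults to 1, which reproduces the original single
  // "broker_<id>_data0" layout. Only callers that ask for more get JBOD.
  def logDataDirectories(id: Int, combined: Boolean, numDataDirs: Int = 1): Seq[String] = {
    require(numDataDirs >= 1, "a broker needs at least one data dir")
    if (combined) {
      // Combined (broker + controller) nodes keep their single dir;
      // numDataDirs is ignored here in this sketch.
      Seq(s"combined_$id")
    } else {
      (0 until numDataDirs).map(i => s"broker_${id}_data$i")
    }
  }
}
```

With the default, existing tests keep the single `broker_%d_data0` dir, and a JBOD test like the one in this PR would pass `numDataDirs = 2`.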



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -1456,6 +1459,76 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setNumBrokerNodes(3).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(60000) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+        broker0.shutdown()
+        TestUtils.retry(60000) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+        val log = broker0.logManager.getLog(foo0).get
+
+        // Copy foo-0 to another log dir

Review Comment:
   It could be helpful for future readers to leave a comment explaining why 
we're manually copying the log dir in this test. 
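Below is a sketch of how the copy step and its explanatory comment could look. The helper `copyDirRecursively` is hypothetical. The rationale in its comment is inferred from the test name (`testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir`) and is not taken from the PR, so adjust the wording to match what the test actually simulates. The sketch uses only `java.nio.file` and `scala.util.Using` (Scala 2.13+).

```scala
import java.nio.file.{Files, Path, StandardCopyOption}
import scala.jdk.CollectionConverters._
import scala.util.Using

object LogDirCopy {
  // Why copy by hand (assumed rationale): the test needs a second copy of
  // foo-0 in another log dir to stand in for a future replica that was left
  // behind when a move between log dirs was interrupted.
  //
  // Hypothetical helper: recursively copies `src` (e.g. a partition dir such
  // as foo-0) to `dst`, keeping the relative layout of the files inside it.
  def copyDirRecursively(src: Path, dst: Path): Unit =
    Using.resource(Files.walk(src)) { paths =>
      // Files.walk visits a directory before its entries, so each target
      // directory exists before the files inside it are copied.
      paths.iterator().asScala.foreach { p =>
        val target = dst.resolve(src.relativize(p))
        if (Files.isDirectory(p)) Files.createDirectories(target)
        else Files.copy(p, target, StandardCopyOption.REPLACE_EXISTING)
      }
    }
}
```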



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
