junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1530918656


##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-    val context = new RegistrationTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.of(10L))
-    TestUtils.retry(60000) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-    TestUtils.retry(10000) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
 
+    def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
+    def nextRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    def nextRegistrationRequest(epoch: Long) =
+      doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+    // Broker registers and response sets epoch to 1000L
+    assertEquals(10L, 
nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+    nextRequest() // poll for next request as way to synchronize with the new 
value into brokerEpoch
+    assertEquals(1000L, manager.brokerEpoch)
+
+    // Trigger JBOD MV update
     manager.handleKraftJBODMetadataVersionUpdate()
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), 
controllerNode)
-    TestUtils.retry(60000) {
-      context.time.sleep(100)
-      context.poll()
-      manager.eventQueue.wakeup()
-      assertEquals(1200, manager.brokerEpoch)
-    }
+
+    // We may have to accept some heartbeats before the new registration is 
sent
+    while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()

Review Comment:
   `prepareResponse` knows the request type. Could we generate the response 
corresponding to the request type there?



##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -197,11 +197,17 @@ class BrokerLifecycleManagerTest {
     result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: 
BrokerLifecycleManager, future: Future[T]): T = {
-    while (!future.isDone || context.mockClient.hasInFlightRequests) {
-      context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, 
future: Future[T]): T = {
+    while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+      // If the manager is idling until scheduled events we need to advance 
the clock
+      if (manager.eventQueue.scheduledAfterIdling()
+        .filter(!_.getClass.getSimpleName.endsWith("TimeoutEvent")) // avoid 
triggering timeout events

Review Comment:
   I see. Perhaps we could add a step after `poll(ctx, manager, registration)` 
to explicitly drain `RegistrationTimeoutEvent` first? This makes the test safer 
since it won't be sensitive to the timeout config values.



##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest {
       Collections.emptyMap(), OptionalLong.empty())
     poll(ctx, manager, registration)
 
+    def nextHeartbeatDirs(): Set[String] =
+      poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
+        .data().offlineLogDirs().asScala.map(_.toString).toSet
+    assertEquals(Set.empty, nextHeartbeatDirs())
     
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
     
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), 
nextHeartbeatDirs())

Review Comment:
   Thanks for the explanation. You are right. So, this is not an issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to