junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1532500641


##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -254,33 +262,41 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-    val context = new RegistrationTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.of(10L))
-    TestUtils.retry(60000) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-    TestUtils.retry(10000) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
 
+    def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
+    def nextHeartbeatRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    def nextRegistrationRequest(epoch: Long) =
+      doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+    // Broker registers and response sets epoch to 1000L
+    assertEquals(10L, 
nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+    nextHeartbeatRequest() // poll for next request as way to synchronize with 
the new value into brokerEpoch
+    assertEquals(1000L, manager.brokerEpoch)
+
+    // Trigger JBOD MV update
     manager.handleKraftJBODMetadataVersionUpdate()
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), 
controllerNode)
-    TestUtils.retry(60000) {
-      context.time.sleep(100)
-      context.poll()
-      manager.eventQueue.wakeup()
-      assertEquals(1200, manager.brokerEpoch)
-    }
+
+    // Depending on scheduling, the next request could either be 
BrokerRegistration or BrokerHeartbeat.

Review Comment:
   Before calling `manager.handleKraftJBODMetadataVersionUpdate()`, there 
should be 1 `CommunicationEvent` with a delay of 100 in `eventQueue`. Until we 
call `poll`, this `CommunicationEvent` will remain in `eventQueue`. Calling 
`manager.handleKraftJBODMetadataVersionUpdate()` causes a 
`KraftJBODMetadataVersionUpdateEvent` with no delay to be added to 
`eventQueue`. The `KraftJBODMetadataVersionUpdateEvent` will then be processed 
and will replace the pending `CommunicationEvent` with one that has a delay of 
0. That rescheduled `CommunicationEvent` will then be processed, which causes a 
`BrokerRegistration` request to be sent. So, it seems that the next request 
should always be `BrokerRegistration` when we call `poll`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to