soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1529919190


##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest {
       Collections.emptyMap(), OptionalLong.empty())
     poll(ctx, manager, registration)
 
+    def nextHeartbeatDirs(): Set[String] =
+      poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
+        .data().offlineLogDirs().asScala.map(_.toString).toSet
+    assertEquals(Set.empty, nextHeartbeatDirs())
     manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
     manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), nextHeartbeatDirs())

Review Comment:
   Thanks for carefully thinking this through. 
   
   I believe there's a mistake in that sequence of events though:
   
   > nextHeartbeatDirs() causes the time to advance quickly and passes 100
   
   This isn't how `BrokerLifecycleManagerTest#poll()` works. It will only advance time if there is no event in the queue that can immediately run, so o1 will always run before c2.
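   To make that ordering rule concrete, here is a toy model of a test event loop driven by a mock clock. It is only a sketch under stated assumptions, not the actual `BrokerLifecycleManagerTest#poll()` or `KafkaEventQueue` code: `MockTimeLoop`, `Event`, `schedule` and `pollOnce` are hypothetical names. It shows that an event that is already due (o1) runs before the clock is moved forward to a later deadline (c2).

```scala
import scala.collection.mutable

// Hypothetical toy model (not Kafka code): a single-threaded event loop with a
// mock clock. Events that are already due always run before time is advanced.
object MockTimeLoop {
  final case class Event(name: String, deadlineMs: Long)

  final class Loop {
    var nowMs: Long = 0L
    private val pending = mutable.ArrayBuffer.empty[Event]
    val executed = mutable.ArrayBuffer.empty[String]

    def schedule(event: Event): Unit = pending += event

    // Runs one event and returns true, or returns false if nothing is pending.
    def pollOnce(): Boolean =
      if (pending.isEmpty) false
      else {
        val ready = pending.filter(_.deadlineMs <= nowMs)
        val next =
          if (ready.nonEmpty) ready.minBy(_.deadlineMs) // something can run right now
          else {
            // Nothing is runnable: only now jump the clock to the earliest deadline.
            val earliest = pending.minBy(_.deadlineMs)
            nowMs = earliest.deadlineMs
            earliest
          }
        pending -= next
        executed += next.name
        true
      }
  }
}
```

   Scheduling c2 (due at 100) before o1 (due at 0) and polling until empty still executes o1 first, and the clock only reaches 100 when c2 is the sole remaining event.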
   
   > o1 is processed at manager.eventQueue and a CommunicationEvent c3 is appended to manager.eventQueue
   
   Because o1 runs before c2, the EventQueue will override c2 with c3 as they're both deferred events with the same tag.
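   The override behaviour can be pictured with a minimal sketch of tag-keyed deferred scheduling. This is an assumption-laden simplification, not `KafkaEventQueue` itself: `TaggedDeferredQueue`, `Deferred`, `scheduleDeferred` (as written here) and the `"communication"` tag are hypothetical. The only property it models is that at most one deferred event is kept per tag, so scheduling c3 under c2's tag replaces c2.

```scala
import scala.collection.mutable

// Hypothetical sketch (not KafkaEventQueue itself): at most one deferred event is
// kept per tag, so scheduling under an existing tag replaces the pending event.
object TaggedDeferredQueue {
  final case class Deferred(name: String, deadlineMs: Long)

  final class Queue {
    private val byTag = mutable.LinkedHashMap.empty[String, Deferred]
    val replaced = mutable.ArrayBuffer.empty[String]

    def scheduleDeferred(tag: String, event: Deferred): Unit = {
      // Record which event got overridden, then store the new one under the tag.
      byTag.get(tag).foreach(old => replaced += old.name)
      byTag.update(tag, event)
    }

    def pendingNames: List[String] = byTag.values.map(_.name).toList
  }
}
```

   After scheduling c2 and then c3 under the same tag, only c3 is pending, which is why c2 never gets a chance to run.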
   
   After running this test for tens of thousands of iterations, I was unable to reproduce a flaky run.
   
   



##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-    val context = new RegistrationTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, context.advertisedListeners,
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.of(10L))
-    TestUtils.retry(60000) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
-    TestUtils.retry(10000) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
 
+    def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, manager, prepareResponse[T](ctx, response))
+    def nextRequest() = doPoll[AbstractRequest](new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    def nextRegistrationRequest(epoch: Long) =
+      doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+    // Broker registers and response sets epoch to 1000L
+    assertEquals(10L, nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+    nextRequest() // poll for next request as way to synchronize with the new value into brokerEpoch
+    assertEquals(1000L, manager.brokerEpoch)
+
+    // Trigger JBOD MV update
     manager.handleKraftJBODMetadataVersionUpdate()
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode)
-    TestUtils.retry(60000) {
-      context.time.sleep(100)
-      context.poll()
-      manager.eventQueue.wakeup()
-      assertEquals(1200, manager.brokerEpoch)
-    }
+
+    // We may have to accept some heartbeats before the new registration is sent
+    while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()

Review Comment:
   I found it difficult to predict when the new `BrokerRegistration` request is sent. Depending on scheduling, it could be sent straight away or it could get queued behind another heartbeat.
   
   It is indeed incorrect to mock a `BrokerHeartbeatResponse` for a `BrokerRegistrationRequest`. It is, however, harmless, because `BrokerRegistrationResponseEvent` will call `scheduleNextCommunicationAfterFailure()` if `!response.responseBody().isInstanceOf[BrokerRegistrationResponse]`.
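   A stand-in for that type check might look like this sketch. `RegistrationResponseHandling`, `onRegistrationResponse`, `Registered` and `RetryAfterFailure` are hypothetical names; `RetryAfterFailure` merely represents the call to `scheduleNextCommunicationAfterFailure()`, and the response types are simplified case classes rather than Kafka's request/response classes.

```scala
// Hypothetical, simplified model of the response-type check (not Kafka's classes).
object RegistrationResponseHandling {
  sealed trait Response
  final case class RegistrationResponse(brokerEpoch: Long) extends Response
  final case class HeartbeatResponse() extends Response

  sealed trait Outcome
  final case class Registered(epoch: Long) extends Outcome
  // Stands in for scheduleNextCommunicationAfterFailure().
  case object RetryAfterFailure extends Outcome

  def onRegistrationResponse(body: Response): Outcome = body match {
    case RegistrationResponse(epoch) => Registered(epoch)
    case _                           => RetryAfterFailure // wrong type: treated as a failed attempt
  }
}
```

   Under this model, answering a registration with a heartbeat response just costs one failed attempt; the next registration attempt can then receive the proper response.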
   
   So I think the simplest and most straightforward way to deal with the potential extra heartbeat requests is to consume requests until they're no longer a `BrokerHeartbeatRequest`, even if that means the first attempt at sending `BrokerRegistrationRequest` fails.
   
   I've added a comment to explain this. Let me know what you think.
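   The draining approach can be modelled in isolation as well. In this hypothetical sketch, `DrainHeartbeats`, `Heartbeat`, `Registration` and `drain` are illustrative names, not test helpers from the PR: requests arrive one at a time and any heartbeats queued ahead of the registration are consumed. It does what `while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()` does, but also counts the skipped heartbeats.

```scala
// Hypothetical model of draining heartbeats (not the PR's test helpers).
object DrainHeartbeats {
  sealed trait Request
  case object Heartbeat extends Request
  case object Registration extends Request

  // Pulls requests until one is not a heartbeat; returns it together with the
  // number of heartbeats skipped. Throws NoSuchElementException if none arrives.
  def drain(requests: Iterator[Request]): (Request, Int) = {
    var skipped = 0
    var req = requests.next()
    while (req == Heartbeat) {
      skipped += 1
      req = requests.next()
    }
    (req, skipped)
  }
}
```

   Whether the registration is sent straight away or queued behind two heartbeats, the loop ends on the registration request, so the test no longer depends on scheduling order.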



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
