soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1529919190
########## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##########
@@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest {
       Collections.emptyMap(), OptionalLong.empty())
     poll(ctx, manager, registration)
+    def nextHeartbeatDirs(): Set[String] =
+      poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
+        .data().offlineLogDirs().asScala.map(_.toString).toSet
+    assertEquals(Set.empty, nextHeartbeatDirs())
     manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
     manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), nextHeartbeatDirs())

Review Comment:
Thanks for carefully thinking this through. I believe there's a mistake in that sequence of events, though:

> nextHeartbeatDirs() causes the time to advance quickly and passes 100

This isn't how `BrokerLifecycleManagerTest#poll()` works. It only advances time if there is no event in the queue that can run immediately, so o1 will always run before c2.

> o1 is processed at manager.eventQueue and a CommunicationEvent c3 is appended to manager.eventQueue

Because o1 runs before c2, the EventQueue will override c2 with c3, as they're both deferred events with the same tag.

After running this test for tens of thousands of iterations, I was unable to reproduce a flaky run.
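To make the ordering argument concrete, here is a minimal, hypothetical Scala model of the two properties described in the comment: a test poll that runs any immediately runnable event before it advances mock time, and a deferred event that replaces an earlier deferred event with the same tag. `ToyEventQueue`, its methods and `replayScenario` are illustrative names only; this is not Kafka's `KafkaEventQueue` or the real `BrokerLifecycleManagerTest#poll()`, and the 100 ms delay for c2 and 0 ms delay for c3 are assumptions.

```scala
import scala.collection.mutable

// Hypothetical, simplified event queue. It is NOT Kafka's KafkaEventQueue; it
// only mirrors two properties: same-tag deferred events replace each other,
// and poll() advances mock time only when no event can run right now.
final class ToyEventQueue {
  private var nowMs: Long = 0L
  private val immediate = mutable.Queue.empty[String]
  // tag -> (deadline in ms, event name); putting an existing tag overrides it
  private val deferred = mutable.LinkedHashMap.empty[String, (Long, String)]
  val executed = mutable.ArrayBuffer.empty[String]

  def time: Long = nowMs
  def append(event: String): Unit = immediate.enqueue(event)
  def scheduleDeferred(tag: String, delayMs: Long, event: String): Unit =
    deferred.put(tag, (nowMs + delayMs, event))

  // Runs exactly one event. Immediate events always go first; mock time jumps
  // to the earliest deadline only when nothing else is runnable.
  def poll(onRun: String => Unit = _ => ()): Unit = {
    if (immediate.nonEmpty) {
      val e = immediate.dequeue()
      executed += e
      onRun(e)
    } else if (deferred.nonEmpty) {
      val (tag, (deadline, e)) = deferred.minBy(_._2._1)
      if (deadline > nowMs) nowMs = deadline // advance only when idle
      deferred.remove(tag)
      executed += e
      onRun(e)
    }
  }
}

object ToyEventQueue {
  // Replays the scenario from the comment: c2 is a deferred communication
  // event; o1 is appended and, when handled, schedules c3 under the same tag.
  def replayScenario(): (List[String], Long) = {
    val q = new ToyEventQueue
    q.scheduleDeferred("communication", 100L, "c2")
    q.append("o1")
    val onRun: String => Unit = {
      case "o1" => q.scheduleDeferred("communication", 0L, "c3")
      case _    => ()
    }
    q.poll(onRun) // o1 is runnable, so it runs without advancing time
    q.poll(onRun) // c3 runs; c2 was overridden and never runs
    (q.executed.toList, q.time)
  }
}
```

In this model, `ToyEventQueue.replayScenario()` returns `(List("o1", "c3"), 0L)`: o1 runs before any time passes, and c2 never runs because c3 replaced it under the shared tag.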
########## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##########
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-    val context = new RegistrationTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, context.advertisedListeners,
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.of(10L))
-    TestUtils.retry(60000) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
-    TestUtils.retry(10000) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
+    def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, manager, prepareResponse[T](ctx, response))
+    def nextRequest() = doPoll[AbstractRequest](new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    def nextRegistrationRequest(epoch: Long) =
+      doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+    // Broker registers and response sets epoch to 1000L
+    assertEquals(10L, nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+    nextRequest() // poll for next request as way to synchronize with the new value into brokerEpoch
+    assertEquals(1000L, manager.brokerEpoch)
+
+    // Trigger JBOD MV update
     manager.handleKraftJBODMetadataVersionUpdate()
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode)
-    TestUtils.retry(60000) {
-      context.time.sleep(100)
-      context.poll()
-      manager.eventQueue.wakeup()
-      assertEquals(1200, manager.brokerEpoch)
-    }
+
+    // We may have to accept some heartbeats before the new registration is sent
+    while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()

Review Comment:
I found it difficult to predict when the new `BrokerRegistration` request is sent. Depending on scheduling, it could be sent straight away or it could get queued behind another heartbeat.

It is indeed incorrect to mock a `BrokerHeartbeatResponse` for a `BrokerRegistration` request. It is, however, harmless, because `BrokerRegistrationResponseEvent` will call `scheduleNextCommunicationAfterFailure()` if `!response.responseBody().isInstanceOf[BrokerRegistrationResponse]`.

So I think this is the simplest and most straightforward way to deal with the potential extra heartbeat request: consume requests until they are no longer a `BrokerHeartbeatRequest`, even if that means the first attempt at sending `BrokerRegistrationRequest` fails. I've added a comment to explain this. Let me know what you think.
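The draining strategy can be sketched with a small, hypothetical model. `FakeManager`, `Harness`, the `Req`/`Resp` types and the "retry after a wrong response type" behaviour are assumptions standing in for `BrokerLifecycleManager`, the test's `doPoll` helper and the `scheduleNextCommunicationAfterFailure()` path; none of them is Kafka API.

```scala
sealed trait Req
case object HeartbeatReq extends Req
final case class RegistrationReq(attempt: Int) extends Req

sealed trait Resp
case object HeartbeatResp extends Resp
final case class RegistrationResp(epoch: Long) extends Resp

// Hypothetical stand-in for the manager: it may still have `pendingHeartbeats`
// heartbeats queued ahead of the re-registration. Once those are sent, every
// request is a registration, so a failed attempt is simply sent again.
final class FakeManager(pendingHeartbeats: Int) {
  private var heartbeatsLeft = pendingHeartbeats
  private var attempt = 0
  var brokerEpoch: Long = -1L

  def nextRequest(): Req =
    if (heartbeatsLeft > 0) { heartbeatsLeft -= 1; HeartbeatReq }
    else { attempt += 1; RegistrationReq(attempt) }

  def handle(req: Req, resp: Resp): Unit = (req, resp) match {
    case (RegistrationReq(_), RegistrationResp(epoch)) => brokerEpoch = epoch
    // Heartbeats need no state here; a registration answered with the wrong
    // response type leaves brokerEpoch unchanged and is retried next time.
    case _ => ()
  }
}

object Harness {
  // Answers the next request with `resp` and returns that request,
  // loosely mirroring the test's doPoll helper.
  def doPoll(m: FakeManager, resp: Resp): Req = {
    val req = m.nextRequest()
    m.handle(req, resp)
    req
  }

  // Consumes requests until one is not a heartbeat. That last request is also
  // answered with a heartbeat response, so its first attempt "fails".
  def drainHeartbeats(m: FakeManager): Int = {
    var consumed = 0
    while (doPoll(m, HeartbeatResp) == HeartbeatReq) consumed += 1
    consumed
  }
}
```

With `new FakeManager(pendingHeartbeats = 2)`, `Harness.drainHeartbeats` consumes two heartbeats plus the first registration attempt, which leaves `brokerEpoch` unset because that attempt got a heartbeat response. The next `Harness.doPoll(m, RegistrationResp(1200L))` then sees the retried `RegistrationReq(2)` and sets the epoch, which is the same trade-off the comment accepts: one wasted registration attempt in exchange for not having to predict scheduling.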