apoorvmittal10 commented on code in PR #14770:
URL: https://github.com/apache/kafka/pull/14770#discussion_r1404379306


##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -207,34 +207,35 @@ class BrokerLifecycleManagerTest {
   }
 
   @Test
-  def testOfflineDirsSentUntilHeartbeatSuccess(): Unit = {
+  def testAlwaysSendsAccumulatedOfflineDirs(): Unit = {
     val ctx = new RegistrationTestContext(configProperties)
     val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"offline-dirs-sent-in-heartbeat-", isZkBroker = false)
     val controllerNode = new Node(3000, "localhost", 8021)
     ctx.controllerNodeProvider.node.set(controllerNode)
 
     val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-    val hb1 = prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()
-      .setErrorCode(Errors.NOT_CONTROLLER.code())))
-    val hb2 = prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
-    val hb3 = prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, 
new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
 
-    val offlineDirs = Set(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), 
Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
-    offlineDirs.foreach(manager.propagateDirectoryFailure)
-
-    // start the manager late to prevent a race, and force expectations on the 
first heartbeat
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.empty())
-
     poll(ctx, manager, registration)
-    val dirs1 = poll(ctx, manager, hb1).data().offlineLogDirs()
-    val dirs2 = poll(ctx, manager, hb2).data().offlineLogDirs()
-    val dirs3 = poll(ctx, manager, hb3).data().offlineLogDirs()
 
-    assertEquals(offlineDirs, dirs1.asScala.toSet)
-    assertEquals(offlineDirs, dirs2.asScala.toSet)
-    assertEquals(Set.empty, dirs3.asScala.toSet)
+    
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+    poll(ctx, manager, heartbeats(0)).data()
+    val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs()
+
+    
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+    poll(ctx, manager, heartbeats(2)).data()
+    val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs()
+
+    
manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
+    poll(ctx, manager, heartbeats(4)).data()
+    val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs()

Review Comment:
   Thanks for fixing this @soarez.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to