This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b34d75c3136 MINOR: Fix flaky BrokerLifecycleManagerTest (#14836)
b34d75c3136 is described below

commit b34d75c31369f38b17d119b136417ed3494f6c7c
Author: Igor Soarez <[email protected]>
AuthorDate: Mon Nov 27 22:09:45 2023 +0000

    MINOR: Fix flaky BrokerLifecycleManagerTest (#14836)
    
    Fix some flakiness introduced by "MINOR: Always send cumulative failed dirs in HB request"
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../kafka/server/BrokerLifecycleManagerTest.scala  | 22 +++++++---------------
 .../MockNodeToControllerChannelManager.scala       |  2 +-
 2 files changed, 8 insertions(+), 16 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 1d5afa42502..113088af4d5 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)
     }
     future.get
   }
@@ -214,28 +214,20 @@ class BrokerLifecycleManagerTest {
     ctx.controllerNodeProvider.node.set(controllerNode)
 
     val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-    val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, 
new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
-
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.empty())
     poll(ctx, manager, registration)
 
     
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
-    poll(ctx, manager, heartbeats(0)).data()
-    val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs()
-
     
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
-    poll(ctx, manager, heartbeats(2)).data()
-    val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs()
-
     
manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
-    poll(ctx, manager, heartbeats(4)).data()
-    val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs()
-
-    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA").map(Uuid.fromString), 
dirs1.asScala.toSet)
-    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", 
"ej8Q9_d2Ri6FXNiTxKFiow").map(Uuid.fromString), dirs2.asScala.toSet)
-    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow", 
"1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString), dirs3.asScala.toSet)
+    val latestHeartbeat = Seq.fill(10)(
+      prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    ).map(poll(ctx, manager, _)).last
+    assertEquals(
+      Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow", 
"1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString),
+      latestHeartbeat.data().offlineLogDirs().asScala.toSet)
     manager.close()
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
index a3b8643fb73..c3265d6be7f 100644
--- a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
+++ b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
@@ -32,7 +32,7 @@ class MockNodeToControllerChannelManager(
   val retryTimeoutMs: Int = 60000,
   val requestTimeoutMs: Int = 30000
 ) extends NodeToControllerChannelManager {
-  val unsentQueue = new java.util.ArrayDeque[NodeToControllerQueueItem]()
+  val unsentQueue = new 
java.util.concurrent.ConcurrentLinkedDeque[NodeToControllerQueueItem]()
 
   client.setNodeApiVersions(controllerApiVersions)
 

Reply via email to