This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b34d75c3136 MINOR: Fix flaky BrokerLifecycleManagerTest (#14836)
b34d75c3136 is described below
commit b34d75c31369f38b17d119b136417ed3494f6c7c
Author: Igor Soarez <[email protected]>
AuthorDate: Mon Nov 27 22:09:45 2023 +0000
MINOR: Fix flaky BrokerLifecycleManagerTest (#14836)
Fix some flakiness introduced by "MINOR: Always send cumulative failed dirs
in HB request"
Reviewers: Colin P. McCabe <[email protected]>
---
.../kafka/server/BrokerLifecycleManagerTest.scala | 22 +++++++---------------
.../MockNodeToControllerChannelManager.scala | 2 +-
2 files changed, 8 insertions(+), 16 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 1d5afa42502..113088af4d5 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
while (!future.isDone || context.mockClient.hasInFlightRequests) {
context.poll()
manager.eventQueue.wakeup()
- context.time.sleep(100)
+ context.time.sleep(5)
}
future.get
}
@@ -214,28 +214,20 @@ class BrokerLifecycleManagerTest {
ctx.controllerNodeProvider.node.set(controllerNode)
val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
- val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx,
new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
-
manager.start(() => ctx.highestMetadataOffset.get(),
ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
Collections.emptyMap(), OptionalLong.empty())
poll(ctx, manager, registration)
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
- poll(ctx, manager, heartbeats(0)).data()
- val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs()
-
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
- poll(ctx, manager, heartbeats(2)).data()
- val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs()
-
manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
- poll(ctx, manager, heartbeats(4)).data()
- val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs()
-
- assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA").map(Uuid.fromString),
dirs1.asScala.toSet)
- assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA",
"ej8Q9_d2Ri6FXNiTxKFiow").map(Uuid.fromString), dirs2.asScala.toSet)
- assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow",
"1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString), dirs3.asScala.toSet)
+ val latestHeartbeat = Seq.fill(10)(
+ prepareResponse[BrokerHeartbeatRequest](ctx, new
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+ ).map(poll(ctx, manager, _)).last
+ assertEquals(
+ Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow",
"1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString),
+ latestHeartbeat.data().offlineLogDirs().asScala.toSet)
manager.close()
}
diff --git
a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
index a3b8643fb73..c3265d6be7f 100644
---
a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
+++
b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
@@ -32,7 +32,7 @@ class MockNodeToControllerChannelManager(
val retryTimeoutMs: Int = 60000,
val requestTimeoutMs: Int = 30000
) extends NodeToControllerChannelManager {
- val unsentQueue = new java.util.ArrayDeque[NodeToControllerQueueItem]()
+ val unsentQueue = new
java.util.concurrent.ConcurrentLinkedDeque[NodeToControllerQueueItem]()
client.setNodeApiVersions(controllerApiVersions)