Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1410038816

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)
```

Review Comment:

@junrao: That's a good point, you're right. In between a `HeartbeatRequest` being sent and its response being handled, `propagateDirectoryFailure` could be called, immediately scheduling a `HeartbeatRequest` and causing an extra request. It looks like this was already the case with `setReadyToUnfence()` and `beginControlledShutdown()`, which can also cause an extra request in the same way.

We can avoid the extra requests by checking, in `OfflineDirEvent.run`, `SetReadyToUnfenceEvent.run` and `BeginControlledShutdownEvent.run`, whether a request is in flight, and deferring the call to `scheduleNextCommunicationImmediately` until after the response is received.

Please see #14874 regarding the explanatory comment for the test.

I also see you've filed [KAFKA-15950](https://issues.apache.org/jira/browse/KAFKA-15950). Thanks, I'll have a look.
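A minimal standalone sketch of the guard proposed above (hypothetical names; not the real `BrokerLifecycleManager` code): only schedule an immediate heartbeat when nothing is in flight, and let the response handler send the deferred one.

```scala
// Sketch of the proposed guard, under assumed names; not Kafka's actual code.
object InflightGuardSketch {
  private var requestInFlight = false  // set when a heartbeat is sent, cleared on response
  private var immediateWanted = false  // an event asked for an immediate heartbeat while one was in flight

  // What OfflineDirEvent.run (and the unfence/shutdown events) would do:
  def onOfflineDirEvent(): Unit =
    if (requestInFlight) immediateWanted = true  // defer until the response arrives
    else scheduleNextCommunicationImmediately()

  // What the heartbeat response handler would do:
  def onHeartbeatResponse(): Unit = {
    requestInFlight = false
    if (immediateWanted) {
      immediateWanted = false
      scheduleNextCommunicationImmediately()     // the follow-up carries the new state
    } else {
      println("schedule the next heartbeat after the regular interval")
    }
  }

  private def scheduleNextCommunicationImmediately(): Unit = {
    requestInFlight = true
    println("heartbeat sent now")
  }

  def main(args: Array[String]): Unit = {
    scheduleNextCommunicationImmediately() // initial heartbeat is in flight
    onOfflineDirEvent()                    // no extra request is sent while one is in flight
    onHeartbeatResponse()                  // the immediate follow-up is sent only now
  }
}
```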
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
junrao commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1409739363

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)
```

Review Comment:

@soarez: Thanks for the explanation. You are right that `KafkaEventQueue` does de-duping and only allows one outstanding `CommunicationEvent` in the queue. But it seems that duplicated `HeartbeatRequest`s could still be generated, which is what caused the original transient failure.

`CommunicationEvent` calls `sendBrokerHeartbeat`, which calls the following:

```
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), handler)
```

The problem is that we have another queue in `NodeToControllerChannelManagerImpl` that doesn't do the de-duping. Once a `CommunicationEvent` is dequeued from `KafkaEventQueue`, a `HeartbeatRequest` will be queued in `NodeToControllerChannelManagerImpl`. At this point, another `CommunicationEvent` could be enqueued in `KafkaEventQueue`. When it's processed, another `HeartbeatRequest` will be queued in `NodeToControllerChannelManagerImpl`.

This probably won't introduce long-lasting duplicated `HeartbeatRequest`s in practice, since a `CommunicationEvent` typically sits in `KafkaEventQueue` for a heartbeat interval. By that time, other pending `HeartbeatRequest`s will have been processed and de-duped when enqueuing to `KafkaEventQueue`. But maybe we could file a jira to track it.

For the test, could we add a comment to explain why we need to wait for 10 `HeartbeatRequest`s?
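A toy model of the window described above (made-up names; not Kafka's actual classes): the first queue de-dupes by tag, but once an event has been dequeued and its request handed to a plain FIFO request queue, a second same-tag event can still produce a second request.

```scala
import scala.collection.mutable

// Toy model of the two queues; all names are illustrative.
object TwoQueueSketch {
  // "Event queue": de-dupes by tag, like the tag-based scheduling described in the thread.
  private val eventQueue = mutable.LinkedHashMap.empty[String, () => Unit]
  // "Request queue": plain FIFO with no de-duping, standing in for the channel manager's queue.
  private val requestQueue = mutable.Queue.empty[String]

  private def enqueueEvent(tag: String)(body: => Unit): Unit =
    eventQueue.put(tag, () => body) // a same-tag event replaces the pending one

  private def drainOneEvent(): Unit =
    eventQueue.headOption.foreach { case (tag, body) =>
      eventQueue.remove(tag)
      body()
    }

  def main(args: Array[String]): Unit = {
    enqueueEvent("communication") { requestQueue.enqueue("HeartbeatRequest#1") }
    drainOneEvent() // request #1 now sits in the request queue
    // A second CommunicationEvent arrives only after the first was dequeued,
    // so the tag-based de-duping can no longer prevent a second request:
    enqueueEvent("communication") { requestQueue.enqueue("HeartbeatRequest#2") }
    drainOneEvent()
    println(requestQueue.mkString(", ")) // HeartbeatRequest#1, HeartbeatRequest#2
  }
}
```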
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1408475724

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)
```

Review Comment:

> there will be 29 extra HeartBeatRequests continuously every heartbeat interval, right?

No, I think it's just once. After receiving a response to a `HeartBeatRequest`, a `BrokerHeartbeatResponseEvent` runs, which ends up calling `scheduleNextCommunication`. This queues a `CommunicationEvent` (which sends the next heartbeat) onto the `KafkaEventQueue` using a tag. Because the tag is always the same, any previously queued event with the same tag is cancelled. So even if we receive two `HeartBeatRequest` responses in quick succession, we only send a single follow-up heartbeat request.

Taking another look at the code, I was wrong about the extra requests: `propagateDirectoryFailure` also triggers the "extra heartbeat" through `scheduleNextCommunication`, so the pending `HeartBeatRequest` is indeed cancelled!
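A minimal illustration of the tag-based cancellation described above (illustrative names; not the real `KafkaEventQueue` API): scheduling a second event under the same tag replaces the pending one, so two quick responses still leave only a single follow-up heartbeat pending.

```scala
import scala.collection.mutable

// Minimal model of tag-based cancellation; names are assumptions, not Kafka's API.
object TagScheduleSketch {
  // Pending deferred events keyed by tag: scheduling under an existing tag
  // replaces (i.e. cancels) the previously queued event.
  private val pending = mutable.Map.empty[String, String]

  private def scheduleNextCommunication(reason: String): Unit =
    pending.put("communication", s"CommunicationEvent ($reason)")

  def main(args: Array[String]): Unit = {
    scheduleNextCommunication("response #1 handled")
    scheduleNextCommunication("response #2 handled") // cancels the event queued above
    println(pending.size)             // 1: only a single follow-up heartbeat is pending
    println(pending("communication")) // CommunicationEvent (response #2 handled)
  }
}
```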
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
junrao commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1408460800

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)
```

Review Comment:

@soarez: Just to be clear: in your example, there would be 29 extra `HeartBeatRequest`s, continuously, every heartbeat interval, right? It may not overwhelm the controller, but it's unnecessary work and makes debugging much harder.

We don't necessarily need to delay the propagation of failed disks. For example, when adding a `HeartBeatRequest` to the queue, if its schedule is earlier than that of a pending `HeartBeatRequest` already in the queue, we could cancel the pending request.
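A sketch of the alternative suggested above (illustrative only; not the real scheduling code): when a new heartbeat is scheduled, keep whichever of the new and pending deadlines is earlier, so an immediate heartbeat replaces a later periodic one instead of being added on top of it.

```scala
// "Earlier deadline wins" sketch, under assumed names.
object EarlierWinsSketch {
  private var pendingDeadlineNs: Option[Long] = None

  // Keep whichever deadline is earlier: an immediate "dir failed" heartbeat
  // replaces a later periodic one, and a later periodic one never piles up
  // behind an already-earlier pending request.
  def schedule(deadlineNs: Long): Unit =
    pendingDeadlineNs = Some(pendingDeadlineNs.fold(deadlineNs)(math.min(_, deadlineNs)))

  def main(args: Array[String]): Unit = {
    schedule(10000000L) // periodic heartbeat due in 10 ms
    schedule(0L)        // a directory failure wants a heartbeat right away
    println(pendingDeadlineNs) // Some(0): the earlier schedule wins, nothing is duplicated
  }
}
```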
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1408408855

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)
```

Review Comment:

Thanks for raising this @junrao.

`manager.propagateDirectoryFailure` is called from `replicaManager.handleLogDirFailure`, which handles the failure of each directory only once. In most cases, the number of configured directories per broker should be under 10. Even if we consider 30, there can be at most 29 extra `HeartBeatRequest`s (the last one shuts down the broker). Maybe I'm missing something, but I don't expect these to overwhelm the controller.

That said, if you still think this may be an issue, we can avoid triggering a separate `HeartBeatRequest` and just wait for the next one. Otherwise, I think it makes sense not to delay updating leadership and ISR as necessary after a directory failure. What do you think?
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
junrao commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1408227531

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)
```

Review Comment:

@soarez: Thanks for the explanation. Currently, when processing a successful `HeartBeatResponse`, `BrokerLifecycleManager` automatically schedules a new `HeartBeatRequest`, regardless of whether there is already a pending `HeartBeatRequest` in `KafkaEventQueue`. Normally, only broker registration triggers the initial `HeartBeatRequest`, so there is only one `HeartBeatRequest` per heartbeat interval.

However, each `manager.propagateDirectoryFailure` now independently triggers a separate `HeartBeatRequest`. This means every `manager.propagateDirectoryFailure` call adds one more `HeartBeatRequest` per heartbeat interval for the remaining lifetime of the broker. This could unnecessarily overwhelm the controller. So this seems to be a real issue?
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1406906907

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)
```

Review Comment:

It's true in theory, but in this unit test it is not. I just tried with 6 heartbeats, and in 1,000 repetitions of the test, one of the runs was missing the last directory. On my laptop, with 7 heartbeats, 10,000 test runs had no failures.

I think this happens because of how `poll()` works: it rapidly advances the clock and signals three separate systems (BrokerLifecycleManager, MockClient and MockChannelManager) using `Object.notify()`, which does not guarantee that each of those threads runs straight away; it's still up to the OS to schedule them onto the CPU.

In practice, outside of unit tests, the delays in scheduling the BrokerLifecycleManager thread should be insignificant compared to the heartbeat interval, so I don't expect failure notifications to be delayed by more than a heartbeat. If the test proves to still be flaky even with 10 heartbeats, I think we can just increase the number.
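A small, generic demonstration of the `Object.notify()` point above (plain JVM threading, nothing Kafka-specific): notifying a waiting thread only makes it runnable, so wakeups issued before the waiter is scheduled again can be coalesced or lost.

```scala
// Generic illustration of notify() semantics; not related to Kafka's classes.
object NotifySketch {
  private val lock = new Object
  @volatile private var processed = 0

  def main(args: Array[String]): Unit = {
    val worker = new Thread(() => lock.synchronized {
      while (true) {
        lock.wait()     // wait for a wakeup
        processed += 1  // "process" one event per wakeup actually observed
      }
    })
    worker.setDaemon(true)
    worker.start()
    Thread.sleep(100)   // give the worker time to reach wait()

    for (_ <- 1 to 5) lock.synchronized(lock.notify())

    Thread.sleep(100)
    // Often fewer than 5: notify() only makes the worker runnable, and wakeups
    // issued before it is scheduled again find no waiter and are lost.
    println(s"notified 5 times, worker observed $processed wakeups")
  }
}
```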
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
junrao commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1406880863

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)
```

Review Comment:

Thanks for the explanation, @soarez. There is a heartbeat request after the initial registration, and each `manager.propagateDirectoryFailure` could trigger a separate heartbeat request. So, is it true that after the 4th heartbeat, each heartbeat request is guaranteed to include all three failed dirs?
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1406861254

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -214,28 +214,20 @@ class BrokerLifecycleManagerTest {
     ctx.controllerNodeProvider.node.set(controllerNode)
     val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-    val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
-
     manager.start(() => ctx.highestMetadataOffset.get(), ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, Collections.emptyMap(), OptionalLong.empty())
     poll(ctx, manager, registration)

     manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
-    poll(ctx, manager, heartbeats(0)).data()
-    val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs()
-
     manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
-    poll(ctx, manager, heartbeats(2)).data()
-    val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs()
-
     manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
-    poll(ctx, manager, heartbeats(4)).data()
-    val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs()
-
-    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA").map(Uuid.fromString), dirs1.asScala.toSet)
-    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow").map(Uuid.fromString), dirs2.asScala.toSet)
-    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow", "1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString), dirs3.asScala.toSet)
+    val latestHeartbeat = Seq.fill(10)(
+      prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    ).map(poll(ctx, manager, _)).last
```

Review Comment:

I need to update the KIP. Last week, in a discussion with @cmccabe and @pprovenzano, we realized that because of overload mode for heartbeats, it will be easier to handle failed log directories if the broker always sends the accumulated list. Hence #14770.
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1406855449

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)
```

Review Comment:

No. This can mitigate it, but it cannot prevent the race condition entirely. AssignmentsManager has its own event loop thread that batches and sends the accumulated failed directories. It's a bit tricky to predict the content of each request, so instead I opted to only assert after a few heartbeats.
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
junrao commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1406820475

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)
```

Review Comment:

What was causing the following failure before? Does this change fix the issue?

```
org.opentest4j.AssertionFailedError:
Expected :Set(h3sC4Yk-Q9-fd0ntJTocCA, ej8Q9_d2Ri6FXNiTxKFiow, 1iF76HVNRPqC7Y4r6647eg)
Actual :Set(h3sC4Yk-Q9-fd0ntJTocCA, ej8Q9_d2Ri6FXNiTxKFiow)
```

## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:

```diff
@@ -214,28 +214,20 @@ class BrokerLifecycleManagerTest {
     ctx.controllerNodeProvider.node.set(controllerNode)
     val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-    val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
-
     manager.start(() => ctx.highestMetadataOffset.get(), ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, Collections.emptyMap(), OptionalLong.empty())
     poll(ctx, manager, registration)

     manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
-    poll(ctx, manager, heartbeats(0)).data()
-    val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs()
-
     manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
-    poll(ctx, manager, heartbeats(2)).data()
-    val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs()
-
     manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
-    poll(ctx, manager, heartbeats(4)).data()
-    val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs()
-
-    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA").map(Uuid.fromString), dirs1.asScala.toSet)
-    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow").map(Uuid.fromString), dirs2.asScala.toSet)
-    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow", "1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString), dirs3.asScala.toSet)
+    val latestHeartbeat = Seq.fill(10)(
+      prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    ).map(poll(ctx, manager, _)).last
```

Review Comment:

Hmm, KIP-858 says "The UUIDs for the newly failed log directories are included in the BrokerHeartbeat request until the broker receives a successful response." How do we guarantee that only the 10th `HeartbeatRequest` picks up the failed log dirs?
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
cmccabe merged PR #14836: URL: https://github.com/apache/kafka/pull/14836
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on PR #14836: URL: https://github.com/apache/kafka/pull/14836#issuecomment-1825622434

Flakiness detected since #14770. cc @apoorvmittal10