Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao merged PR #14903: URL: https://github.com/apache/kafka/pull/14903 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018530584 @soarez : Thanks for triaging the test failures. Will merge the PR.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018506864 @junrao all the failed tests are being tracked. These were already tracked:
* KAFKA-8041 kafka.server.LogDirFailureTest.testIOExceptionDuringCheckpoint(String).quorum=kraft
* KAFKA-8115 org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
* KAFKA-15772 org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest.testAbortTransactionTimeout(String).quorum=kraft
* KAFKA-15897 kafka.server.ControllerRegistrationManagerTest.testWrongIncarnationId()
* KAFKA-15898 org.apache.kafka.controller.QuorumControllerTest.testFenceMultipleBrokers()
* KAFKA-15921 kafka.api.SaslScramSslEndToEndAuthorizationTest.testAuthentications(String).quorum=kraft
* KAFKA-15927 org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateSourceDefault()
* KAFKA-15927 org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault()
* KAFKA-15928 org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
* KAFKA-15945 org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testSyncTopicConfigs()
* KAFKA-15961 kafka.controller.ControllerIntegrationTest.testTopicIdPersistsThroughControllerRestart()
* KAFKA-16225 kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(String).quorum=kraft
* KAFKA-16323 kafka.server.ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric()

This one was not, so I created a JIRA:
* KAFKA-16422 org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest."testFailingOverIncrementsNewActiveControllerCount(boolean).true"
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018408733 @soarez : Are the failed tests in the latest run being tracked?
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-2014828308 Hey @junrao. Sorry I didn't reply about the failing tests earlier; I was still looking into it and should've made that clear.

I had a look at the test results. It's strange that all the failed tests are categorized as "new failures", but most of them are known flaky tests. These tests already have a respective open JIRA:
* KAFKA-8041 kafka.server.LogDirFailureTest.testIOExceptionDuringCheckpoint(String).quorum=kraft
* KAFKA-8115 org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
* KAFKA-8250 kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaAssign(String).quorum=kraft
* KAFKA-8677 kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest."testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(String, boolean).quorum=kraft, isIdempotenceEnabled=true"
* KAFKA-13514 org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack = false
* KAFKA-15104 org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful [1] Type=Raft-Combined, MetadataVersion=3.8-IV0, Security=PLAINTEXT
* KAFKA-15411 kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest."testNoProduceWithDescribeAcl(String, boolean).quorum=kraft, isIdempotenceEnabled=true"
* KAFKA-15898 org.apache.kafka.controller.QuorumControllerTest.testFenceMultipleBrokers()
* KAFKA-15914 org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId
* KAFKA-15917 OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks
* KAFKA-15927 org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateSourceDefault()
* KAFKA-15928 org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
* KAFKA-15940 kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
* KAFKA-15945 org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testSyncTopicConfigs()
* KAFKA-16024 kafka.api.SaslPlainPlaintextConsumerTest.testCoordinatorFailover(String, String).quorum=kraft+kip848.groupProtocol=consumer
* KAFKA-16174 org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful [5] Type=Raft-Combined, MetadataVersion=3.8-IV0, Security=PLAINTEXT
* KAFKA-16225 kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(String).quorum=kraft
* KAFKA-16323 kafka.server.ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric()

I could not find JIRAs for these, so I created them:
* KAFKA-16402 org.apache.kafka.controller.QuorumControllerTest.testSnapshotSaveAndLoad()
* KAFKA-16403 org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords()
* KAFKA-16404 org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig()

I also couldn't find JIRAs for these, but they're timeouts, so it could have been a failure of the test environment and not necessarily something wrong in the test. I've rebased to trigger a new build, and we'll see if they remain.
* org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean).hasConsumerRack = false
* org.apache.kafka.clients.consumer.StickyAssignorTest.testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean).hasConsumerRack = false
* org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsNonExistingGroup()
* org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorValidate
* org.apache.kafka.server.ClientMetricsManagerTest.testCacheEvictionWithMultipleClients()

None of the 26 tests is related to this change.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1534216092
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -254,33 +262,41 @@ class BrokerLifecycleManagerTest {
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-    val context = new RegistrationTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, context.advertisedListeners,
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.of(10L))
-    TestUtils.retry(6) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
-    TestUtils.retry(1) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
+    def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, manager, prepareResponse[T](ctx, response))
+    def nextHeartbeatRequest() = doPoll[AbstractRequest](new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    def nextRegistrationRequest(epoch: Long) =
+      doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+    // Broker registers and response sets epoch to 1000L
+    assertEquals(10L, nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+    nextHeartbeatRequest() // poll for next request as way to synchronize with the new value into brokerEpoch
+    assertEquals(1000L, manager.brokerEpoch)
+
+    // Trigger JBOD MV update
     manager.handleKraftJBODMetadataVersionUpdate()
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode)
-    TestUtils.retry(6) {
-      context.time.sleep(100)
-      context.poll()
-      manager.eventQueue.wakeup()
-      assertEquals(1200, manager.brokerEpoch)
-    }
+
+    // Depending on scheduling, the next request could either be BrokerRegistration or BrokerHeartbeat.
Review Comment: You're right. This does not seem necessary. I removed it.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1532500641
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -254,33 +262,41 @@ class BrokerLifecycleManagerTest {
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-    val context = new RegistrationTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, context.advertisedListeners,
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.of(10L))
-    TestUtils.retry(6) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
-    TestUtils.retry(1) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
+    def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, manager, prepareResponse[T](ctx, response))
+    def nextHeartbeatRequest() = doPoll[AbstractRequest](new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    def nextRegistrationRequest(epoch: Long) =
+      doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+    // Broker registers and response sets epoch to 1000L
+    assertEquals(10L, nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+    nextHeartbeatRequest() // poll for next request as way to synchronize with the new value into brokerEpoch
+    assertEquals(1000L, manager.brokerEpoch)
+
+    // Trigger JBOD MV update
     manager.handleKraftJBODMetadataVersionUpdate()
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode)
-    TestUtils.retry(6) {
-      context.time.sleep(100)
-      context.poll()
-      manager.eventQueue.wakeup()
-      assertEquals(1200, manager.brokerEpoch)
-    }
+
+    // Depending on scheduling, the next request could either be BrokerRegistration or BrokerHeartbeat.
Review Comment: Before calling `manager.handleKraftJBODMetadataVersionUpdate()`, there should be 1 `CommunicationEvent` with a delay of 100 in `eventQueue`. Until we call `poll`, this `CommunicationEvent` will remain in `eventQueue`. Calling `manager.handleKraftJBODMetadataVersionUpdate()` causes a `KraftJBODMetadataVersionUpdateEvent` with no delay to be added to `eventQueue`. The `KraftJBODMetadataVersionUpdateEvent` will then be processed and replace the `CommunicationEvent` with a delay of 0. The `CommunicationEvent` will then be processed, which causes a `BrokerRegistration` request to be sent. So, it seems that the next request should always be `BrokerRegistration` when we call `poll`?
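The argument above rests on one property of deferred events: scheduling a new deferred event under an existing tag replaces the pending one, so the delay-100 `CommunicationEvent` effectively becomes a delay-0 one. A minimal toy model in Scala, assuming that replace-by-tag behaviour (`ToyDeferredQueue` and `JbodUpdateScenario` are hypothetical and not the `KafkaEventQueue` API; the toy also folds the processing of `KraftJBODMetadataVersionUpdateEvent` into a single reschedule call):

```scala
import scala.collection.mutable

// Hypothetical toy model (not KafkaEventQueue): deferred events are keyed by tag,
// and scheduling under an existing tag replaces the pending event and its deadline.
final class ToyDeferredQueue {
  private val deferred = mutable.LinkedHashMap.empty[String, (Long, () => String)]

  def scheduleDeferred(tag: String, deadlineMs: Long)(action: () => String): Unit =
    deferred(tag) = (deadlineMs, action) // replaces any pending event with this tag

  def pendingDeadline(tag: String): Option[Long] = deferred.get(tag).map(_._1)

  // Runs the due event with the earliest deadline, if any, and returns its result.
  def runDue(nowMs: Long): Option[String] = {
    val due = deferred.filter { case (_, (deadline, _)) => deadline <= nowMs }
    if (due.isEmpty) None
    else {
      val (tag, (_, action)) = due.minBy(_._2._1)
      deferred.remove(tag)
      Some(action())
    }
  }
}

object JbodUpdateScenario {
  // Returns the communication deadline after the update and the first request sent at t = 0.
  def run(): (Option[Long], Option[String]) = {
    val q = new ToyDeferredQueue
    // After a heartbeat response: the next communication (a heartbeat) is due in 100 ms.
    q.scheduleDeferred("communication", 100L)(() => "BrokerHeartbeat")
    // The metadata-version update reschedules the same tag with no delay,
    // so the re-registration replaces the pending heartbeat.
    q.scheduleDeferred("communication", 0L)(() => "BrokerRegistration")
    val deadline = q.pendingDeadline("communication")
    (deadline, q.runDue(0L))
  }
}
```

Under these assumptions the pending deadline drops to 0 and the first thing sent is the registration, which matches the conclusion that the next request should always be `BrokerRegistration`.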
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1531683586
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -197,11 +197,17 @@ class BrokerLifecycleManagerTest {
     result
   }
-  def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
-    while (!future.isDone || context.mockClient.hasInFlightRequests) {
-      context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
+    while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+      // If the manager is idling until scheduled events we need to advance the clock
+      if (manager.eventQueue.scheduledAfterIdling()
+        .filter(!_.getClass.getSimpleName.endsWith("TimeoutEvent")) // avoid triggering timeout events
Review Comment: That's a good idea. I'm making that change.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1531683014
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-    val context = new RegistrationTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, context.advertisedListeners,
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.of(10L))
-    TestUtils.retry(6) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
-    TestUtils.retry(1) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
+    def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, manager, prepareResponse[T](ctx, response))
+    def nextRequest() = doPoll[AbstractRequest](new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    def nextRegistrationRequest(epoch: Long) =
+      doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+    // Broker registers and response sets epoch to 1000L
+    assertEquals(10L, nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+    nextRequest() // poll for next request as way to synchronize with the new value into brokerEpoch
+    assertEquals(1000L, manager.brokerEpoch)
+
+    // Trigger JBOD MV update
     manager.handleKraftJBODMetadataVersionUpdate()
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode)
-    TestUtils.retry(6) {
-      context.time.sleep(100)
-      context.poll()
-      manager.eventQueue.wakeup()
-      assertEquals(1200, manager.brokerEpoch)
-    }
+
+    // We may have to accept some heartbeats before the new registration is sent
+    while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()
Review Comment: No, I don't think so. `prepareResponse` delegates to `MockClient` which expects a predetermined request-response order. It supports a `RequestMatcher` which `prepareResponse` uses to extract the request, but it does not support preparing a conditional response. We need a larger change to add support in `MockClient` for conditional prepared responses.
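A toy illustration of the constraint described above: when responses are prepared up front and handed out in a fixed order, a test cannot express "reply with a heartbeat response if a heartbeat arrives, otherwise with a registration response". `OrderedMockClient` is a hypothetical simplification written for this note; the real `MockClient` has its own API and failure behaviour.

```scala
import scala.collection.mutable

// Hypothetical simplification of an ordered mock client (not Kafka's MockClient):
// responses are prepared up front and consumed strictly in FIFO order, and each
// one carries a matcher that the next request must satisfy.
final class OrderedMockClient {
  private val prepared = mutable.Queue.empty[(String => Boolean, String)]

  def prepareResponse(matcher: String => Boolean, response: String): Unit =
    prepared.enqueue((matcher, response))

  // Left(error) if nothing is prepared or the next-in-line matcher rejects the request.
  def send(request: String): Either[String, String] =
    if (prepared.isEmpty) Left(s"no response prepared for $request")
    else {
      val (matcher, response) = prepared.dequeue()
      if (matcher(request)) Right(response)
      else Left(s"$request does not match the next prepared response")
    }
}
```

If a heartbeat response is prepared but scheduling makes the manager send the re-registration first, the single prepared slot is consumed by a mismatch. That is the case the `while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()` loop cannot cover.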
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1530918656
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-    val context = new RegistrationTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, context.advertisedListeners,
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.of(10L))
-    TestUtils.retry(6) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
-    TestUtils.retry(1) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
+    def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, manager, prepareResponse[T](ctx, response))
+    def nextRequest() = doPoll[AbstractRequest](new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    def nextRegistrationRequest(epoch: Long) =
+      doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+    // Broker registers and response sets epoch to 1000L
+    assertEquals(10L, nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+    nextRequest() // poll for next request as way to synchronize with the new value into brokerEpoch
+    assertEquals(1000L, manager.brokerEpoch)
+
+    // Trigger JBOD MV update
     manager.handleKraftJBODMetadataVersionUpdate()
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode)
-    TestUtils.retry(6) {
-      context.time.sleep(100)
-      context.poll()
-      manager.eventQueue.wakeup()
-      assertEquals(1200, manager.brokerEpoch)
-    }
+
+    // We may have to accept some heartbeats before the new registration is sent
+    while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()
Review Comment: `prepareResponse` knows the request type. Could we generate the response corresponding to the request type there?
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -197,11 +197,17 @@ class BrokerLifecycleManagerTest {
     result
   }
-  def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
-    while (!future.isDone || context.mockClient.hasInFlightRequests) {
-      context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
+    while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+      // If the manager is idling until scheduled events we need to advance the clock
+      if (manager.eventQueue.scheduledAfterIdling()
+        .filter(!_.getClass.getSimpleName.endsWith("TimeoutEvent")) // avoid triggering timeout events
Review Comment: I see. Perhaps we could add a step after `poll(ctx, manager, registration)` to explicitly drain `RegistrationTimeoutEvent` first? This makes the test safer since it won't be sensitive to the timeout config values.
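The first suggestion above amounts to deriving the response from the request that actually arrives, rather than from a fixed sequence. A hypothetical sketch in Scala: `ToyRequest`, `ToyResponse` and `ConditionalResponder` are illustrative stand-ins, not Kafka's request classes or `MockClient`, and whether `MockClient` can support this is exactly what the thread goes on to question.

```scala
// Hypothetical stand-ins for the broker heartbeat/registration messages.
sealed trait ToyRequest
final case class ToyHeartbeatRequest(offlineLogDirs: Set[String]) extends ToyRequest
final case class ToyRegistrationRequest(previousBrokerEpoch: Long) extends ToyRequest

sealed trait ToyResponse
case object ToyHeartbeatResponse extends ToyResponse
final case class ToyRegistrationResponse(brokerEpoch: Long) extends ToyResponse

// Chooses the reply from the request it receives, so a test would not have to
// guess whether a heartbeat or a re-registration comes next.
final class ConditionalResponder(nextEpoch: Long) {
  private var seen = Vector.empty[ToyRequest]

  def respond(request: ToyRequest): ToyResponse = {
    seen :+= request
    request match {
      case ToyHeartbeatRequest(_)    => ToyHeartbeatResponse
      case ToyRegistrationRequest(_) => ToyRegistrationResponse(nextEpoch)
    }
  }

  def received: Vector[ToyRequest] = seen
}
```

With something along these lines, any heartbeats that precede the re-registration would simply receive heartbeat responses, and the registration would receive the new epoch. The match is exhaustive over the sealed trait, so adding a request type without a response is flagged by the compiler.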
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest {
       Collections.emptyMap(), OptionalLong.empty())
     poll(ctx, manager, registration)
+    def nextHeartbeatDirs(): Set[String] =
+      poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData(
+        .data().offlineLogDirs().asScala.map(_.toString).toSet
+    assertEquals(Set.empty, nextHeartbeatDirs())
     manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
     manager.propag
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-2006292277 Thanks for the review @junrao. Please take another look.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1529927411
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -197,11 +197,17 @@ class BrokerLifecycleManagerTest {
     result
   }
-  def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
-    while (!future.isDone || context.mockClient.hasInFlightRequests) {
-      context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
+    while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+      // If the manager is idling until scheduled events we need to advance the clock
+      if (manager.eventQueue.scheduledAfterIdling()
+        .filter(!_.getClass.getSimpleName.endsWith("TimeoutEvent")) // avoid triggering timeout events
Review Comment: This is a bit ugly. It only applies to `RegistrationTimeoutEvent`. We want to avoid triggering those deferred events. Instead of checking for that class specifically, I thought there may be other timeout event types in the future and we'll also want to avoid those. I also ruled out extending EventQueue.Event as we only care about this in this test suite. I feel ambivalent about the approach I've taken here, so please let me know if you have a preference.
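The two options weighed above can be compared side by side: a name-based filter, as in the diff, and a type-level marker, which soarez ruled out for the production `EventQueue.Event` hierarchy. A sketch with hypothetical event classes; only the `endsWith("TimeoutEvent")` check mirrors the diff, and `TimeoutMarker` is an invented name used purely for the comparison.

```scala
// Hypothetical event hierarchy for illustration only.
trait Event
trait TimeoutMarker // opt-in marker for events the test poll loop must not fire
final class CommunicationEvent extends Event
final class RegistrationTimeoutEvent extends Event with TimeoutMarker

object TimeoutFilters {
  // Name-based filter, as in the diff: relies on a naming convention.
  def byName(e: Event): Boolean = !e.getClass.getSimpleName.endsWith("TimeoutEvent")

  // Type-based filter: relies on the marker trait instead of the class name.
  def byMarker(e: Event): Boolean = !e.isInstanceOf[TimeoutMarker]

  // The test's poll helper would only advance the clock for a deferred event that passes the filter.
  def shouldAdvanceFor(next: Option[Event], keep: Event => Boolean): Boolean =
    next.exists(keep)
}
```

Both filters agree on these classes. The name check needs no change to production types but silently depends on a suffix. The marker is explicit but requires touching the event classes, which is the cost the comment wants to avoid for a test-only concern.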
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1529919190
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest {
       Collections.emptyMap(), OptionalLong.empty())
     poll(ctx, manager, registration)
+    def nextHeartbeatDirs(): Set[String] =
+      poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData(
+        .data().offlineLogDirs().asScala.map(_.toString).toSet
+    assertEquals(Set.empty, nextHeartbeatDirs())
     manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
     manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+    assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), nextHeartbeatDirs())
Review Comment: Thanks for carefully thinking this through. I believe there's a mistake in that sequence of events though:

> nextHeartbeatDirs() causes the time to advance quickly and passes 100

This isn't how `BrokerLifecycleManagerTest#poll()` works. It will only advance time if there is no event in the queue that can immediately run, so o1 will always run before c2.

> o1 is processed at manager.eventQueue and a CommunicationEvent c3 is appended to manager.eventQueue

Because o1 runs before c2, the EventQueue will override c2 with c3 as they're both deferred events with the same tag.

After running this test tens of thousands of times, I was unable to reproduce a flaky run.
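The counter-argument depends on two rules: the test's poll loop advances the mock clock only when nothing can run immediately, and a deferred event rescheduled under the same tag replaces the pending one. A toy simulation under exactly those two assumptions (`ToyLifecycle` and its methods are hypothetical; this is not `BrokerLifecycleManager` or the real test helper):

```scala
import scala.collection.mutable

// Hypothetical toy of the test's virtual-time poll loop, assuming:
//  - immediate events always run before the mock clock is advanced;
//  - rescheduling a deferred event under an existing tag replaces the pending one.
final class ToyLifecycle {
  var nowMs = 0L
  private val immediate = mutable.Queue.empty[() => Unit]
  private val deferred = mutable.Map.empty[String, (Long, () => Unit)]
  private val offlineDirs = mutable.LinkedHashSet.empty[String]
  val sentHeartbeats = mutable.ArrayBuffer.empty[(Long, Set[String])]

  private def sendHeartbeat(): Unit = sentHeartbeats += ((nowMs, offlineDirs.toSet))

  private def scheduleCommunication(delayMs: Long): Unit =
    deferred("communication") = (nowMs + delayMs, () => sendHeartbeat())

  // c2: what handling a heartbeat response does in this toy.
  def afterHeartbeatResponse(): Unit = scheduleCommunication(100L)

  // o1: like propagateDirectoryFailure, enqueue an immediate event.
  def propagateDirectoryFailure(dir: String): Unit =
    immediate.enqueue(() => { offlineDirs += dir; scheduleCommunication(0L) })

  // One step: run immediate work if any; otherwise jump the clock to the earliest deferred event.
  def pollOnce(): Unit =
    if (immediate.nonEmpty) immediate.dequeue()()
    else if (deferred.nonEmpty) {
      val (tag, (deadline, action)) = deferred.minBy(_._2._1)
      nowMs = math.max(nowMs, deadline)
      deferred.remove(tag) // remove first: the action may reschedule the same tag
      action()
    }
}
```

Stepping through the disputed sequence: o1 runs before the clock reaches 100, its delay-0 reschedule replaces c2, and the next heartbeat goes out at t = 0 already carrying the failed directory. No stale c2 is left to fire later.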
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-    val context = new RegistrationTestContext(configProperties)
-    val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    val ctx = new RegistrationTestContext(configProperties)
+    val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
     val controllerNode = new Node(3000, "localhost", 8021)
-    context.controllerNodeProvider.node.set(controllerNode)
-    manager.start(() => context.highestMetadataOffset.get(),
-      context.mockChannelManager, context.clusterId, context.advertisedListeners,
+    ctx.controllerNodeProvider.node.set(controllerNode)
+
+    manager.start(() => ctx.highestMetadataOffset.get(),
+      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.of(10L))
-    TestUtils.retry(6) {
-      assertEquals(1, context.mockChannelManager.unsentQueue.size)
-      assertEquals(10L, context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-    }
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1000)), controllerNode)
-    TestUtils.retry(1) {
-      context.poll()
-      assertEquals(1000L, manager.brokerEpoch)
-    }
+    def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, manager, prepareResponse[T](ctx, response))
+    def nextRequest() = doPoll[AbstractRequest](new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    def nextRegistrationRequest(epoch: Long) =
+      doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+    // Broker registers and response sets epoch to 1000L
+    assertEquals(10L, nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+    nextRequest() // poll for next request as way to synchronize with the new value into brokerEpoch
+    assertEquals(1000L, manager.brokerEpoch)
+
+    // Trigger JBOD MV update
     manager.handleKraftJBODMetadataVersionUpdate()
-    context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-      new BrokerRegistrationResponseData().setBrokerEpoch(1200)), controllerNode)
-    TestUtils.retry(6) {
-      context.time.sleep(100)
-      context.poll()
-      manager.eventQueue.wakeup()
-      assertEquals(1200, manager.brokerEpoch)
-    }
+
+    // We may have to accept some heartbeats before the new registration is sent
+    while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()
Review Comment: I found it difficult to predict when the new `BrokerRegistration` request is sent. Depending on scheduling it could be sent straight away or it could get queued behind another heartbeat. It is indeed incorrect to mock
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1526788841 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -551,9 +580,11 @@ class BrokerLifecycleManager( } private def scheduleNextCommunication(intervalNs: Long): Unit = { -trace(s"Scheduling next communication at ${MILLISECONDS.convert(intervalNs, NANOSECONDS)} " + +val nanos = if (nextSchedulingShouldBeImmediate) 0 else intervalNs Review Comment: nanos => adjustedIntervalNs? ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest { Collections.emptyMap(), OptionalLong.empty()) poll(ctx, manager, registration) +def nextHeartbeatDirs(): Set[String] = + poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData( +.data().offlineLogDirs().asScala.map(_.toString).toSet +assertEquals(Set.empty, nextHeartbeatDirs()) manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA")) +assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs()) manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow")) +assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), nextHeartbeatDirs()) Review Comment: It seems this could still be flaky. `poll(ctx, manager, registration) ` A CommunicationEvent c1 with a delay of 0 is scheduled in manager.eventQueue. `assertEquals(Set.empty, nextHeartbeatDirs()) ` c1 is processed in `ctx.mockChannelManager` and a BrokerHeartbeatResponseEvent b1 is added to `manager.eventQueue`. b1 is processed at `manager.eventQueue` and a CommunicationEvent c2 with a delay of 100 is scheduled in `manager.eventQueue`. `manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA")) ` An OfflineDirEvent o1 is appended to `manager.eventQueue`, but not yet processed. 
`assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs()) ` nextHeartbeatDirs() causes the time to advance quickly past 100. c2 is processed at `manager.eventQueue` and a heartbeat request h1 is added to `ctx.mockChannelManager`. o1 is processed at `manager.eventQueue` and a CommunicationEvent c3 is appended to `manager.eventQueue`. c3 is processed at `manager.eventQueue` and sets `nextSchedulingShouldBeImmediate` to true. h1 is processed by `ctx.mockChannelManager` and adds BrokerHeartbeatResponseEvent b2 to `manager.eventQueue`. `manager.eventQueue` processes b2 and schedules a CommunicationEvent c4 with a delay of 0 in `manager.eventQueue`. c4 is processed at `manager.eventQueue` and a heartbeat request h2 (doesn't pick up ej8Q9_d2Ri6FXNiTxKFiow) is added to `ctx.mockChannelManager`. `manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))` `assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), nextHeartbeatDirs())` Now the above assertion will fail since `nextHeartbeatDirs()` will pick up h2 (sent by c4), which doesn't include ej8Q9_d2Ri6FXNiTxKFiow. ## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ## @@ -513,4 +514,36 @@ public void close() throws InterruptedException { eventHandlerThread.join(); log.info("closed event queue."); } + +/** + * Returns the deferred event that the queue is waiting for, idling until + * its deadline comes, if there is any. + * If the queue has immediate work to do, this returns empty. + * This is useful for unit tests, where to make progress, we need to + * speed the clock up until the next scheduled event is ready to run. + */ +public Optional scheduledAfterIdling() { Review Comment: scheduledAfterIdling => firstDeferredIfIdling ?
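The race in junrao's walkthrough comes down to snapshot timing: a heartbeat request records the offline directories known at the moment it is built, so h2 can be queued before `ej8Q9_d2Ri6FXNiTxKFiow` is propagated. A minimal single-threaded Java sketch of that effect, assuming the event queue and mock channel manager can be collapsed into plain collections; `HeartbeatSnapshotSketch` and its methods are hypothetical stand-ins, not Kafka code.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical single-threaded model of the walkthrough; not Kafka code.
final class HeartbeatSnapshotSketch {
    // Log directories reported offline so far (like propagateDirectoryFailure).
    private final Set<String> offlineDirs = new LinkedHashSet<>();
    // Heartbeat requests handed to the channel, oldest first.
    private final List<Set<String>> unsent = new ArrayList<>();

    void propagateDirectoryFailure(String dirId) {
        offlineDirs.add(dirId);
    }

    // Building a heartbeat copies the offline dirs as they are at that moment.
    void sendHeartbeat() {
        unsent.add(Set.copyOf(offlineDirs));
    }

    // Like nextHeartbeatDirs() in the test: observe the oldest unsent heartbeat.
    Set<String> nextHeartbeatDirs() {
        return unsent.remove(0);
    }
}
```

In this model, a heartbeat built before the second failure (h2) still carries only the first directory, which is why an assertion expecting both directories can fail even though the manager's own state already contains both.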
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest { @Test def testKraftJBODMetadataVersionUpdateEvent(): Unit = { -val context = new RegistrationTestContext(configProperties) -val manager = new BrokerLifecycleManager(context.config, context.time, "successful-registration-", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ"))) +val ctx = new RegistrationTestContext(configProperties) +val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "jbod-metadata-version-update", isZkBroker = false, Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ"))) val controllerNode = new Node(3000, "localhost", 8021) -context.controllerNodeProvider.node.set(controllerNode) -manager.start(() => context.highestMetadataOffset.get(), - context.mockChannelManager, context.clusterId, context.advertisedListeners, +ctx.controllerNodeProvider.node.set(control
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1998592089 Rebased this due to a merge conflict. @gaurav-narula I also made what I believe are some improvements to `BrokerLifecycleManagerTest.testKraftJBODMetadataVersionUpdateEvent`; can you please take a look?
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
github-actions[bot] commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1987577154 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
rondagostino commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1850988044 `12 tests have failed. There are 12 new tests failing, 0 existing failing` I think this PR can be merged now, assuming @cmccabe takes a look at the question above regarding the use of `prepend` on the event queue.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
OmniaGM commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1850387349 > `51 tests have failed. There are 49 new tests failing, 2 existing failing` > > This seems like a lot more failures than we are used to recently. @OmniaGM Can you rebase this onto latest `trunk` to see if we can get a cleaner build? Done! I don't have any related tests failing locally. Waiting for the build now.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
rondagostino commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1850245547 `51 tests have failed. There are 49 new tests failing, 2 existing failing` This seems like a lot more failures than we are used to recently. @OmniaGM Can you rebase this onto latest `trunk` to see if we can get a cleaner build?
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1843813913 @junrao: Thanks for reviewing this. It seems there are currently some issues with failing tests. See https://github.com/apache/kafka/pull/14838#issuecomment-1843693525 The test failures for this PR don't seem to be related to the changes, but to be sure we can also wait a bit longer and see if the state of the build improves.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1417911671 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -364,10 +377,30 @@ class BrokerLifecycleManager( } _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data), new BrokerRegistrationResponseHandler()) +communicationInFlight = true } + // the response handler is not invoked from the event handler thread, + // so it is not safe to update state here, instead, schedule an event + // to continue handling the response on the event handler thread private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { + eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false)) Review Comment: @cmccabe : Do you have concerns on the usage of `prepend` here? ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -166,6 +166,19 @@ class BrokerLifecycleManager( */ private var registered = false + /** + * True if a request has been sent and a response or timeout has not yet been processed. + * This variable can only be read or written from the event queue thread. + */ + private var communicationInFlight = false Review Comment: Yes, I agree that it's not straightforward. We can just leave the code as it is.
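The comment in this diff describes thread confinement: the completion handler runs on a network thread, so it only enqueues an event, and the event handler thread applies the state change. A sketch of that pattern under stated assumptions: a single-thread `ExecutorService` stands in for `KafkaEventQueue`, and `ResponseHandOffSketch`, `onComplete(long)` and the fields are hypothetical. It enqueues in plain FIFO order, in line with cmccabe's suggestion elsewhere in this thread that `prepend` is not needed.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a single-thread executor stands in for the event handler thread.
final class ResponseHandOffSketch {
    // Daemon thread so a failed check cannot keep the JVM alive.
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-handler");
        t.setDaemon(true);
        return t;
    });

    // Only read or written on eventThread.
    private boolean communicationInFlight = true;
    private long brokerEpoch = -1L;

    // Called on a network thread: no state is touched here, an event is enqueued instead.
    void onComplete(long epochFromResponse) {
        eventThread.execute(() -> {
            communicationInFlight = false;
            brokerEpoch = epochFromResponse;
        });
    }

    // Reads also run as tasks on eventThread, so they observe every earlier event.
    long readBrokerEpoch() {
        Future<Long> f = eventThread.submit(() -> brokerEpoch);
        return await(f);
    }

    boolean readInFlight() {
        Future<Boolean> f = eventThread.submit(() -> communicationInFlight);
        return await(f);
    }

    private static <T> T await(Future<T> f) {
        try {
            return f.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    void close() {
        eventThread.shutdown();
        try {
            eventThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because a single thread both applies and reads the state, no locking or `volatile` is needed for these fields.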
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1417143188 ## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ## @@ -513,4 +513,28 @@ public void close() throws InterruptedException { eventHandlerThread.join(); log.info("closed event queue."); } + +/** + * Useful for unit tests, where we need to speed the clock up until + * deferred events are ready to run. + */ +public Object pendingDeferredEvent() { +lock.lock(); +try { +if (eventHandler.head.next != eventHandler.head) { +return null; +} +Map.Entry entry = eventHandler.deadlineMap.firstEntry(); +if (entry == null) { +return null; +} +EventContext eventContext = entry.getValue(); +if (eventContext.insertionType == EventInsertionType.DEFERRED) { Review Comment: That is incorrect. Events of any type can end up in `deadlineMap` as long as they have a deadline specified. The notion of deadline here is a bit confusing. A deadline for events of type `DEFERRED` is the time after which the event should run. For events of other types, the deadline is the time after which `TimeoutException` should be delivered if the event hasn't run yet. So we want to ignore events that are not of type `DEFERRED`.
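soarez's point is that `deadlineMap` mixes two meanings of "deadline". A hypothetical Java model of that distinction follows; the enum and classes are illustrative only and are not Kafka's `EventContext` or `EventInsertionType`. An entry at the front of the map counts as pending work only when it is `DEFERRED`, mirroring the check in the diff.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical mirror of the insertion types discussed; not Kafka's enum.
enum SketchInsertionType { PREPEND, APPEND, DEFERRED }

final class SketchEventContext {
    final String name;
    final SketchInsertionType insertionType;

    SketchEventContext(String name, SketchInsertionType insertionType) {
        this.name = name;
        this.insertionType = insertionType;
    }
}

final class DeadlineMapSketch {
    // deadline (ns) -> event. For DEFERRED events the key is the earliest time the
    // event may run; for any other event it is when a TimeoutException is delivered.
    final TreeMap<Long, SketchEventContext> deadlineMap = new TreeMap<>();

    // A timeout entry at the front must not be mistaken for scheduled work.
    Optional<SketchEventContext> firstDeferred() {
        Map.Entry<Long, SketchEventContext> entry = deadlineMap.firstEntry();
        if (entry == null || entry.getValue().insertionType != SketchInsertionType.DEFERRED) {
            return Optional.empty();
        }
        return Optional.of(entry.getValue());
    }
}
```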
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1417129195 ## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ## @@ -513,4 +513,28 @@ public void close() throws InterruptedException { eventHandlerThread.join(); log.info("closed event queue."); } + +/** + * Useful for unit tests, where we need to speed the clock up until + * deferred events are ready to run. + */ +public Object pendingDeferredEvent() { +lock.lock(); +try { +if (eventHandler.head.next != eventHandler.head) { Review Comment: Well, yes, deferred events are not stored under `head`, but if there are any events under `head` we should not fast forward the clock, as those will run regardless. The point here is to return the first event that will run next, when its deadline comes, but if and only if there's nothing else to do until then. I'll get a better name and javadoc for this method so this makes more sense.
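The rule soarez describes (fast-forward only when nothing is runnable right away) can be sketched as below, returning `Optional` rather than `null` as junrao suggested in his review. This assumes a simplified queue with a `ready` deque and a separate `deferred` map, so no insertion-type filter is needed; the names are hypothetical and the sketch omits the locking the real `KafkaEventQueue` method takes.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical two-part queue; names are illustrative only.
final class IdlingQueueSketch {
    // Events that run as soon as the handler thread gets to them.
    final ArrayDeque<String> ready = new ArrayDeque<>();
    // Deferred events keyed by the time (ns) after which they may run.
    final TreeMap<Long, String> deferred = new TreeMap<>();

    // The deferred event the queue is idling on, or empty when there is ready work
    // (which runs regardless of the clock) or nothing is deferred at all.
    Optional<Map.Entry<Long, String>> firstDeferredIfIdling() {
        if (!ready.isEmpty()) {
            return Optional.empty();
        }
        return Optional.ofNullable(deferred.firstEntry());
    }
}
```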
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1417124246 ## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ## @@ -513,4 +513,28 @@ public void close() throws InterruptedException { eventHandlerThread.join(); log.info("closed event queue."); } + +/** + * Useful for unit tests, where we need to speed the clock up until + * deferred events are ready to run. + */ +public Object pendingDeferredEvent() { Review Comment: Sure 👍
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1417121867 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest { result } - def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = { -while (!future.isDone || context.mockClient.hasInFlightRequests) { - context.poll() + def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = { +while (ctx.mockChannelManager.unsentQueue.isEmpty) { + if (Option(manager.eventQueue.pendingDeferredEvent()).exists(!_.getClass.getSimpleName.endsWith("TimeoutEvent"))) +ctx.time.sleep(5) manager.eventQueue.wakeup() - context.time.sleep(5) +} +while (!future.isDone || ctx.mockClient.hasInFlightRequests) { Review Comment: Yes, it seems that is enough. Changing it.
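The loop in this diff advances the mock clock in 5 ms steps only while the queue is idling on a deferred event that is not a timeout, so timeouts are never triggered artificially. A single-threaded Java model of that loop, assuming events are identified by class-name strings as in the `endsWith("TimeoutEvent")` check; `FakeClockPollSketch` and `pollUntilSent` are hypothetical, and an iteration cap replaces the real test's reliance on other threads making progress.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-threaded model of the test helper; not the real test code.
final class FakeClockPollSketch {
    long nowMs = 0;
    // Deferred events: due time in ms -> simple class name of the event.
    final TreeMap<Long, String> deferred = new TreeMap<>();
    // Requests that reached the (mock) channel.
    final List<String> unsentQueue = new ArrayList<>();

    // Run every deferred event that is due. A due timeout event sends nothing here.
    private void wakeup() {
        Map.Entry<Long, String> e;
        while ((e = deferred.firstEntry()) != null && e.getKey() <= nowMs) {
            deferred.pollFirstEntry();
            if (!e.getValue().endsWith("TimeoutEvent")) {
                unsentQueue.add(e.getValue());
            }
        }
    }

    // Advance the clock by 5 ms per iteration, but only while the next deferred
    // event is not a timeout. Returns whether a request was sent.
    boolean pollUntilSent(int maxIterations) {
        for (int i = 0; i < maxIterations && unsentQueue.isEmpty(); i++) {
            Map.Entry<Long, String> next = deferred.firstEntry();
            if (next != null && !next.getValue().endsWith("TimeoutEvent")) {
                nowMs += 5;
            }
            wakeup();
        }
        return !unsentQueue.isEmpty();
    }
}
```

With a `CommunicationEvent` due at 100 ms, the clock stops at exactly 100 ms; with only a timeout pending, the clock never moves.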
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1417116727 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -166,6 +166,19 @@ class BrokerLifecycleManager( */ private var registered = false + /** + * True if a request has been sent and a response or timeout has not yet been processed. + * This variable can only be read or written from the event queue thread. + */ + private var communicationInFlight = false Review Comment: I don't see a straightforward way to do this. Maybe I'm missing something, but `NetworkClient` does not look like a good place for this. `kafka.server.NodeToControllerChannelManagerImpl#sendRequest` isn't synchronous; it runs `requestThread.enqueue`, so if we rely on `networkClient` state we open the door to race conditions. Also, `NodeToControllerRequestThread` can reset the network client at any point.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1416090404 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -166,6 +166,19 @@ class BrokerLifecycleManager( */ private var registered = false + /** + * True if a request has been sent and a response or timeout has not yet been processed. + * This variable can only be read or written from the event queue thread. + */ + private var communicationInFlight = false Review Comment: The `NetworkClient` in `_channelManager` already maintains the state for in flight requests through `networkClient.ready(node, now)`. So, I am wondering if we need to maintain the state here. Could we just expose the state in `NetworkClient` through `_channelManager`? ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest { result } - def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = { -while (!future.isDone || context.mockClient.hasInFlightRequests) { - context.poll() + def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = { +while (ctx.mockChannelManager.unsentQueue.isEmpty) { + if (Option(manager.eventQueue.pendingDeferredEvent()).exists(!_.getClass.getSimpleName.endsWith("TimeoutEvent"))) +ctx.time.sleep(5) manager.eventQueue.wakeup() - context.time.sleep(5) +} +while (!future.isDone || ctx.mockClient.hasInFlightRequests) { Review Comment: Do we need `ctx.mockClient.hasInFlightRequests`? It seems that `!future.isDone` is enough? ## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ## @@ -513,4 +513,28 @@ public void close() throws InterruptedException { eventHandlerThread.join(); log.info("closed event queue."); } + +/** + * Useful for unit tests, where we need to speed the clock up until + * deferred events are ready to run. 
+ */ +public Object pendingDeferredEvent() { Review Comment: Could we return Optional instead of null? ## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ## @@ -513,4 +513,28 @@ public void close() throws InterruptedException { eventHandlerThread.join(); log.info("closed event queue."); } + +/** + * Useful for unit tests, where we need to speed the clock up until + * deferred events are ready to run. + */ +public Object pendingDeferredEvent() { +lock.lock(); +try { +if (eventHandler.head.next != eventHandler.head) { +return null; +} +Map.Entry entry = eventHandler.deadlineMap.firstEntry(); +if (entry == null) { +return null; +} +EventContext eventContext = entry.getValue(); +if (eventContext.insertionType == EventInsertionType.DEFERRED) { Review Comment: Do we need this check? It seems only `DEFERRED` events are stored in `deadlineMap`. ## server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java: ## @@ -513,4 +513,28 @@ public void close() throws InterruptedException { eventHandlerThread.join(); log.info("closed event queue."); } + +/** + * Useful for unit tests, where we need to speed the clock up until + * deferred events are ready to run. + */ +public Object pendingDeferredEvent() { +lock.lock(); +try { +if (eventHandler.head.next != eventHandler.head) { Review Comment: Deferred events are not stored under `head`. It seems there is no need to check `head`?
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1415408700 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -166,6 +166,19 @@ class BrokerLifecycleManager( */ private var registered = false + /** + * True if a request has been sent and a response or timeout has not yet been processed. Review Comment: We do. The very first Registration request is also scheduled via a `CommunicationEvent`. `communicationInFlight = true` is now set after both `_channelManager.sendRequest(new BrokerRegistrationRequest...` and `_channelManager.sendRequest(new BrokerHeartbeatRequest...`
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839871090 @cmccabe thanks for having a look. I've made the following changes: * Replaced the use of `prepend` with `append` in `propagateDirectoryFailure` * Moved `communicationInFlight = true` after `_channelManager.sendRequest()` * Always set `nextSchedulingShouldBeImmediate = false` in `scheduleNextCommunicationAfterFailure` * Moved checking if `communicationInFlight = true` to `CommunicationEvent.run()` @junrao @cmccabe: please take another look
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414728861 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest { result } - def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = { -while (!future.isDone || context.mockClient.hasInFlightRequests) { - context.poll() + def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = { +while (ctx.mockChannelManager.unsentQueue.isEmpty) { + if (manager.eventQueue.isEmpty) Review Comment: You're right, this is incorrect. We must only advance the time if the eventQueue has an event scheduled at a future time, unless the next event is a timeout event.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414711941 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -366,8 +379,27 @@ class BrokerLifecycleManager( new BrokerRegistrationResponseHandler()) } + // the response handler is not invoked from the event handler thread, + // so it is not safe to update state here, instead, schedule an event + // to continue handling the response on the event handler thread private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { + eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false)) +} + +override def onTimeout(): Unit = { + info(s"Unable to register the broker because the RPC got timed out before it could be sent.") + eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true)) +} + } + + private class BrokerRegistrationResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event { +override def run(): Unit = { + communicationInFlight = false Review Comment: Ok. I wasn't sure that communicationInFlight is for both requests. Since `NodeToControllerChannelManagerImpl` uses 1 for `maxInFlightRequestsPerConnection`, another option is to add a `canSendRequest` api in `NodeToControllerChannelManager`, instead of maintaining `communicationInFlight` here. The `canSendRequest` api can be implemented through `networkClient.ready(node, now)`.
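junrao's alternative relies on the channel allowing at most one in-flight request per connection, so "may I send?" becomes a question for the channel rather than a flag in the manager. A hypothetical Java sketch of that idea; `SingleInFlightChannelSketch`, `canSendRequest()` and `trySend()` are illustrative names, not a Kafka API. Sending here is synchronous, unlike the real `NodeToControllerChannelManagerImpl#sendRequest`, which only enqueues the request; soarez's reply in this thread points to that asynchrony as the reason the flag stayed in the manager.

```java
// Hypothetical sketch of the suggested canSendRequest() idea; not a Kafka API.
final class SingleInFlightChannelSketch {
    private final int maxInFlightRequestsPerConnection = 1;
    private int inFlight = 0;

    synchronized boolean canSendRequest() {
        return inFlight < maxInFlightRequestsPerConnection;
    }

    // Returns false instead of queueing when the connection is busy.
    synchronized boolean trySend(String request) {
        if (!canSendRequest()) {
            return false;
        }
        inFlight++;
        return true;
    }

    // Called when the response (or timeout) for the outstanding request arrives.
    synchronized void onResponse() {
        if (inFlight > 0) {
            inFlight--;
        }
    }
}
```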
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414708239 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -366,8 +379,27 @@ class BrokerLifecycleManager( new BrokerRegistrationResponseHandler()) } + // the response handler is not invoked from the event handler thread, + // so it is not safe to update state here, instead, schedule an event + // to continue handling the response on the event handler thread private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { + eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false)) +} + +override def onTimeout(): Unit = { + info(s"Unable to register the broker because the RPC got timed out before it could be sent.") + eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true)) +} + } + + private class BrokerRegistrationResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event { +override def run(): Unit = { + communicationInFlight = false Review Comment: Since `NodeToControllerChannelManagerImpl` uses 1 for `maxInFlightRequestsPerConnection`, we could also add a `sendRequest` api in `NodeToControllerChannelManager`, instead networkClient.ready(node, now)
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414704903 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -166,6 +166,19 @@ class BrokerLifecycleManager( */ private var registered = false + /** + * True if a request has been sent and a response or timeout has not yet been processed. Review Comment: Hmm, if that's the case, should we set communicationInFlight to true when sending the very first Registration request?
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
cmccabe commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839768073 > Instead, CommunicationEvent.run() should gracefully handle running when communicationInFlight = true Probably by doing nothing. This also implies that `scheduleNextCommunicationAfterSuccess` should be checking `nextSchedulingShouldBeImmediate` (not sure why it doesn't already). Probably `nextSchedulingShouldBeImmediate` should be renamed to something like "dirty" since it doesn't actually mean the next scheduling should always be immediate. Like if there has been a communication error (`scheduleNextCommunicationAfterFailure`) we do not want to immediately reschedule, no matter what.
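cmccabe's proposed semantics for the flag (call it `dirty`) reduce to a small scheduling rule: after a successful response, communicate immediately if there is unsent state; after a failure, always back off; and, per his other comment in this thread, clear the flag only when the communication event actually runs. A hypothetical Java sketch with nanosecond delays, as in `scheduleNextCommunication(intervalNs)`; the class and method names are illustrative.

```java
// Hypothetical sketch of the "dirty" flag semantics; names are illustrative.
final class DirtySchedulingSketch {
    private final long heartbeatIntervalNs;
    private boolean dirty = false; // stands in for nextSchedulingShouldBeImmediate

    DirtySchedulingSketch(long heartbeatIntervalNs) {
        this.heartbeatIntervalNs = heartbeatIntervalNs;
    }

    // e.g. a log directory failure was propagated and the controller should hear soon.
    void markDirty() {
        dirty = true;
    }

    // After a successful response: go immediately if there is unsent state.
    long delayAfterSuccessNs() {
        return dirty ? 0L : heartbeatIntervalNs;
    }

    // After a failed response: always back off, whatever the flag says.
    long delayAfterFailureNs(long backoffNs) {
        return backoffNs;
    }

    // The flag is cleared only when the communication event actually runs.
    void onCommunicationEventRun() {
        dirty = false;
    }
}
```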
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
cmccabe commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839747217 Thanks for looking at this, @soarez. I don't think `nextSchedulingShouldBeImmediate` should be reset to false until the `CommunicationEvent` is run. I also think we should be a bit more careful about setting `communicationInFlight` ... it should only be set after `_channelManager.sendRequest` has been called. And even that call should probably be wrapped in a `try...catch`. If `sendRequest` throws, we don't want to be stuck never sending another request. (I don't think sendRequest is supposed to throw, but we should be careful.) I don't think prepend is needed anywhere. To be honest, the presence of `prepend` usually indicates a bug... :) Instead, `CommunicationEvent.run()` should gracefully handle running when `communicationInFlight = true`, probably by doing nothing.
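The ordering cmccabe asks for can be sketched as follows, assuming a hypothetical `Channel` interface in place of `_channelManager`: the communication event is a no-op while a request is outstanding, the in-flight flag is set only after `sendRequest` returns normally, and an exception from `sendRequest` leaves the flag clear so that a later event can retry. This is an illustration under those assumptions, not the code that was merged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; Channel stands in for _channelManager.
final class CommunicationGuardSketch {
    interface Channel {
        void sendRequest(String request);
    }

    private final Channel channel;
    private boolean communicationInFlight = false; // event-thread only
    final List<String> log = new ArrayList<>();

    CommunicationGuardSketch(Channel channel) {
        this.channel = channel;
    }

    // Body of a CommunicationEvent: does nothing while a request is outstanding.
    void runCommunicationEvent(String request) {
        if (communicationInFlight) {
            log.add("skipped " + request);
            return;
        }
        try {
            channel.sendRequest(request);
            communicationInFlight = true; // only after sendRequest returned normally
        } catch (RuntimeException e) {
            // The flag stays false, so a later event can retry instead of stalling forever.
            log.add("send failed: " + e.getMessage());
        }
    }

    // Body of a response or timeout event.
    void onResponseOrTimeout() {
        communicationInFlight = false;
    }
}
```

The skip branch is what prevents the second registration request soarez mentions when a directory failure is propagated before the registration response arrives.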
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414647339 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -166,6 +166,19 @@ class BrokerLifecycleManager( */ private var registered = false + /** + * True if a request has been sent and a response or timeout has not yet been processed. Review Comment: It applies to both Heartbeat and Registration requests, which are the only requests this manager sends. I think we're also fixing a bug where a second registration request could have been scheduled after a call to `propagateDirectoryFailure` and before the registration response is received.
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414633539
## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ##
@@ -453,79 +490,73 @@ class BrokerLifecycleManager(
         val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse]
         val errorCode = Errors.forCode(message.data().errorCode())
         if (errorCode == Errors.NONE) {
-          // this response handler is not invoked from the event handler thread,
-          // and processing a successful heartbeat response requires updating
-          // state, so to continue we need to schedule an event
-          eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data()))
+          val responseData = message.data()
+          failedAttempts = 0
+          _state match {
+            case BrokerState.STARTING =>
+              if (responseData.isCaughtUp) {
+                info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.")
+                _state = BrokerState.RECOVERY
+                initialCatchUpFuture.complete(null)
+              } else {
+                debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.")
+              }
+              // Schedule the heartbeat after only 10 ms so that in the case where
+              // there is no recovery work to be done, we start up a bit quicker.
+              scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
+            case BrokerState.RECOVERY =>
+              if (!responseData.isFenced) {
+                info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.")
+                initialUnfenceFuture.complete(null)
+                _state = BrokerState.RUNNING
+              } else {
+                info(s"The broker is in RECOVERY.")
+              }
+              scheduleNextCommunicationAfterSuccess()
+            case BrokerState.RUNNING =>
+              debug(s"The broker is RUNNING. Processing heartbeat response.")
+              scheduleNextCommunicationAfterSuccess()
+            case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
+              if (!responseData.shouldShutDown()) {
+                info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " +
+                  "for the active controller.")
+                if (!gotControlledShutdownResponse) {
+                  // If this is the first pending controlled shutdown response we got,
+                  // schedule our next heartbeat a little bit sooner than we usually would.
+                  // In the case where controlled shutdown completes quickly, this will
+                  // speed things up a little bit.
+                  scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS))
+                } else {
+                  scheduleNextCommunicationAfterSuccess()
+                }
+              } else {
+                info(s"The controller has asked us to exit controlled shutdown.")
+                beginShutdown()
+              }
+              gotControlledShutdownResponse = true
+            case BrokerState.SHUTTING_DOWN =>
+              info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat response.")
+            case _ =>
+              error(s"Unexpected broker state ${_state}")
+              scheduleNextCommunicationAfterSuccess()
+          }
         } else {
           warn(s"Broker $nodeId sent a heartbeat request but received error $errorCode.")
           scheduleNextCommunicationAfterFailure()
         }
       }
     }
-
-    override def onTimeout(): Unit = {
-      info("Unable to send a heartbeat because the RPC got timed out before it could be sent.")
-      scheduleNextCommunicationAfterFailure()
-    }
   }
-  private class BrokerHeartbeatResponseEvent(response: BrokerHeartbeatResponseData) extends EventQueue.Event {
-    override def run(): Unit = {
-      failedAttempts = 0
-      _state match {
-        case BrokerState.STARTING =>
-          if (response.isCaughtUp) {
-            info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.")
-            _state = BrokerState.RECOVERY
-            initialCatchUpFuture.complete(null)
-          } else {
-            debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.")
-          }
-          // Schedule the heartbeat after only 10 ms so that in the case where
-          // there is no recovery work to be done, we start up a bit quicker.
-          scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
-        case BrokerState.RECOVERY =>
-          if (!response.isFenced) {
-            info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.")
-            initialUnfenceFuture.complete(null)
-            _state = BrokerState.RUNNING
-          } else {
-            info(s"The broker is in RECOVERY.")
-          }
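The successful-heartbeat branch in the diff above is essentially a state machine. As a hedged illustration only, it can be written as a pure function from the current state and the relevant response fields to the next state and the next scheduling decision. The `HeartbeatTransitions` object, its case objects and the `Next` cases are hypothetical stand-ins for `BrokerState`, `BrokerHeartbeatResponseData` and the `scheduleNextCommunication*` calls. Logging, `failedAttempts`, the completed futures and the real `beginShutdown()` are left out, and shutting down is modelled simply as a move to `ShuttingDown`.

```scala
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}

// Hypothetical, pure-function sketch of the successful-heartbeat branch.
// Not Kafka code: states and fields are simplified stand-ins.
object HeartbeatTransitions {
  sealed trait State
  case object Starting extends State
  case object Recovery extends State
  case object Running extends State
  case object PendingControlledShutdown extends State
  case object ShuttingDown extends State

  // Subset of the heartbeat response fields the branch looks at.
  final case class Response(isCaughtUp: Boolean, isFenced: Boolean, shouldShutDown: Boolean)

  sealed trait Next
  final case class After(delayNs: Long) extends Next // like scheduleNextCommunication(delayNs)
  case object AfterSuccess extends Next               // like scheduleNextCommunicationAfterSuccess()
  case object NoSchedule extends Next                 // nothing is scheduled

  def onSuccess(state: State, r: Response, gotShutdownResponse: Boolean): (State, Next) =
    state match {
      case Starting =>
        val next = if (r.isCaughtUp) Recovery else Starting
        // Short 10 ms delay so startup is quicker when there is no recovery work.
        (next, After(NANOSECONDS.convert(10, MILLISECONDS)))
      case Recovery =>
        (if (r.isFenced) Recovery else Running, AfterSuccess)
      case Running =>
        (Running, AfterSuccess)
      case PendingControlledShutdown =>
        if (r.shouldShutDown) (ShuttingDown, NoSchedule)
        else if (!gotShutdownResponse)
          // First pending response: poll again sooner than usual (50 ms).
          (PendingControlledShutdown, After(NANOSECONDS.convert(50, MILLISECONDS)))
        else (PendingControlledShutdown, AfterSuccess)
      case ShuttingDown =>
        (ShuttingDown, NoSchedule) // response is ignored
    }
}
```

Keeping the decision pure like this makes each transition easy to unit-test; the caller would still be responsible for setting `gotControlledShutdownResponse` afterwards, as the diff does.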
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414626881
## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ##
@@ -366,8 +379,27 @@ class BrokerLifecycleManager(
       new BrokerRegistrationResponseHandler())
   }
+  // the response handler is not invoked from the event handler thread,
+  // so it is not safe to update state here, instead, schedule an event
+  // to continue handling the response on the event handler thread
   private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler {
     override def onComplete(response: ClientResponse): Unit = {
+      eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))
+    }
+
+    override def onTimeout(): Unit = {
+      info(s"Unable to register the broker because the RPC got timed out before it could be sent.")
+      eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true))
+    }
+  }
+
+  private class BrokerRegistrationResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event {
+    override def run(): Unit = {
+      communicationInFlight = false
Review Comment: I don't think so. We set `communicationInFlight = true` in `CommunicationEvent.run`, which sends both heartbeat and registration requests. Maybe I'm missing something?
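The hand-off pattern in this hunk can be sketched as follows: the completion handler runs off the event-handler thread, so both `onComplete` and `onTimeout` only enqueue an event, and the in-flight flag is cleared inside that event. Because the flag is set when either request type is sent, the registration path has to clear it too. The sketch is hypothetical: a `java.util.concurrent` single-threaded executor stands in for Kafka's event queue (it appends in FIFO order, unlike `prepend`), and `HandOffSketch`, `markSent` and `onCompleteOrTimeout` are illustrative names.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}

// Hypothetical sketch: a single-threaded executor stands in for the manager's
// event queue. Unlike Kafka's EventQueue.prepend, execute() appends (FIFO).
final class HandOffSketch {
  private val eventThread: ExecutorService = Executors.newSingleThreadExecutor()

  // Only read or written on the event thread, so no locking is needed.
  private var communicationInFlight = false
  private var timeouts = 0

  // Runs `f` on the event thread and waits for its result.
  private def onEventThread[T](f: => T): T =
    eventThread.submit(new Callable[T] { def call(): T = f }).get()

  // Stands in for the point where a request is sent from the event thread.
  def markSent(): Unit = onEventThread { communicationInFlight = true }

  // Called from a network thread (for both onComplete and onTimeout): it
  // touches no state itself and hands the work to the event thread instead.
  def onCompleteOrTimeout(timedOut: Boolean): Unit =
    eventThread.execute(new Runnable {
      def run(): Unit = {
        communicationInFlight = false
        if (timedOut) timeouts += 1
      }
    })

  def inFlight: Boolean = onEventThread(communicationInFlight)
  def timeoutCount: Int = onEventThread(timeouts)

  def close(): Unit = {
    eventThread.shutdown()
    eventThread.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Confining all state changes to one thread is what lets the flag stay a plain `var`; the callback thread never reads or writes it directly.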
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
junrao commented on code in PR #14903: URL: https://github.com/apache/kafka/pull/14903#discussion_r1414322506
## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ##
@@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest {
     result
   }
-  def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
-    while (!future.isDone || context.mockClient.hasInFlightRequests) {
-      context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
+    while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+      if (manager.eventQueue.isEmpty)
Review Comment: Why do we need this check? If the eventQueue has an event scheduled at a future time, we need to advance the time to drain the event, right?
## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ##
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
    */
   private var registered = false
+  /**
+   * True if a request has been sent and a response or timeout has not yet been processed.
Review Comment: a request => a Heartbeat request?
## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ##
@@ -366,8 +379,27 @@ class BrokerLifecycleManager(
       new BrokerRegistrationResponseHandler())
   }
+  // the response handler is not invoked from the event handler thread,
+  // so it is not safe to update state here, instead, schedule an event
+  // to continue handling the response on the event handler thread
   private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler {
     override def onComplete(response: ClientResponse): Unit = {
+      eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))
+    }
+
+    override def onTimeout(): Unit = {
+      info(s"Unable to register the broker because the RPC got timed out before it could be sent.")
+      eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true))
+    }
+  }
+
+  private class BrokerRegistrationResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event {
+    override def run(): Unit = {
+      communicationInFlight = false
Review Comment: It seems that we don't need this, since `communicationInFlight` is only set to true when sending a `HeartbeatRequest`?
## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ##
@@ -453,79 +490,73 @@ class BrokerLifecycleManager(
         val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse]
         val errorCode = Errors.forCode(message.data().errorCode())
         if (errorCode == Errors.NONE) {
-          // this response handler is not invoked from the event handler thread,
-          // and processing a successful heartbeat response requires updating
-          // state, so to continue we need to schedule an event
-          eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data()))
+          val responseData = message.data()
+          failedAttempts = 0
+          _state match {
+            case BrokerState.STARTING =>
+              if (responseData.isCaughtUp) {
+                info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.")
+                _state = BrokerState.RECOVERY
+                initialCatchUpFuture.complete(null)
+              } else {
+                debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.")
+              }
+              // Schedule the heartbeat after only 10 ms so that in the case where
+              // there is no recovery work to be done, we start up a bit quicker.
+              scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
+            case BrokerState.RECOVERY =>
+              if (!responseData.isFenced) {
+                info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.")
+                initialUnfenceFuture.complete(null)
+                _state = BrokerState.RUNNING
+              } else {
+                info(s"The broker is in RECOVERY.")
+              }
+              scheduleNextCommunicationAfterSuccess()
+            case BrokerState.RUNNING =>
+              debug(s"The broker is RUNNING. Processing heartbeat response.")
+              scheduleNextCommunicationAfterSuccess()
+            case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
+              if (!responseData.shouldShutDown()) {
+                info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " +
+                  "for the active controller.")
+                if (!gotControlledShutdownResponse) {
+                  // If this is the first pending controlled shutdown response we got,
+                  // schedule our next heartbeat a little bit sooner than we usually would.
+                  // In the case where controlled shutdown comp
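junrao's question about the new test `poll` loop hinges on mock time: an event deferred to a future deadline is never drained by polling alone, so the loop has to advance time. The following is a minimal, hypothetical model of that behaviour; `MockTimeQueue`, `scheduleAfter`, `runDue` and `pollUntil` are illustrative names and do not reproduce the APIs of Kafka's `MockTime` or its event queue.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical mock-time deferred queue (not Kafka's MockTime or event queue):
// a deferred event runs only once mock time reaches its deadline.
final class MockTimeQueue {
  private final case class Entry(deadlineMs: Long, seq: Long, event: () => Unit)

  private var nowMs = 0L
  private var seq = 0L
  private val pending = ArrayBuffer.empty[Entry]

  def scheduleAfter(delayMs: Long)(event: () => Unit): Unit = {
    pending += Entry(nowMs + delayMs, seq, event)
    seq += 1
  }

  def isEmpty: Boolean = pending.isEmpty

  // Runs every event whose deadline has passed, earliest first; returns how many ran.
  def runDue(): Int = {
    val due = pending.filter(_.deadlineMs <= nowMs).sortBy(e => (e.deadlineMs, e.seq))
    pending --= due
    due.foreach(_.event())
    due.size
  }

  // Polls until `cond` holds. When nothing is due, it jumps mock time to the
  // next deadline instead of spinning forever (the point raised in the review).
  def pollUntil(cond: => Boolean, maxSteps: Int = 1000): Boolean = {
    var steps = 0
    while (!cond && steps < maxSteps) {
      if (runDue() == 0) {
        if (pending.isEmpty) return cond // nothing left that could make progress
        nowMs = pending.map(_.deadlineMs).min
      }
      steps += 1
    }
    cond
  }
}
```

With an event scheduled 10 ms ahead, `runDue()` alone returns 0 and leaves the queue non-empty; `pollUntil` only succeeds because it advances mock time to the event's deadline.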
Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]
soarez commented on PR #14903: URL: https://github.com/apache/kafka/pull/14903#issuecomment-1837447759 @junrao: please take a look