Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-25 Thread via GitHub


junrao merged PR #14903:
URL: https://github.com/apache/kafka/pull/14903


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-25 Thread via GitHub


junrao commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018530584

   @soarez : Thanks for triaging the test failures. Will merge the PR.





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-25 Thread via GitHub


soarez commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018506864

   @junrao all the failed tests are being tracked.
   
   These were already tracked:
   
   * KAFKA-8041 
kafka.server.LogDirFailureTest.testIOExceptionDuringCheckpoint(String).quorum=kraft
   * KAFKA-8115 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   * KAFKA-15772 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest.testAbortTransactionTimeout(String).quorum=kraft
   * KAFKA-15897 
kafka.server.ControllerRegistrationManagerTest.testWrongIncarnationId()
   * KAFKA-15898 
org.apache.kafka.controller.QuorumControllerTest.testFenceMultipleBrokers()
   * KAFKA-15921 
kafka.api.SaslScramSslEndToEndAuthorizationTest.testAuthentications(String).quorum=kraft
   * KAFKA-15927 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateSourceDefault()
   * KAFKA-15927 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault()
   * KAFKA-15928 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   * KAFKA-15945 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testSyncTopicConfigs()
   * KAFKA-15961 
kafka.controller.ControllerIntegrationTest.testTopicIdPersistsThroughControllerRestart()
   * KAFKA-16225 
kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(String).quorum=kraft
   * KAFKA-16323 
kafka.server.ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric()
   
   This one was not, so I created a JIRA:
   
   * KAFKA-16422 
org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest."testFailingOverIncrementsNewActiveControllerCount(boolean).true"
   





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-25 Thread via GitHub


junrao commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-2018408733

   @soarez : Are the failed tests in the latest run being tracked?





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-22 Thread via GitHub


soarez commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-2014828308

   Hey @junrao. Sorry I didn't reply about the failing tests earlier; I was 
still looking into them and should've made that clear.
   
   I had a look at the test results. It's strange that all the failed tests are 
categorized as "new failures", but most of them are known flaky tests.
   
   These tests already have a respective open JIRA:
   
   * KAFKA-8041 
kafka.server.LogDirFailureTest.testIOExceptionDuringCheckpoint(String).quorum=kraft
   * KAFKA-8115 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   * KAFKA-8250 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaAssign(String).quorum=kraft
   * KAFKA-8677  
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest."testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(String,
 boolean).quorum=kraft, isIdempotenceEnabled=true"
   * KAFKA-13514 
org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription(boolean).hasConsumerRack
 = false
   * KAFKA-15104 
org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful
 [1] Type=Raft-Combined, MetadataVersion=3.8-IV0, Security=PLAINTEXT
   * KAFKA-15411 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest."testNoProduceWithDescribeAcl(String,
 boolean).quorum=kraft, isIdempotenceEnabled=true"
   * KAFKA-15898 
org.apache.kafka.controller.QuorumControllerTest.testFenceMultipleBrokers()
   * KAFKA-15914 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId
   * KAFKA-15917 
OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks
   * KAFKA-15927 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateSourceDefault()
   * KAFKA-15928 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   * KAFKA-15940 
kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
   * KAFKA-15945 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testSyncTopicConfigs()
   * KAFKA-16024 
kafka.api.SaslPlainPlaintextConsumerTest.testCoordinatorFailover(String, 
String).quorum=kraft+kip848.groupProtocol=consumer
   * KAFKA-16174 
org.apache.kafka.tools.MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful
 [5] Type=Raft-Combined, MetadataVersion=3.8-IV0, Security=PLAINTEXT
   * KAFKA-16225 
kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(String).quorum=kraft
   * KAFKA-16323 
kafka.server.ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric()
   
   I could not find JIRAs for these, so I created them:
   
   * KAFKA-16402 
org.apache.kafka.controller.QuorumControllerTest.testSnapshotSaveAndLoad()
   * KAFKA-16403 
org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords()
   * KAFKA-16404 
org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig()
   
   I also couldn't find JIRAs for these, but they're timeouts, so it could have 
been a failure of the test environment and not necessarily something wrong in 
the test. I've rebased to trigger a new build, and we'll see if they remain.
   
   * 
org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean).hasConsumerRack
 = false
   * 
org.apache.kafka.clients.consumer.StickyAssignorTest.testAssignmentAndGroupWithNonEqualSubscriptionNotTimeout(boolean).hasConsumerRack
 = false
   * 
org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsNonExistingGroup().
   * 
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorValidate
   * 
org.apache.kafka.server.ClientMetricsManagerTest.testCacheEvictionWithMultipleClients()
   
   
   None of the 26 tests is related to this change. 
   





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-21 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1534216092


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -254,33 +262,41 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-val context = new RegistrationTestContext(configProperties)
-val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+val ctx = new RegistrationTestContext(configProperties)
+val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 val controllerNode = new Node(3000, "localhost", 8021)
-context.controllerNodeProvider.node.set(controllerNode)
-manager.start(() => context.highestMetadataOffset.get(),
-  context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+ctx.controllerNodeProvider.node.set(controllerNode)
+
+manager.start(() => ctx.highestMetadataOffset.get(),
+  ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
   Collections.emptyMap(), OptionalLong.of(10L))
-TestUtils.retry(6) {
-  assertEquals(1, context.mockChannelManager.unsentQueue.size)
-  assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-}
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-TestUtils.retry(1) {
-  context.poll()
-  assertEquals(1000L, manager.brokerEpoch)
-}
 
+def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
+def nextHeartbeatRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+def nextRegistrationRequest(epoch: Long) =
+  doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+// Broker registers and response sets epoch to 1000L
+assertEquals(10L, 
nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+nextHeartbeatRequest() // poll for next request as way to synchronize with 
the new value into brokerEpoch
+assertEquals(1000L, manager.brokerEpoch)
+
+// Trigger JBOD MV update
 manager.handleKraftJBODMetadataVersionUpdate()
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1200)), 
controllerNode)
-TestUtils.retry(6) {
-  context.time.sleep(100)
-  context.poll()
-  manager.eventQueue.wakeup()
-  assertEquals(1200, manager.brokerEpoch)
-}
+
+// Depending on scheduling, the next request could either be 
BrokerRegistration or BrokerHeartbeat.

Review Comment:
   You're right. This does not seem necessary. I removed it.






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-20 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1532500641


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -254,33 +262,41 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-val context = new RegistrationTestContext(configProperties)
-val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+val ctx = new RegistrationTestContext(configProperties)
+val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 val controllerNode = new Node(3000, "localhost", 8021)
-context.controllerNodeProvider.node.set(controllerNode)
-manager.start(() => context.highestMetadataOffset.get(),
-  context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+ctx.controllerNodeProvider.node.set(controllerNode)
+
+manager.start(() => ctx.highestMetadataOffset.get(),
+  ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
   Collections.emptyMap(), OptionalLong.of(10L))
-TestUtils.retry(6) {
-  assertEquals(1, context.mockChannelManager.unsentQueue.size)
-  assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-}
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-TestUtils.retry(1) {
-  context.poll()
-  assertEquals(1000L, manager.brokerEpoch)
-}
 
+def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
+def nextHeartbeatRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+def nextRegistrationRequest(epoch: Long) =
+  doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+// Broker registers and response sets epoch to 1000L
+assertEquals(10L, 
nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+nextHeartbeatRequest() // poll for next request as way to synchronize with 
the new value into brokerEpoch
+assertEquals(1000L, manager.brokerEpoch)
+
+// Trigger JBOD MV update
 manager.handleKraftJBODMetadataVersionUpdate()
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1200)), 
controllerNode)
-TestUtils.retry(6) {
-  context.time.sleep(100)
-  context.poll()
-  manager.eventQueue.wakeup()
-  assertEquals(1200, manager.brokerEpoch)
-}
+
+// Depending on scheduling, the next request could either be 
BrokerRegistration or BrokerHeartbeat.

Review Comment:
   Before calling `manager.handleKraftJBODMetadataVersionUpdate()`, there 
should be 1 `CommunicationEvent` with a delay of 100 in `eventQueue`. Until we 
call `poll`, this `CommunicationEvent` will remain in `eventQueue`. Calling 
`manager.handleKraftJBODMetadataVersionUpdate()` causes a 
`KraftJBODMetadataVersionUpdateEvent` with no delay to be added to 
`eventQueue`. The `KraftJBODMetadataVersionUpdateEvent` will then be processed 
and replace the `CommunicationEvent` with a delay of 0. The 
`CommunicationEvent` will then be processed, which causes a 
`BrokerRegistration` request to be sent. So, it seems that the next request 
should always be `BrokerRegistration` when we call `poll`?
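   
   The sequence described above can be sketched with a toy queue. This is only 
an illustration, assuming a simplified model in which a deferred event 
scheduled under a tag replaces any pending deferred event with the same tag. 
`ToyEventQueue`, `Event`, the tag name `communication`, and the event labels 
are hypothetical and are not Kafka's `EventQueue` API.

```scala
import scala.collection.mutable

// Hypothetical toy model for illustration only; not Kafka's EventQueue.
object ToyQueueDemo {
  final case class Event(name: String, run: () => Unit = () => ())

  final class ToyEventQueue {
    private val immediate = mutable.Queue.empty[Event]
    // tag -> (deadline, event); scheduling under an existing tag overwrites it
    private val deferred = mutable.Map.empty[String, (Long, Event)]
    val processed = mutable.ListBuffer.empty[String]

    def append(e: Event): Unit = immediate.enqueue(e)

    def scheduleDeferred(tag: String, deadline: Long, e: Event): Unit =
      deferred.update(tag, (deadline, e))

    // Runs everything that is runnable at `now` without advancing the clock.
    def drain(now: Long): Unit = {
      var progress = true
      while (progress) {
        progress = false
        if (immediate.nonEmpty) {
          val e = immediate.dequeue()
          processed += e.name
          e.run()
          progress = true
        } else {
          val due = deferred.toSeq.filter(_._2._1 <= now).sortBy(_._2._1).headOption
          due.foreach { case (tag, (_, e)) =>
            deferred.remove(tag)
            processed += e.name
            e.run()
            progress = true
          }
        }
      }
    }
  }

  // One pending communication event with a delay of 100, then an immediate
  // update event that reschedules communication with a delay of 0.
  def scenario(): List[String] = {
    val now = 0L
    val q = new ToyEventQueue
    q.scheduleDeferred("communication", now + 100, Event("heartbeat"))
    q.append(Event("jbodUpdate", () =>
      q.scheduleDeferred("communication", now, Event("registration"))))
    q.drain(now)
    q.processed.toList
  }
}
```

   In this model the event originally scheduled at 100 never runs, because the 
update event overwrites it before the clock advances.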






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-20 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1531683586


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -197,11 +197,17 @@ class BrokerLifecycleManagerTest {
 result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: 
BrokerLifecycleManager, future: Future[T]): T = {
-while (!future.isDone || context.mockClient.hasInFlightRequests) {
-  context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, 
future: Future[T]): T = {
+while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+  // If the manager is idling until scheduled events we need to advance 
the clock
+  if (manager.eventQueue.scheduledAfterIdling()
+.filter(!_.getClass.getSimpleName.endsWith("TimeoutEvent")) // avoid 
triggering timeout events

Review Comment:
   That's a good idea. I'm making that change.






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-20 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1531683014


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-val context = new RegistrationTestContext(configProperties)
-val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+val ctx = new RegistrationTestContext(configProperties)
+val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 val controllerNode = new Node(3000, "localhost", 8021)
-context.controllerNodeProvider.node.set(controllerNode)
-manager.start(() => context.highestMetadataOffset.get(),
-  context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+ctx.controllerNodeProvider.node.set(controllerNode)
+
+manager.start(() => ctx.highestMetadataOffset.get(),
+  ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
   Collections.emptyMap(), OptionalLong.of(10L))
-TestUtils.retry(6) {
-  assertEquals(1, context.mockChannelManager.unsentQueue.size)
-  assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-}
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-TestUtils.retry(1) {
-  context.poll()
-  assertEquals(1000L, manager.brokerEpoch)
-}
 
+def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
+def nextRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+def nextRegistrationRequest(epoch: Long) =
+  doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+// Broker registers and response sets epoch to 1000L
+assertEquals(10L, 
nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+nextRequest() // poll for next request as way to synchronize with the new 
value into brokerEpoch
+assertEquals(1000L, manager.brokerEpoch)
+
+// Trigger JBOD MV update
 manager.handleKraftJBODMetadataVersionUpdate()
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1200)), 
controllerNode)
-TestUtils.retry(6) {
-  context.time.sleep(100)
-  context.poll()
-  manager.eventQueue.wakeup()
-  assertEquals(1200, manager.brokerEpoch)
-}
+
+// We may have to accept some heartbeats before the new registration is 
sent
+while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()

Review Comment:
   No, I don't think so. `prepareResponse` delegates to `MockClient`, which 
expects a predetermined request-response order. It supports a `RequestMatcher`, 
which `prepareResponse` uses to extract the request, but it does not support 
preparing a conditional response. We would need a larger change to add support 
in `MockClient` for conditional prepared responses.
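   
   To illustrate the difference, here is a rough sketch with toy types. 
`FifoMock`, `ConditionalMock`, and the string responses are hypothetical 
stand-ins and do not reflect the real `MockClient` API. The first mock hands 
out prepared responses in order, whatever request arrives. The second picks the 
response after seeing the request, which is the capability that is missing.

```scala
import scala.collection.mutable

// Hypothetical toy types for illustration only; not Kafka's MockClient.
object PreparedResponseSketch {
  sealed trait Request
  case object Heartbeat extends Request
  case object Registration extends Request

  // Responses are queued up front and handed out in order,
  // regardless of which request actually arrives.
  final class FifoMock {
    private val prepared = mutable.Queue.empty[String]
    def prepare(response: String): Unit = prepared.enqueue(response)
    def send(request: Request): String = prepared.dequeue()
  }

  // The response is chosen only after the request has been seen.
  final class ConditionalMock(respond: Request => String) {
    def send(request: Request): String = respond(request)
  }

  // A heartbeat response was prepared, but a registration request arrives.
  def fifoResult(): String = {
    val mock = new FifoMock
    mock.prepare("HeartbeatResponse")
    mock.send(Registration)
  }

  def conditionalResult(): String = {
    val mock = new ConditionalMock({
      case Heartbeat => "HeartbeatResponse"
      case Registration => "RegistrationResponse"
    })
    mock.send(Registration)
  }
}
```

   With the FIFO mock the registration request receives the heartbeat response 
that was prepared ahead of time; the conditional mock returns the matching one.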






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-19 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1530918656


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-val context = new RegistrationTestContext(configProperties)
-val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+val ctx = new RegistrationTestContext(configProperties)
+val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 val controllerNode = new Node(3000, "localhost", 8021)
-context.controllerNodeProvider.node.set(controllerNode)
-manager.start(() => context.highestMetadataOffset.get(),
-  context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+ctx.controllerNodeProvider.node.set(controllerNode)
+
+manager.start(() => ctx.highestMetadataOffset.get(),
+  ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
   Collections.emptyMap(), OptionalLong.of(10L))
-TestUtils.retry(6) {
-  assertEquals(1, context.mockChannelManager.unsentQueue.size)
-  assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-}
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-TestUtils.retry(1) {
-  context.poll()
-  assertEquals(1000L, manager.brokerEpoch)
-}
 
+def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
+def nextRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+def nextRegistrationRequest(epoch: Long) =
+  doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+// Broker registers and response sets epoch to 1000L
+assertEquals(10L, 
nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+nextRequest() // poll for next request as way to synchronize with the new 
value into brokerEpoch
+assertEquals(1000L, manager.brokerEpoch)
+
+// Trigger JBOD MV update
 manager.handleKraftJBODMetadataVersionUpdate()
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1200)), 
controllerNode)
-TestUtils.retry(6) {
-  context.time.sleep(100)
-  context.poll()
-  manager.eventQueue.wakeup()
-  assertEquals(1200, manager.brokerEpoch)
-}
+
+// We may have to accept some heartbeats before the new registration is 
sent
+while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()

Review Comment:
   `prepareResponse` knows the request type. Could we generate the response 
corresponding to the request type there?



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -197,11 +197,17 @@ class BrokerLifecycleManagerTest {
 result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: 
BrokerLifecycleManager, future: Future[T]): T = {
-while (!future.isDone || context.mockClient.hasInFlightRequests) {
-  context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, 
future: Future[T]): T = {
+while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+  // If the manager is idling until scheduled events we need to advance 
the clock
+  if (manager.eventQueue.scheduledAfterIdling()
+.filter(!_.getClass.getSimpleName.endsWith("TimeoutEvent")) // avoid 
triggering timeout events

Review Comment:
   I see. Perhaps we could add a step after `poll(ctx, manager, registration)` 
to explicitly drain `RegistrationTimeoutEvent` first? This makes the test safer 
since it won't be sensitive to the timeout config values.



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest {
   Collections.emptyMap(), OptionalLong.empty())
 poll(ctx, manager, registration)
 
+def nextHeartbeatDirs(): Set[String] =
+  poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData(
+.data().offlineLogDirs().asScala.map(_.toString).toSet
+assertEquals(Set.empty, nextHeartbeatDirs())
 
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
 
manager.propag

Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-19 Thread via GitHub


soarez commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-2006292277

   Thanks for the review @junrao. Please take another look.





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-19 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1529927411


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -197,11 +197,17 @@ class BrokerLifecycleManagerTest {
 result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: 
BrokerLifecycleManager, future: Future[T]): T = {
-while (!future.isDone || context.mockClient.hasInFlightRequests) {
-  context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, 
future: Future[T]): T = {
+while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+  // If the manager is idling until scheduled events we need to advance 
the clock
+  if (manager.eventQueue.scheduledAfterIdling()
+.filter(!_.getClass.getSimpleName.endsWith("TimeoutEvent")) // avoid 
triggering timeout events

Review Comment:
   This is a bit ugly. It only applies to `RegistrationTimeoutEvent`. We want 
to avoid triggering those deferred events.
   
   Instead of checking for that class specifically, I thought there may be 
other timeout event types in the future and we'll also want to avoid those. I 
also ruled out extending EventQueue.Event as we only care about this in this 
test suite.
   
   I feel ambivalent about the approach I've taken here, so please let me know 
if you have a preference.






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-19 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1529919190


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest {
   Collections.emptyMap(), OptionalLong.empty())
 poll(ctx, manager, registration)
 
+def nextHeartbeatDirs(): Set[String] =
+  poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData(
+.data().offlineLogDirs().asScala.map(_.toString).toSet
+assertEquals(Set.empty, nextHeartbeatDirs())
 
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
 
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), 
nextHeartbeatDirs())

Review Comment:
   Thanks for carefully thinking this through. 
   
   I believe there's a mistake in that sequence of events though:
   
   > nextHeartbeatDirs() causes the time to advance quickly and passes 100
   
   This isn't how `BrokerLifecycleManagerTest#poll()` works. It will only 
advance time if there is no event in the queue that can immediately run, so o1 
will always run before c2.
   
   > o1 is processed at manager.eventQueue and a CommunicationEvent c3 is 
appended to manager.eventQueue
   
   Because o1 runs before c2, the EventQueue will override c2 with c3 as 
they're both deferred events with the same tag.
   
   After running this test for tens of thousands of iterations, I was unable to 
reproduce a flaky run. 
   
   



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-val context = new RegistrationTestContext(configProperties)
-val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+val ctx = new RegistrationTestContext(configProperties)
+val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 val controllerNode = new Node(3000, "localhost", 8021)
-context.controllerNodeProvider.node.set(controllerNode)
-manager.start(() => context.highestMetadataOffset.get(),
-  context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+ctx.controllerNodeProvider.node.set(controllerNode)
+
+manager.start(() => ctx.highestMetadataOffset.get(),
+  ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
   Collections.emptyMap(), OptionalLong.of(10L))
-TestUtils.retry(6) {
-  assertEquals(1, context.mockChannelManager.unsentQueue.size)
-  assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-}
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-TestUtils.retry(1) {
-  context.poll()
-  assertEquals(1000L, manager.brokerEpoch)
-}
 
+def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
+def nextRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+def nextRegistrationRequest(epoch: Long) =
+  doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+// Broker registers and response sets epoch to 1000L
+assertEquals(10L, 
nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+nextRequest() // poll for next request as way to synchronize with the new 
value into brokerEpoch
+assertEquals(1000L, manager.brokerEpoch)
+
+// Trigger JBOD MV update
 manager.handleKraftJBODMetadataVersionUpdate()
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1200)), 
controllerNode)
-TestUtils.retry(6) {
-  context.time.sleep(100)
-  context.poll()
-  manager.eventQueue.wakeup()
-  assertEquals(1200, manager.brokerEpoch)
-}
+
+// We may have to accept some heartbeats before the new registration is 
sent
+while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()

Review Comment:
   I found it difficult to predict when the new `BrokerRegistration` request is 
sent. Depending on scheduling it could be sent straight away or it could get 
queued behind another heartbeat.
   
   It is indeed incorrect to mock 

Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-18 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1526788841


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -551,9 +580,11 @@ class BrokerLifecycleManager(
   }
 
   private def scheduleNextCommunication(intervalNs: Long): Unit = {
-trace(s"Scheduling next communication at 
${MILLISECONDS.convert(intervalNs, NANOSECONDS)} " +
+val nanos = if (nextSchedulingShouldBeImmediate) 0 else intervalNs

Review Comment:
   nanos =>  adjustedIntervalNs?



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest {
   Collections.emptyMap(), OptionalLong.empty())
 poll(ctx, manager, registration)
 
+def nextHeartbeatDirs(): Set[String] =
+  poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
+.data().offlineLogDirs().asScala.map(_.toString).toSet
+assertEquals(Set.empty, nextHeartbeatDirs())
 
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
 
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), 
nextHeartbeatDirs())

Review Comment:
   It seems this could still be flaky. 
   
   `poll(ctx, manager, registration)`
   A CommunicationEvent c1 with a delay of 0 is scheduled in 
`manager.eventQueue`.
   
   `assertEquals(Set.empty, nextHeartbeatDirs())`
   c1 is processed in `ctx.mockChannelManager` and a 
BrokerHeartbeatResponseEvent b1 is added to `manager.eventQueue`. b1 is 
processed at `manager.eventQueue` and a CommunicationEvent c2 with a delay of 
100 is scheduled in `manager.eventQueue`.
   
   `manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))`
   An OfflineDirEvent o1 is appended to `manager.eventQueue`, but not yet 
processed.
   
   `assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())`
   `nextHeartbeatDirs()` causes the time to advance quickly and pass 100. c2 is 
processed at `manager.eventQueue` and a HeartBeat request h1 is added to 
`ctx.mockChannelManager`.
   
   o1 is processed at `manager.eventQueue` and a CommunicationEvent c3 is 
appended to `manager.eventQueue`. c3 is processed at `manager.eventQueue` and 
sets `nextSchedulingShouldBeImmediate` to true.
   
   h1 is processed by `ctx.mockChannelManager` and adds 
BrokerHeartbeatResponseEvent b2 to `manager.eventQueue`. `manager.eventQueue` 
processes b2 and schedules a CommunicationEvent c4 with a delay of 0 in 
`manager.eventQueue`. c4 is processed at `manager.eventQueue` and a HeartBeat 
request h2 (which doesn't pick up ej8Q9_d2Ri6FXNiTxKFiow) is added to 
`ctx.mockChannelManager`.
   
   `manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))`
   `assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow"), 
nextHeartbeatDirs())`
   Now the above assertion will fail since `nextHeartbeatDirs()` will pick up 
the request produced by c4 (h2), which doesn't include ej8Q9_d2Ri6FXNiTxKFiow.
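
The interleaving above turns on one property: as the walkthrough describes it, a heartbeat request reports the offline directories known at the moment it is built, so a request built before a failure is recorded cannot include it. A minimal Java sketch of just that property follows. `HeartbeatSnapshotDemo` and its methods are hypothetical stand-ins for illustration, not `BrokerLifecycleManager` code.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for the manager's offline-directory bookkeeping.
final class HeartbeatSnapshotDemo {
    private final Set<String> offlineDirs = new LinkedHashSet<>();

    // Records a failed directory; heartbeats built afterwards will report it.
    void propagateDirectoryFailure(String dirId) {
        offlineDirs.add(dirId);
    }

    // Builds a request body: an immutable copy of what is known right now.
    Set<String> buildHeartbeat() {
        return Collections.unmodifiableSet(new LinkedHashSet<>(offlineDirs));
    }
}
```

In this model, a request built between the two `propagateDirectoryFailure` calls plays the role of h2: it is already queued when the second failure arrives, so an assertion on the next request observed would see only the first directory.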
   
   
   
   



##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -513,4 +514,36 @@ public void close() throws InterruptedException {
 eventHandlerThread.join();
 log.info("closed event queue.");
 }
+
+/**
+ * Returns the deferred event that the queue is waiting for, idling until
+ * its deadline comes, if there is any.
+ * If the queue has immediate work to do, this returns empty.
+ * This is useful for unit tests, where to make progress, we need to
+ * speed the clock up until the next scheduled event is ready to run.
+ */
+public Optional scheduledAfterIdling() {

Review Comment:
   scheduledAfterIdling => firstDeferredIfIdling ?



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-val context = new RegistrationTestContext(configProperties)
-val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+val ctx = new RegistrationTestContext(configProperties)
+val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 val controllerNode = new Node(3000, "localhost", 8021)
-context.controllerNodeProvider.node.set(controllerNode)
-manager.start(() => context.highestMetadataOffset.get(),
-  context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+ctx.controllerNodeProvider.node.set(control

Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-14 Thread via GitHub


soarez commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1998592089

   Rebased this due to merge conflict. 
   
   @gaurav-narula I also made what I believe are some improvements to 
`BrokerLifecycleManagerTest.testKraftJBODMetadataVersionUpdateEvent`, can you 
please take a look?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-10 Thread via GitHub


github-actions[bot] commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1987577154

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-11 Thread via GitHub


rondagostino commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1850988044

   `12 tests have failed.  There are 12 new tests failing, 0 existing failing`
   
   I think this PR can be merged now assuming @cmccabe takes a look at the 
question above regarding prepend to event queue.





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-11 Thread via GitHub


OmniaGM commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1850387349

   > `51 tests have failed. There are 49 new tests failing, 2 existing failing`
   > 
   > This seems like a lot more failures than we are used to recently. @OmniaGM 
Can you rebase this onto latest `trunk` to see if we can get a cleaner build?
   
   done! I don't have related tests failing locally. Waiting for the build now. 





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-11 Thread via GitHub


rondagostino commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1850245547

   `51 tests have failed.  There are 49 new tests failing, 2 existing failing`
   
   This seems like a lot more failures than we are used to recently.  @OmniaGM 
Can you rebase this onto latest `trunk` to see if we can get a cleaner build?





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-06 Thread via GitHub


soarez commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1843813913

   @junrao: Thanks for reviewing this. It seems there are currently some issues 
with failing tests. See 
https://github.com/apache/kafka/pull/14838#issuecomment-1843693525 
   The test failures for this PR don't seem to be related to the changes, but 
to be sure we can also wait a bit longer and see if the state of the build 
improves. 





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-06 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1417911671


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -364,10 +377,30 @@ class BrokerLifecycleManager(
 }
 _channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
   new BrokerRegistrationResponseHandler())
+communicationInFlight = true
   }
 
+  // the response handler is not invoked from the event handler thread,
+  // so it is not safe to update state here, instead, schedule an event
+  // to continue handling the response on the event handler thread
   private class BrokerRegistrationResponseHandler extends 
ControllerRequestCompletionHandler {
 override def onComplete(response: ClientResponse): Unit = {
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))

Review Comment:
   @cmccabe : Do you have concerns on the usage of `prepend` here?
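
For readers unfamiliar with the distinction being asked about: a prepended event runs before everything already queued, while an appended one runs after it. The sketch below uses a plain `ArrayDeque` with placeholder event names. It illustrates ordering only and makes no claim about whether `prepend` is the right choice in `BrokerLifecycleManager`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Ordering illustration only; event names are placeholders, not Kafka classes.
final class PrependVsAppendDemo {
    // Returns the order in which the queued events would run.
    static List<String> runOrder(boolean prependResponse) {
        Deque<String> queue = new ArrayDeque<>();
        queue.addLast("OfflineDirEvent");
        queue.addLast("CommunicationEvent");
        if (prependResponse) {
            queue.addFirst("RegistrationResponseEvent"); // runs before queued work
        } else {
            queue.addLast("RegistrationResponseEvent");  // runs after queued work
        }
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.pollFirst());
        }
        return order;
    }
}
```

With `prependResponse` set, the response event is handled before the already-queued `CommunicationEvent`; with it unset, the `CommunicationEvent` runs first.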



##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
*/
   private var registered = false
 
+  /**
+   * True if a request has been sent and a response or timeout has not yet 
been processed.
+   * This variable can only be read or written from the event queue thread.
+   */
+  private var communicationInFlight = false

Review Comment:
   Yes, I agree that it's not straightforward. We can just leave the code as it 
is.






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-06 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1417143188


##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
 eventHandlerThread.join();
 log.info("closed event queue.");
 }
+
+/**
+ * Useful for unit tests, where we need to speed the clock up until
+ * deferred events are ready to run.
+ */
+public Object pendingDeferredEvent() {
+lock.lock();
+try {
+if (eventHandler.head.next != eventHandler.head) {
+return null;
+}
+Map.Entry entry = 
eventHandler.deadlineMap.firstEntry();
+if (entry == null) {
+return null;
+}
+EventContext eventContext = entry.getValue();
+if (eventContext.insertionType == EventInsertionType.DEFERRED) {

Review Comment:
   That is incorrect. Events of any type can end up in `deadlineMap` as long as 
they have a deadline specified.
   
   The notion of deadline here is a bit confusing. A deadline for events of 
type `DEFERRED` is the time after which the event should run. For events of 
other types, the deadline is the time after which `TimeoutException` should be 
delivered if the event hasn't run yet. So we want to ignore events that are not 
of type `DEFERRED`.
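
To make the two meanings of a deadline concrete, here is a minimal Java sketch. It is a toy model, not the `KafkaEventQueue` implementation: `ToyInsertionType`, `ToyEvent`, `ToyDeadlineMap` and `earliestIfDeferred` are hypothetical names, and the real class has more state than this.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Toy model only: names and structure are assumptions, not KafkaEventQueue internals.
enum ToyInsertionType { APPEND, PREPEND, DEFERRED }

final class ToyEvent {
    final String name;
    final ToyInsertionType type;

    ToyEvent(String name, ToyInsertionType type) {
        this.name = name;
        this.type = type;
    }
}

final class ToyDeadlineMap {
    // Keyed by deadline in nanoseconds; may hold events of any insertion type.
    private final TreeMap<Long, ToyEvent> byDeadline = new TreeMap<>();

    void put(long deadlineNs, ToyEvent event) {
        byDeadline.put(deadlineNs, event);
    }

    // Returns the earliest entry only if it is DEFERRED ("run me at this time").
    // For other types the deadline means "time me out at this time", so it is ignored.
    Optional<ToyEvent> earliestIfDeferred() {
        Map.Entry<Long, ToyEvent> first = byDeadline.firstEntry();
        if (first == null) {
            return Optional.empty();
        }
        ToyEvent event = first.getValue();
        return event.type == ToyInsertionType.DEFERRED ? Optional.of(event) : Optional.empty();
    }
}
```

An appended event with a timeout deadline earlier than any deferred event makes `earliestIfDeferred()` return empty in this model, which is why the type check is not redundant.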






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-06 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1417129195


##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
 eventHandlerThread.join();
 log.info("closed event queue.");
 }
+
+/**
+ * Useful for unit tests, where we need to speed the clock up until
+ * deferred events are ready to run.
+ */
+public Object pendingDeferredEvent() {
+lock.lock();
+try {
+if (eventHandler.head.next != eventHandler.head) {

Review Comment:
   Well, yes, deferred events are not stored under `head`, but if there are any 
events under `head` we should not fast forward the clock, as those will run 
regardless. The point here is to return the first event that will run next, 
when its deadline comes, but if and only if there's nothing else to do until 
then. I'll get a better name and javadoc for this method so this makes more 
sense.
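
The intent described above (report the next deferred event only when the queue has nothing else to do) can be sketched in a few lines of Java. `IdleAwareQueue` is a hypothetical stand-in with string events; the method name `firstDeferredIfIdling` is borrowed from a later review suggestion, and none of this is the actual `KafkaEventQueue` code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Illustrative stand-in for an event queue; not the KafkaEventQueue implementation.
final class IdleAwareQueue {
    private final Deque<String> immediate = new ArrayDeque<>();     // runs as soon as possible
    private final TreeMap<Long, String> deferred = new TreeMap<>(); // runs at its deadline

    void append(String event) {
        immediate.addLast(event);
    }

    void scheduleDeferred(long deadlineNs, String event) {
        deferred.put(deadlineNs, event);
    }

    // Empty while there is immediate work, since that work runs without any clock change.
    // Otherwise the earliest deferred event, which is what the queue is idling for.
    Optional<String> firstDeferredIfIdling() {
        if (!immediate.isEmpty()) {
            return Optional.empty();
        }
        Map.Entry<Long, String> first = deferred.firstEntry();
        return first == null ? Optional.empty() : Optional.of(first.getValue());
    }

    // Runs (here: just removes) one immediate event, if any.
    Optional<String> runOneImmediate() {
        return Optional.ofNullable(immediate.pollFirst());
    }
}
```

A test helper built on this would advance its mock clock only when `firstDeferredIfIdling()` is present, and otherwise let the queue drain its immediate events first.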






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-06 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1417124246


##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
 eventHandlerThread.join();
 log.info("closed event queue.");
 }
+
+/**
+ * Useful for unit tests, where we need to speed the clock up until
+ * deferred events are ready to run.
+ */
+public Object pendingDeferredEvent() {

Review Comment:
   Sure 👍 









Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-06 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1417121867


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest {
 result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: 
BrokerLifecycleManager, future: Future[T]): T = {
-while (!future.isDone || context.mockClient.hasInFlightRequests) {
-  context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, 
future: Future[T]): T = {
+while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+  if 
(Option(manager.eventQueue.pendingDeferredEvent()).exists(!_.getClass.getSimpleName.endsWith("TimeoutEvent")))
+ctx.time.sleep(5)
   manager.eventQueue.wakeup()
-  context.time.sleep(5)
+}
+while (!future.isDone || ctx.mockClient.hasInFlightRequests) {

Review Comment:
   Yes, it seems that is enough. Changing it.






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-06 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1417116727


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
*/
   private var registered = false
 
+  /**
+   * True if a request has been sent and a response or timeout has not yet 
been processed.
+   * This variable can only be read or written from the event queue thread.
+   */
+  private var communicationInFlight = false

Review Comment:
   I don't see a straightforward way to do this.
   
   Maybe I'm missing something, but `NetworkClient` does not look like a good 
place for this. `kafka.server.NodeToControllerChannelManagerImpl#sendRequest` 
isn't synchronous, it runs `requestThread.enqueue`, so if we rely on 
`networkClient` state we open the door to race conditions.
   Also, `NodeToControllerRequestThread` can reset the network client at any 
point.
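
As a rough illustration of the alternative being defended, the sketch below keeps the in-flight flag confined to a single event thread: sending sets it, and a response (arriving on some other thread) only enqueues a task that clears it. `SerializedSender` and all of its members are hypothetical; a single-threaded `ExecutorService` stands in for the event queue, and each call waits for its task purely to keep the demo deterministic.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model: all mutable state is touched only by tasks on eventThread.
final class SerializedSender {
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "event-thread");
        t.setDaemon(true); // do not keep the JVM alive for the demo
        return t;
    });
    private boolean inFlight = false; // read and written only on eventThread
    private int sent = 0;             // likewise confined to eventThread

    // Runs a task on the event thread and waits for it to finish.
    private void runOnEventThread(Runnable task) {
        try {
            eventThread.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Analogue of a communication event: sends only if nothing is outstanding.
    void communicate() {
        runOnEventThread(() -> {
            if (inFlight) {
                return; // a request is outstanding, so do nothing
            }
            sent++;     // stand-in for handing a request to an asynchronous sender
            inFlight = true;
        });
    }

    // Analogue of a response handler: it may be called from any thread,
    // so it only schedules the state change on the event thread.
    void onResponse() {
        runOnEventThread(() -> inFlight = false);
    }

    int sentCount() {
        int[] result = new int[1];
        runOnEventThread(() -> result[0] = sent);
        return result[0];
    }

    void close() {
        eventThread.shutdown();
    }
}
```

Because the flag never leaves the event thread, it cannot disagree with the decision to send, regardless of when the underlying client actually transmits or is reset.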
   






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-05 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1416090404


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
*/
   private var registered = false
 
+  /**
+   * True if a request has been sent and a response or timeout has not yet 
been processed.
+   * This variable can only be read or written from the event queue thread.
+   */
+  private var communicationInFlight = false

Review Comment:
   The `NetworkClient` in `_channelManager` already maintains the state for in 
flight requests through `networkClient.ready(node, now)`. So, I am wondering if 
we need to maintain the state here. Could we just expose the state in 
`NetworkClient` through `_channelManager`?



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest {
 result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: 
BrokerLifecycleManager, future: Future[T]): T = {
-while (!future.isDone || context.mockClient.hasInFlightRequests) {
-  context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, 
future: Future[T]): T = {
+while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+  if 
(Option(manager.eventQueue.pendingDeferredEvent()).exists(!_.getClass.getSimpleName.endsWith("TimeoutEvent")))
+ctx.time.sleep(5)
   manager.eventQueue.wakeup()
-  context.time.sleep(5)
+}
+while (!future.isDone || ctx.mockClient.hasInFlightRequests) {

Review Comment:
   Do we need `ctx.mockClient.hasInFlightRequests`? It seems that 
`!future.isDone` is enough?



##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
 eventHandlerThread.join();
 log.info("closed event queue.");
 }
+
+/**
+ * Useful for unit tests, where we need to speed the clock up until
+ * deferred events are ready to run.
+ */
+public Object pendingDeferredEvent() {

Review Comment:
   Could we return Optional instead of null?



##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
 eventHandlerThread.join();
 log.info("closed event queue.");
 }
+
+/**
+ * Useful for unit tests, where we need to speed the clock up until
+ * deferred events are ready to run.
+ */
+public Object pendingDeferredEvent() {
+lock.lock();
+try {
+if (eventHandler.head.next != eventHandler.head) {
+return null;
+}
+Map.Entry entry = 
eventHandler.deadlineMap.firstEntry();
+if (entry == null) {
+return null;
+}
+EventContext eventContext = entry.getValue();
+if (eventContext.insertionType == EventInsertionType.DEFERRED) {

Review Comment:
   Do we need this check? It seems only `DEFERRED` events are stored in 
`deadlineMap`.



##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -513,4 +513,28 @@ public void close() throws InterruptedException {
 eventHandlerThread.join();
 log.info("closed event queue.");
 }
+
+/**
+ * Useful for unit tests, where we need to speed the clock up until
+ * deferred events are ready to run.
+ */
+public Object pendingDeferredEvent() {
+lock.lock();
+try {
+if (eventHandler.head.next != eventHandler.head) {

Review Comment:
   Deferred events are not stored under `head`. It seems there is no need to 
check `head`?






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1415408700


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
*/
   private var registered = false
 
+  /**
+   * True if a request has been sent and a response or timeout has not yet 
been processed.

Review Comment:
   We do. The very first Registration request is also scheduled via a 
`CommunicationEvent`. 
   `communicationInFlight = true` is now set after both 
`_channelManager.sendRequest(new BrokerRegistrationRequest...` and 
`_channelManager.sendRequest(new BrokerHeartbeatRequest...`






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


soarez commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839871090

   @cmccabe thanks for having a look. 
   
   I've made the following changes:
   
   * Replaced the use of `prepend` with `append` in `propagateDirectoryFailure`
   * Moved `communicationInFlight = true` after `_channelManager.sendRequest()`
   * Always set `nextSchedulingShouldBeImmediate = false` in 
`scheduleNextCommunicationAfterFailure`
   * Moved checking if `communicationInFlight = true` to 
`CommunicationEvent.run()`
   
   @junrao @cmccabe: please take another look





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414728861


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest {
 result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: 
BrokerLifecycleManager, future: Future[T]): T = {
-while (!future.isDone || context.mockClient.hasInFlightRequests) {
-  context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, 
future: Future[T]): T = {
+while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+  if (manager.eventQueue.isEmpty)

Review Comment:
   You're right, this is incorrect. We must only advance the time if the 
eventQueue has an event scheduled at a future time, unless the next event is a 
timeout event. 
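
The rule stated above can be written down as a small predicate. This is a sketch in Java rather than the Scala test code, and `ClockAdvanceRule` is a hypothetical helper; the input is assumed to be the simple class name of the deferred event the queue is idling on, if there is one, and the `TimeoutEvent` suffix check mirrors the one used in the test's `poll` helper.

```java
import java.util.Optional;

// Hypothetical helper mirroring the rule stated above; not the actual test code.
final class ClockAdvanceRule {
    // pendingDeferred: simple class name of the deferred event the queue is
    // waiting for, or empty if the queue has immediate work or nothing scheduled.
    static boolean shouldAdvanceClock(Optional<String> pendingDeferred) {
        return pendingDeferred
            .filter(name -> !name.endsWith("TimeoutEvent"))
            .isPresent();
    }
}
```

The clock stays put when nothing is deferred and when the next deferred event is a timeout, so a test using this rule never fast-forwards into a timeout by accident.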






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414711941


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -366,8 +379,27 @@ class BrokerLifecycleManager(
   new BrokerRegistrationResponseHandler())
   }
 
+  // the response handler is not invoked from the event handler thread,
+  // so it is not safe to update state here, instead, schedule an event
+  // to continue handling the response on the event handler thread
   private class BrokerRegistrationResponseHandler extends 
ControllerRequestCompletionHandler {
 override def onComplete(response: ClientResponse): Unit = {
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))
+}
+
+override def onTimeout(): Unit = {
+  info(s"Unable to register the broker because the RPC got timed out 
before it could be sent.")
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true))
+}
+  }
+
+  private class BrokerRegistrationResponseEvent(response: ClientResponse, 
timedOut: Boolean) extends EventQueue.Event {
+override def run(): Unit = {
+  communicationInFlight = false

Review Comment:
   Ok. I wasn't sure that communicationInFlight is for both requests.
   
   Since `NodeToControllerChannelManagerImpl` uses 1 for 
`maxInFlightRequestsPerConnection`, another option is to add a `canSendRequest` 
api in `NodeToControllerChannelManager`, instead of maintaining 
`communicationInFlight` here. 
   
   `canSendRequest` api can be implemented through `networkClient.ready(node, 
now)`.






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414708239


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -366,8 +379,27 @@ class BrokerLifecycleManager(
   new BrokerRegistrationResponseHandler())
   }
 
+  // the response handler is not invoked from the event handler thread,
+  // so it is not safe to update state here, instead, schedule an event
+  // to continue handling the response on the event handler thread
   private class BrokerRegistrationResponseHandler extends 
ControllerRequestCompletionHandler {
 override def onComplete(response: ClientResponse): Unit = {
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))
+}
+
+override def onTimeout(): Unit = {
+  info(s"Unable to register the broker because the RPC got timed out 
before it could be sent.")
+  eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true))
+}
+  }
+
+  private class BrokerRegistrationResponseEvent(response: ClientResponse, 
timedOut: Boolean) extends EventQueue.Event {
+override def run(): Unit = {
+  communicationInFlight = false

Review Comment:
   Since `NodeToControllerChannelManagerImpl` uses 1 for 
`maxInFlightRequestsPerConnection`, we could also add a `sendRequest` api in 
`NodeToControllerChannelManager`, instead 
   
   networkClient.ready(node, now)









Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414704903


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
    */
   private var registered = false
 
+  /**
+   * True if a request has been sent and a response or timeout has not yet been processed.

Review Comment:
   Hmm, if that's the case, should we set communicationInFlight to true when 
sending the very first Registration request?






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839768073

   > Instead, CommunicationEvent.run() should gracefully handle running when 
communicationInFlight = true. Probably by doing nothing.
   
   This also implies that `scheduleNextCommunicationAfterSuccess` should be 
checking `nextSchedulingShouldBeImmediate` (not sure why it doesn't already).
   
   Probably `nextSchedulingShouldBeImmediate` should be renamed to something 
like "dirty" since it doesn't actually mean the next scheduling should always 
be immediate. Like if there has been a communication error 
(`scheduleNextCommunicationAfterFailure`) we do not want to immediately 
reschedule, no matter what.
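
One way to read the "dirty" suggestion is sketched below. It is only an illustration: `CommunicationSchedulerSketch`, its method names, and the fixed failure backoff are assumptions made for the example and are not taken from `BrokerLifecycleManager` (the real failure path may compute its delay differently). The flag is consulted only on the success path and is cleared only when the communication event actually runs.

```scala
// Hypothetical model: delays are returned to the caller instead of being
// scheduled on a real event queue.
class CommunicationSchedulerSketch(heartbeatIntervalMs: Long, failureBackoffMs: Long) {
  // Stands in for nextSchedulingShouldBeImmediate under the proposed name "dirty":
  // some state changed that the controller should hear about soon.
  private var dirty = false

  def markDirty(): Unit = {
    dirty = true
  }

  // Models the point where the communication event runs and sends a request;
  // the flag is cleared here and nowhere else.
  def onCommunicationEventRun(): Unit = {
    dirty = false
  }

  // After a successful response, go again immediately only if something is pending.
  def delayAfterSuccessMs(): Long = if (dirty) 0L else heartbeatIntervalMs

  // After a communication error, always back off, whether or not the flag is set.
  // A fixed backoff is an assumption made to keep the sketch short.
  def delayAfterFailureMs(): Long = failureBackoffMs
}
```

With this shape, a state change that arrives while a request is outstanding is not lost: the flag stays set until the next communication event runs, and a failure in between still yields the backoff delay.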





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


cmccabe commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1839747217

   Thanks for looking at this, @soarez .
   
   I don't think `nextSchedulingShouldBeImmediate` should be reset to false 
until the `CommunicationEvent` is run.
   
   I also think we should be a bit more careful about setting 
`communicationInFlight` ... it should only be set after 
`_channelManager.sendRequest` has been called. And even that call should 
probably be wrapped in a `try...catch`. If `sendRequest` throws, we don't want 
to be stuck never sending another request.
   
   (I don't think sendRequest is supposed to throw, but we should be careful.)
   
   I don't think prepend is needed anywhere. To be honest, the presence of 
`prepend` usually indicates a bug... :) Instead, `CommunicationEvent.run()` 
should gracefully handle running when `communicationInFlight = true`. Probably 
by doing nothing.
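
A minimal sketch of the two points above, under stated assumptions: `ChannelSketch`, `RecordingChannel` and `HeartbeatSenderSketch` are hypothetical stand-ins invented for this example, not Kafka classes. The flag is set only after the send call returns normally, and the event does nothing while a request is outstanding.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for the channel manager; only the send call matters here.
trait ChannelSketch {
  def sendRequest(name: String): Unit
}

// Hypothetical test double that can be told to fail the next send.
class RecordingChannel extends ChannelSketch {
  var failNext = false
  val requests: ArrayBuffer[String] = ArrayBuffer.empty[String]

  override def sendRequest(name: String): Unit = {
    if (failNext) {
      failNext = false
      throw new IllegalStateException("simulated send failure")
    }
    requests += name
  }
}

class HeartbeatSenderSketch(channel: ChannelSketch) {
  var communicationInFlight = false

  // Models CommunicationEvent.run(); returns true if a request was handed to the channel.
  def runCommunicationEvent(): Boolean = {
    if (communicationInFlight) {
      // A request is already outstanding: do nothing and wait for its response.
      false
    } else {
      try {
        channel.sendRequest("BrokerHeartbeat")
        // Mark in-flight only once the send call has returned normally.
        communicationInFlight = true
        true
      } catch {
        case NonFatal(_) =>
          // The flag stays false, so a later event can try again.
          false
      }
    }
  }

  // Models the response or timeout being processed.
  def onResponseProcessed(): Unit = {
    communicationInFlight = false
  }
}
```

Because the flag is never set when the send throws, a failed send cannot leave the sender stuck waiting for a response that will never arrive.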





Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414647339


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
    */
   private var registered = false
 
+  /**
+   * True if a request has been sent and a response or timeout has not yet been processed.

Review Comment:
   It applies to both Heartbeat and Registration requests which are the only 
requests this manager sends.
   
   I think we're also fixing a bug where a second registration request could 
have been scheduled after a call to `propagateDirectoryFailure` and before the 
registration response is received.






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414633539


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -453,79 +490,73 @@ class BrokerLifecycleManager(
         val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse]
         val errorCode = Errors.forCode(message.data().errorCode())
         if (errorCode == Errors.NONE) {
-          // this response handler is not invoked from the event handler thread,
-          // and processing a successful heartbeat response requires updating
-          // state, so to continue we need to schedule an event
-          eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data()))
+          val responseData = message.data()
+          failedAttempts = 0
+          _state match {
+            case BrokerState.STARTING =>
+              if (responseData.isCaughtUp) {
+                info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.")
+                _state = BrokerState.RECOVERY
+                initialCatchUpFuture.complete(null)
+              } else {
+                debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.")
+              }
+              // Schedule the heartbeat after only 10 ms so that in the case where
+              // there is no recovery work to be done, we start up a bit quicker.
+              scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
+            case BrokerState.RECOVERY =>
+              if (!responseData.isFenced) {
+                info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.")
+                initialUnfenceFuture.complete(null)
+                _state = BrokerState.RUNNING
+              } else {
+                info(s"The broker is in RECOVERY.")
+              }
+              scheduleNextCommunicationAfterSuccess()
+            case BrokerState.RUNNING =>
+              debug(s"The broker is RUNNING. Processing heartbeat response.")
+              scheduleNextCommunicationAfterSuccess()
+            case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
+              if (!responseData.shouldShutDown()) {
+                info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " +
+                  "for the active controller.")
+                if (!gotControlledShutdownResponse) {
+                  // If this is the first pending controlled shutdown response we got,
+                  // schedule our next heartbeat a little bit sooner than we usually would.
+                  // In the case where controlled shutdown completes quickly, this will
+                  // speed things up a little bit.
+                  scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS))
+                } else {
+                  scheduleNextCommunicationAfterSuccess()
+                }
+              } else {
+                info(s"The controller has asked us to exit controlled shutdown.")
+                beginShutdown()
+              }
+              gotControlledShutdownResponse = true
+            case BrokerState.SHUTTING_DOWN =>
+              info(s"The broker is SHUTTING_DOWN. Ignoring heartbeat response.")
+            case _ =>
+              error(s"Unexpected broker state ${_state}")
+              scheduleNextCommunicationAfterSuccess()
+          }
         } else {
           warn(s"Broker $nodeId sent a heartbeat request but received error $errorCode.")
           scheduleNextCommunicationAfterFailure()
         }
       }
     }
-
-    override def onTimeout(): Unit = {
-      info("Unable to send a heartbeat because the RPC got timed out before it could be sent.")
-      scheduleNextCommunicationAfterFailure()
-    }
   }
 
-  private class BrokerHeartbeatResponseEvent(response: BrokerHeartbeatResponseData) extends EventQueue.Event {
-    override def run(): Unit = {
-      failedAttempts = 0
-      _state match {
-        case BrokerState.STARTING =>
-          if (response.isCaughtUp) {
-            info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.")
-            _state = BrokerState.RECOVERY
-            initialCatchUpFuture.complete(null)
-          } else {
-            debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.")
-          }
-          // Schedule the heartbeat after only 10 ms so that in the case where
-          // there is no recovery work to be done, we start up a bit quicker.
-          scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
-        case BrokerState.RECOVERY =>
-          if (!response.isFenced) {
-            info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.")
-            initialUnfenceFuture.complete(null)
-            _state = BrokerState.RUNNING
-          } else {
-            info(s"The broker is in RECOVERY.")
-          }

Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


soarez commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414626881


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -366,8 +379,27 @@ class BrokerLifecycleManager(
       new BrokerRegistrationResponseHandler())
   }
 
+  // the response handler is not invoked from the event handler thread,
+  // so it is not safe to update state here, instead, schedule an event
+  // to continue handling the response on the event handler thread
   private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler {
     override def onComplete(response: ClientResponse): Unit = {
+      eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))
+    }
+
+    override def onTimeout(): Unit = {
+      info(s"Unable to register the broker because the RPC got timed out before it could be sent.")
+      eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true))
+    }
+  }
+
+  private class BrokerRegistrationResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event {
+    override def run(): Unit = {
+      communicationInFlight = false

Review Comment:
   I don't think so. We set `communicationInFlight = true` in 
`CommunicationEvent.run`, which sends both heartbeats and registration 
requests.
   
   Maybe I'm missing something?
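
The hand-off described in the hunk above can be pictured with a small model. This is a sketch under assumptions, not the PR's code: a single-threaded `java.util.concurrent` executor stands in for the manager's event queue, and `EventThreadModel` with all of its members is hypothetical. One flag covers both request kinds, and completion callbacks arriving on other threads only enqueue work.

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory, TimeUnit}

class EventThreadModel {
  // Single daemon thread standing in for the event handler thread.
  private val eventThread = Executors.newSingleThreadExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "event-handler-sketch")
      t.setDaemon(true)
      t
    }
  })

  // State below is read and written only by tasks running on eventThread.
  private var communicationInFlight = false
  private var handled = Vector.empty[String]

  // Runs a body on the event thread and waits for its result.
  private def onEventThread[T](body: => T): T =
    eventThread.submit(new Callable[T] {
      override def call(): T = body
    }).get(5, TimeUnit.SECONDS)

  // Models the communication event sending a heartbeat or a registration request.
  def send(): Unit = onEventThread {
    communicationInFlight = true
  }

  // May be called from any thread, for example a network thread. It does not
  // touch state directly; it enqueues an event that does.
  def onComplete(kind: String): Unit = eventThread.execute(new Runnable {
    override def run(): Unit = {
      communicationInFlight = false
      handled = handled :+ kind
    }
  })

  def inFlight: Boolean = onEventThread(communicationInFlight)
  def handledKinds: Vector[String] = onEventThread(handled)
  def close(): Unit = eventThread.shutdown()
}
```

Since the executor runs tasks in submission order on one thread, the flag never needs a lock; the cost is that a response takes effect only when its event is dequeued.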






Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-04 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1414322506


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -197,11 +197,14 @@ class BrokerLifecycleManagerTest {
     result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
-    while (!future.isDone || context.mockClient.hasInFlightRequests) {
-      context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, future: Future[T]): T = {
+    while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+      if (manager.eventQueue.isEmpty)

Review Comment:
   Why do we need this check? If the eventQueue has an event scheduled at a 
future time, we need to advance the time to drain the event, right?



##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -166,6 +166,19 @@ class BrokerLifecycleManager(
    */
   private var registered = false
 
+  /**
+   * True if a request has been sent and a response or timeout has not yet been processed.

Review Comment:
   a request => a Heartbeat request ?



##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -366,8 +379,27 @@ class BrokerLifecycleManager(
       new BrokerRegistrationResponseHandler())
   }
 
+  // the response handler is not invoked from the event handler thread,
+  // so it is not safe to update state here, instead, schedule an event
+  // to continue handling the response on the event handler thread
   private class BrokerRegistrationResponseHandler extends ControllerRequestCompletionHandler {
     override def onComplete(response: ClientResponse): Unit = {
+      eventQueue.prepend(new BrokerRegistrationResponseEvent(response, false))
+    }
+
+    override def onTimeout(): Unit = {
+      info(s"Unable to register the broker because the RPC got timed out before it could be sent.")
+      eventQueue.prepend(new BrokerRegistrationResponseEvent(null, true))
+    }
+  }
+
+  private class BrokerRegistrationResponseEvent(response: ClientResponse, timedOut: Boolean) extends EventQueue.Event {
+    override def run(): Unit = {
+      communicationInFlight = false

Review Comment:
   It seems that we don't need this since `communicationInFlight` is only set 
to true when sending a `HeartbeatRequest`?



##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -453,79 +490,73 @@ class BrokerLifecycleManager(
         val message = response.responseBody().asInstanceOf[BrokerHeartbeatResponse]
         val errorCode = Errors.forCode(message.data().errorCode())
         if (errorCode == Errors.NONE) {
-          // this response handler is not invoked from the event handler thread,
-          // and processing a successful heartbeat response requires updating
-          // state, so to continue we need to schedule an event
-          eventQueue.prepend(new BrokerHeartbeatResponseEvent(message.data()))
+          val responseData = message.data()
+          failedAttempts = 0
+          _state match {
+            case BrokerState.STARTING =>
+              if (responseData.isCaughtUp) {
+                info(s"The broker has caught up. Transitioning from STARTING to RECOVERY.")
+                _state = BrokerState.RECOVERY
+                initialCatchUpFuture.complete(null)
+              } else {
+                debug(s"The broker is STARTING. Still waiting to catch up with cluster metadata.")
+              }
+              // Schedule the heartbeat after only 10 ms so that in the case where
+              // there is no recovery work to be done, we start up a bit quicker.
+              scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))
+            case BrokerState.RECOVERY =>
+              if (!responseData.isFenced) {
+                info(s"The broker has been unfenced. Transitioning from RECOVERY to RUNNING.")
+                initialUnfenceFuture.complete(null)
+                _state = BrokerState.RUNNING
+              } else {
+                info(s"The broker is in RECOVERY.")
+              }
+              scheduleNextCommunicationAfterSuccess()
+            case BrokerState.RUNNING =>
+              debug(s"The broker is RUNNING. Processing heartbeat response.")
+              scheduleNextCommunicationAfterSuccess()
+            case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
+              if (!responseData.shouldShutDown()) {
+                info(s"The broker is in PENDING_CONTROLLED_SHUTDOWN state, still waiting " +
+                  "for the active controller.")
+                if (!gotControlledShutdownResponse) {
+                  // If this is the first pending controlled shutdown response we got,
+                  // schedule our next heartbeat a little bit sooner than we usually would.
+                  // In the case where controlled shutdown comp

Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2023-12-03 Thread via GitHub


soarez commented on PR #14903:
URL: https://github.com/apache/kafka/pull/14903#issuecomment-1837447759

   @junrao: please take a look

