This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 26ffdd7728 MINOR: Fix unexpected request error in kraft shutdown (#12538) 26ffdd7728 is described below commit 26ffdd772827627af2e3213a395bae56c1b41321 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Fri Aug 19 12:45:05 2022 -0700 MINOR: Fix unexpected request error in kraft shutdown (#12538) We have been seeing a few exceptions like the following when running integration tests: ``` [2022-08-18 13:02:59,470] ERROR [ControllerApis nodeId=3000] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=13, clientId=raft-client-0, correlationId=7) -- FetchRequestData(clusterId='txpo87ZUSbGSeV2v7H0n_w', replicaId=0, maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='__cluster_metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=1, fetchOf [...] java.util.concurrent.CompletionException: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:936) at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) at org.apache.kafka.raft.KafkaRaftClient.lambda$handleRequest$19(KafkaRaftClient.java:1666) at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) at kafka.raft.TimingWheelExpirationService$TimerTaskCompletableFuture.run(TimingWheelExpirationService.scala:32) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE ``` There are two causes for this error that I found. First, we were not shutting down the timer services in `RaftManager`, which are used in the purgatory implementation. This meant that operations remaining in purgatory could be completed even after `RaftManager` was shut down. Second, the shutdown order in `KafkaClusterTestKit` was problematic. The `RaftManager` instance depends on the `SocketServer` in `ControllerServer`, but it was the latter that was shut down first. Instead, we should [...] 
Reviewers: Ismael Juma <ism...@juma.me.uk> --- core/src/main/scala/kafka/raft/RaftManager.scala | 10 ++++--- .../main/scala/kafka/server/ControllerApis.scala | 1 + .../main/scala/kafka/server/KafkaRaftServer.scala | 7 +++++ .../java/kafka/testkit/KafkaClusterTestKit.java | 35 +++++++++++++--------- 4 files changed, 35 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index a44d9d8fe0..5b8fe1e827 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -122,6 +122,8 @@ class KafkaRaftManager[T]( private val dataDir = createDataDir() override val replicatedLog: ReplicatedLog = buildMetadataLog() private val netChannel = buildNetworkChannel() + private val expirationTimer = new SystemTimer("raft-expiration-executor") + private val expirationService = new TimingWheelExpirationService(expirationTimer) override val client: KafkaRaftClient[T] = buildRaftClient() private val raftIoThread = new RaftIoThread(client, threadNamePrefix) @@ -133,10 +135,10 @@ class KafkaRaftManager[T]( case spec: InetAddressSpec => netChannel.updateEndpoint(voterAddressEntry.getKey, spec) case _: UnknownAddressSpec => - logger.info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " + + info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " + s"because of non-routable endpoint: ${NON_ROUTABLE_ADDRESS.toString}") case invalid: AddressSpec => - logger.warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " + + warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " + s"destination ID: ${voterAddressEntry.getKey}") } } @@ -145,6 +147,8 @@ class KafkaRaftManager[T]( } def shutdown(): Unit = { + expirationService.shutdown() + expirationTimer.shutdown() raftIoThread.shutdown() client.close() scheduler.shutdown() @@ -177,8 +181,6 @@ class 
KafkaRaftManager[T]( } private def buildRaftClient(): KafkaRaftClient[T] = { - val expirationTimer = new SystemTimer("raft-expiration-executor") - val expirationService = new TimingWheelExpirationService(expirationTimer) val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state")) val nodeId = OptionalInt.of(config.nodeId) diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 511d4b333c..235660cec0 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -68,6 +68,7 @@ class ControllerApis(val requestChannel: RequestChannel, val controllerNodes: Seq[Node], val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging { + this.logIdent = s"[ControllerApis nodeId=${config.nodeId}] " val authHelper = new AuthHelper(authorizer) val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time) private val aclApis = new AclApis(authHelper, authorizer, requestHelper, "controller", config) diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index 2338ef5e7c..8ce0bc1861 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -56,6 +56,7 @@ class KafkaRaftServer( threadNamePrefix: Option[String] ) extends Server with Logging { + this.logIdent = s"[KafkaRaftServer nodeId=${config.nodeId}] " KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals)) KafkaYammerMetrics.INSTANCE.configure(config.originals) @@ -133,6 +134,8 @@ class KafkaRaftServer( override def startup(): Unit = { Mx4jLoader.maybeLoad() + // Note that we startup `RaftManager` first so that the controller and broker + // can register listeners during initialization. 
raftManager.startup() controller.foreach(_.startup()) broker.foreach(_.startup()) @@ -142,6 +145,10 @@ class KafkaRaftServer( override def shutdown(): Unit = { broker.foreach(_.shutdown()) + // The order of shutdown for `RaftManager` and `ControllerServer` is backwards + // compared to `startup()`. This is because the `SocketServer` implementation that + // we rely on to receive requests is owned by `ControllerServer`, so we need it + // to stick around until graceful shutdown of `RaftManager` can be completed. raftManager.shutdown() controller.foreach(_.shutdown()) CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics), this) diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index ecee13c498..139b05fa54 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -275,15 +275,15 @@ public class KafkaClusterTestKit implements AutoCloseable { executorService.shutdownNow(); executorService.awaitTermination(5, TimeUnit.MINUTES); } - for (ControllerServer controller : controllers.values()) { - controller.shutdown(); - } for (BrokerServer brokerServer : brokers.values()) { brokerServer.shutdown(); } for (KafkaRaftManager<ApiMessageAndVersion> raftManager : raftManagers.values()) { raftManager.shutdown(); } + for (ControllerServer controller : controllers.values()) { + controller.shutdown(); + } connectFutureManager.close(); if (baseDirectory != null) { Utils.delete(baseDirectory); @@ -408,12 +408,15 @@ public class KafkaClusterTestKit implements AutoCloseable { public void startup() throws ExecutionException, InterruptedException { List<Future<?>> futures = new ArrayList<>(); try { - for (ControllerServer controller : controllers.values()) { - futures.add(executorService.submit(controller::startup)); - } + // Note the startup order here is chosen to be consistent 
with + // `KafkaRaftServer`. See comments in that class for an explanation. + for (KafkaRaftManager<ApiMessageAndVersion> raftManager : raftManagers.values()) { futures.add(controllerQuorumVotersFutureManager.future.thenRunAsync(raftManager::startup)); } + for (ControllerServer controller : controllers.values()) { + futures.add(executorService.submit(controller::startup)); + } for (BrokerServer broker : brokers.values()) { futures.add(executorService.submit(broker::startup)); } @@ -513,6 +516,10 @@ public class KafkaClusterTestKit implements AutoCloseable { List<Entry<String, Future<?>>> futureEntries = new ArrayList<>(); try { controllerQuorumVotersFutureManager.close(); + + // Note the shutdown order here is chosen to be consistent with + // `KafkaRaftServer`. See comments in that class for an explanation. + for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) { int brokerId = entry.getKey(); BrokerServer broker = entry.getValue(); @@ -521,14 +528,6 @@ public class KafkaClusterTestKit implements AutoCloseable { } waitForAllFutures(futureEntries); futureEntries.clear(); - for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) { - int controllerId = entry.getKey(); - ControllerServer controller = entry.getValue(); - futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId, - executorService.submit(controller::shutdown))); - } - waitForAllFutures(futureEntries); - futureEntries.clear(); for (Entry<Integer, KafkaRaftManager<ApiMessageAndVersion>> entry : raftManagers.entrySet()) { int raftManagerId = entry.getKey(); KafkaRaftManager<ApiMessageAndVersion> raftManager = entry.getValue(); @@ -537,6 +536,14 @@ public class KafkaClusterTestKit implements AutoCloseable { } waitForAllFutures(futureEntries); futureEntries.clear(); + for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) { + int controllerId = entry.getKey(); + ControllerServer controller = entry.getValue(); + futureEntries.add(new 
SimpleImmutableEntry<>("controller" + controllerId, + executorService.submit(controller::shutdown))); + } + waitForAllFutures(futureEntries); + futureEntries.clear(); Utils.delete(baseDirectory); } catch (Exception e) { for (Entry<String, Future<?>> entry : futureEntries) {