[ 
https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15932:
---------------------------------

    Assignee: Andrew Schofield

> Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer")
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-15932
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15932
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.7.0
>            Reporter: Andrew Schofield
>            Assignee: Andrew Schofield
>            Priority: Major
>              Labels: flaky-test
>
> Intermittently failing test for the new consumer.
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/
> ```
> Error
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
> Stacktrace
> org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 
> records. The number consumed was 0.
>       at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>       at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>       at 
> app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161)
>       at 
> app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128)
>       at 
> app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616)
>       at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>       at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>       at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>       at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>       at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>       at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>       at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
>       at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>       at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>       at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>       at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>       at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>       at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>       at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>       at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>       at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
>       at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
>       at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
>       at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>       at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>       at 
> app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>       at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>       at 
> app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204)
>       at 
> app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:142)
>       at 
> app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.lambda$execute$2(TestTemplateTestDescriptor.java:110)
>       at 
> java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>       at 
> java.base@11.0.16.1/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>       at 
> java.base@11.0.16.1/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>       at 
> java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>       at 
> java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>       at 
> java.base@11.0.16.1/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>       at 
> java.base@11.0.16.1/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>       at 
> java.base@11.0.16.1/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>       at 
> java.base@11.0.16.1/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>       at 
> java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>       at 
> java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>       at 
> java.base@11.0.16.1/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>       at 
> java.base@11.0.16.1/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>       at 
> java.base@11.0.16.1/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>       at 
> java.base@11.0.16.1/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>       at 
> java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>       at 
> java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>       at 
> java.base@11.0.16.1/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>       at 
> java.base@11.0.16.1/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>       at 
> app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:110)
>       at 
> app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:44)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>       at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>       at 
> app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>       at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>       at java.base@11.0.16.1/java.util.ArrayList.forEach(ArrayList.java:1541)
>       at 
> app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>       at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>       at 
> app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>       at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>       at java.base@11.0.16.1/java.util.ArrayList.forEach(ArrayList.java:1541)
>       at 
> app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>       at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>       at 
> app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>       at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>       at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>       at 
> app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
>       at 
> app//org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>       at 
> app//org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
>       at 
> app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>       at 
> app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>       at 
> app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
>       at 
> app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
>       at 
> app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
>       at 
> app//org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
>       at 
> app//org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
>       at 
> app//org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
>       at 
> org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:118)
>       at 
> org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:93)
>       at 
> org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:88)
>       at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:62)
>       at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>       at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>       at com.sun.proxy.$Proxy2.stop(Unknown Source)
>       at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
>       at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
>       at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
>       at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
>       at 
> org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
>       at 
> org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
>       at 
> org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
>       at 
> app//worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
>       at 
> app//worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
> Standard Output
> [2023-11-28 22:32:42,436] WARN maxCnxns is not configured, using default 
> value 0. (org.apache.zookeeper.server.ServerCnxnFactory:309)
> [2023-11-28 22:32:42,554] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition 
> __consumer_offsets-0. This error may be returned transiently when the 
> partition is being created or deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70)
> [2023-11-28 22:32:42,554] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition 
> __consumer_offsets-0. This error may be returned transiently when the 
> partition is being created or deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70)
> [2023-11-28 22:32:42,658] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-0. 
> This error may be returned transiently when the partition is being created or 
> deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70)
> [2023-11-28 22:32:42,659] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. 
> This error may be returned transiently when the partition is being created or 
> deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70)
> [2023-11-28 22:32:42,659] WARN [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. 
> This error may be returned transiently when the partition is being created or 
> deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70)
> [2023-11-28 22:32:44,062] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1029971141, epoch=2), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:44,062] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=258532250, epoch=2), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:44,282] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Close timed out with 1 pending requests to coordinator, 
> terminating client connections 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1123)
> [2023-11-28 22:32:44,285] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=luVxKKl9Tf2TGcoaTSC0lA, 
> fetchOffset=3, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[2], lastFetchedEpoch=Optional[2])}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1140846877, epoch=4), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:44,285] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={topic-1=PartitionData(topicId=0CftS7C6T3GRKzHC-gaTFQ, 
> fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[1], lastFetchedEpoch=Optional.empty)}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1798269664, epoch=1), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
> dirs={/tmp/kafka-3842282715338511459: EMPTY})
> [2023-11-28 22:32:44,534] WARN [QuorumController id=1000] Performing 
> controller activation. The metadata log appears to be empty. Appending 1 
> bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from 
> bootstrap source 'test harness'. Setting the ZK migration state to NONE since 
> this is a de-novo KRaft cluster. 
> (org.apache.kafka.controller.QuorumController:108)
> [2023-11-28 22:32:45,499] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@56ac7b1a 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 22:32:45,499] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@1e56d84 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 22:32:45,504] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@403f1279 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 22:32:46,513] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=695135585, epoch=4), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:46,513] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=282103315, epoch=3), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:46,646] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not 
> be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:46,748] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not 
> be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:47,049] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not 
> be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:47,147] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) 
> could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:47,513] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:47,513] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:47,514] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:47,514] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:47,551] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not 
> be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:47,651] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) 
> could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:48,153] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) 
> could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:48,455] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not 
> be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:48,514] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:48,514] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:48,514] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:48,515] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:48,655] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) 
> could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:49,361] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not 
> be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:49,515] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:49,515] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:49,516] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:49,516] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:49,660] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) 
> could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:50,264] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not 
> be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:50,664] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) 
> could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:51,274] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not 
> be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:51,517] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:51,517] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:51,517] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:51,518] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:51,669] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) 
> could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:52,276] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not 
> be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:52,518] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:52,518] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:52,518] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:52,519] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:52,579] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) 
> could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:53,180] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not 
> be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:53,470] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) 
> could not be established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be 
> established. Node may not be available. 
> (org.apache.kafka.clients.NetworkClient:814)
> [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=1, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), 
> topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], 
> lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, 
> replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed.
>       at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:54,084] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Close timed out with 1 pending requests to coordinator, 
> terminating client connections 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1123)
> [2023-11-28 22:32:54,087] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, 
> fetchOffset=4, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[1], lastFetchedEpoch=Optional[1])}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=359597478, epoch=18), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:32:54,416] WARN [NodeToControllerChannelManager id=1000 
> name=registration] Attempting to close NetworkClient that has already been 
> closed. (org.apache.kafka.clients.NetworkClient:667)
> [2023-11-28 22:32:54,418] ERROR [QuorumController id=1000] Cancelling 
> deferred write event maybeFenceReplicas because the event queue is now 
> closed. (org.apache.kafka.controller.QuorumController:1297)
> metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
> dirs={/tmp/kafka-6888200266425032707: EMPTY})
> [2023-11-28 22:32:54,443] WARN [QuorumController id=1000] Performing 
> controller activation. The metadata log appears to be empty. Appending 1 
> bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from 
> bootstrap source 'test harness'. Setting the ZK migration state to NONE since 
> this is a de-novo KRaft cluster. 
> (org.apache.kafka.controller.QuorumController:108)
> [2023-11-28 22:32:55,399] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@6c76155d 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 22:32:55,400] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@1937c8e 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 22:32:55,401] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@371497a 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at org.apache.kafka.queue.KafkaEventQueue$EventCont
> ...[truncated 2797622 chars]...
> java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 22:44:22,187] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] Close timed out with 1 pending requests to coordinator, 
> terminating client connections 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1123)
> [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={tsomec-1=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, 
> fetchOffset=1000, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=152301008, epoch=9), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={tsomec-1=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, 
> fetchOffset=1000, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1438523841, epoch=9), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={tsomec-0=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, 
> fetchOffset=1000, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=450423079, epoch=23), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={tsomec-0=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, 
> fetchOffset=1000, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1790334620, epoch=23), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=hNV3D7sjRmCoKFr8M5cIDA, 
> fetchOffset=4, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1882472249, epoch=44), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={__consumer_offsets-0=PartitionData(topicId=hNV3D7sjRmCoKFr8M5cIDA, 
> fetchOffset=4, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=941512031, epoch=44), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 22:44:22,542] WARN [NodeToControllerChannelManager id=1000 
> name=registration] Attempting to close NetworkClient that has already been 
> closed. (org.apache.kafka.clients.NetworkClient:667)
> [2023-11-28 22:44:22,545] ERROR [QuorumController id=1000] Cancelling 
> deferred write event maybeFenceReplicas because the event queue is now 
> closed. (org.apache.kafka.controller.QuorumController:1297)
> [2023-11-28 23:19:48,189] WARN maxCnxns is not configured, using default 
> value 0. (org.apache.zookeeper.server.ServerCnxnFactory:309)
> [2023-11-28 23:19:48,536] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-0. 
> This error may be returned transiently when the partition is being created or 
> deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70)
> [2023-11-28 23:19:48,537] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-0. 
> This error may be returned transiently when the partition is being created or 
> deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70)
> [2023-11-28 23:19:48,538] WARN [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. 
> This error may be returned transiently when the partition is being created or 
> deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70)
> [2023-11-28 23:19:48,538] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. 
> This error may be returned transiently when the partition is being created or 
> deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread:70)
> [2023-11-28 23:19:50,559] WARN [Producer clientId=ConsumerTestProducer] 
> delivery.timeout.ms should be equal to or larger than linger.ms + 
> request.timeout.ms. Setting it to 2147483647. 
> (org.apache.kafka.clients.producer.KafkaProducer:560)
> [2023-11-28 23:19:51,807] WARN [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=958284730, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:51,807] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=121456605, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:51,807] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1490435564, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:51,808] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1567850327, epoch=6), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:51,808] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1555119328, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:51,808] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=351277526, epoch=6), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
> dirs={/tmp/kafka-16453547074389008049: EMPTY})
> [2023-11-28 23:19:52,093] WARN [QuorumController id=1000] Performing 
> controller activation. The metadata log appears to be empty. Appending 1 
> bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from 
> bootstrap source 'test harness'. Setting the ZK migration state to NONE since 
> this is a de-novo KRaft cluster. 
> (org.apache.kafka.controller.QuorumController:108)
> [2023-11-28 23:19:53,080] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@502f739a 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 23:19:53,081] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@503838ca 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 23:19:53,081] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@1439aa21 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 23:19:53,825] WARN [Producer clientId=ConsumerTestProducer] 
> delivery.timeout.ms should be equal to or larger than linger.ms + 
> request.timeout.ms. Setting it to 2147483647. 
> (org.apache.kafka.clients.producer.KafkaProducer:560)
> [2023-11-28 23:19:54,853] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=145239054, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:54,853] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1189011807, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:54,853] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1943161650, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:54,854] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=428781265, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:55,017] WARN [NodeToControllerChannelManager id=1000 
> name=registration] Attempting to close NetworkClient that has already been 
> closed. (org.apache.kafka.clients.NetworkClient:667)
> [2023-11-28 23:19:55,019] ERROR [QuorumController id=1000] Cancelling 
> deferred write event maybeFenceReplicas because the event queue is now 
> closed. (org.apache.kafka.controller.QuorumController:1297)
> metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
> dirs={/tmp/kafka-4036458965452388226: EMPTY})
> [2023-11-28 23:19:55,050] WARN [QuorumController id=1000] Performing 
> controller activation. The metadata log appears to be empty. Appending 1 
> bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from 
> bootstrap source 'test harness'. Setting the ZK migration state to NONE since 
> this is a de-novo KRaft cluster. 
> (org.apache.kafka.controller.QuorumController:108)
> [2023-11-28 23:19:56,031] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@6b05363a 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 23:19:56,033] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@504c619e 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 23:19:56,033] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@74796fb4 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 23:19:56,871] WARN [Producer clientId=ConsumerTestProducer] 
> delivery.timeout.ms should be equal to or larger than linger.ms + 
> request.timeout.ms. Setting it to 2147483647. 
> (org.apache.kafka.clients.producer.KafkaProducer:560)
> [2023-11-28 23:19:57,912] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=513660765, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=462250570, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=49478967, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1136363756, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1394899890, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:57,914] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=820589906, epoch=5), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:19:58,216] WARN [NodeToControllerChannelManager id=1000 
> name=registration] Attempting to close NetworkClient that has already been 
> closed. (org.apache.kafka.clients.NetworkClient:667)
> [2023-11-28 23:19:58,218] ERROR [QuorumController id=1000] Cancelling 
> deferred write event maybeFenceReplicas because the event queue is now 
> closed. (org.apache.kafka.controller.QuorumController:1297)
> metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
> dirs={/tmp/kafka-2823784908060096373: EMPTY})
> [2023-11-28 23:19:58,244] WARN [QuorumController id=1000] Performing 
> controller activation. The metadata log appears to be empty. Appending 1 
> bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from 
> bootstrap source 'test harness'. Setting the ZK migration state to NONE since 
> this is a de-novo KRaft cluster. 
> (org.apache.kafka.controller.QuorumController:108)
> [2023-11-28 23:19:59,395] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@4726f78d 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 23:19:59,396] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@66f23734 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 23:19:59,396] ERROR Unexpected error handling 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent@423b55e3 
> (org.apache.kafka.server.AssignmentsManager:117)
> java.lang.IllegalStateException: Cannot enqueue a request if the request 
> thread is not running
>       at 
> kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309)
>       at 
> kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239)
>       at 
> org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>       at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> [2023-11-28 23:20:05,547] WARN [Producer clientId=ConsumerTestProducer] 
> delivery.timeout.ms should be equal to or larger than linger.ms + 
> request.timeout.ms. Setting it to 2147483647. 
> (org.apache.kafka.clients.producer.KafkaProducer:560)
> [2023-11-28 23:20:09,432] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] The fetch buffer was already closed 
> (org.apache.kafka.clients.consumer.internals.FetchBuffer:263)
> [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=365803585, epoch=14), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=1978360363, epoch=14), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=2059879385, epoch=14), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 0 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
> isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
> metadata=(sessionId=2110384598, epoch=14), rackId=) 
> (kafka.server.ReplicaFetcherThread:72)
> java.io.IOException: Connection to 1 was disconnected before the response was 
> read
>       at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>       at 
> kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
>       at 
> kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131)
>       at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130)
>       at scala.Option.foreach(Option.scala:437)
>       at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>       at 
> kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
>       at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
> [2023-11-28 23:20:09,617] WARN [NodeToControllerChannelManager id=1000 
> name=registration] Attempting to close NetworkClient that has already been 
> closed. (org.apache.kafka.clients.NetworkClient:667)
> [2023-11-28 23:20:09,619] ERROR [QuorumController id=1000] Cancelling 
> deferred write event maybeFenceReplicas because the event queue is now 
> closed. (org.apache.kafka.controller.QuorumController:1297)```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to