[ https://issues.apache.org/jira/browse/KAFKA-15932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kirk True reassigned KAFKA-15932: --------------------------------- Assignee: Andrew Schofield > Flaky test - PlaintextConsumerTest.testSeek("kraft+kip-848","consumer") > ----------------------------------------------------------------------- > > Key: KAFKA-15932 > URL: https://issues.apache.org/jira/browse/KAFKA-15932 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 3.7.0 > Reporter: Andrew Schofield > Assignee: Andrew Schofield > Priority: Major > Labels: flaky-test > > Intermittently failing test for the new consumer. > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14859/1/tests/ > ```Error > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. > Stacktrace > org.opentest4j.AssertionFailedError: Timed out before consuming expected 1 > records. The number consumed was 0. > at > app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > app//kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:161) > at > app//kafka.api.AbstractConsumerTest.consumeAndVerifyRecords(AbstractConsumerTest.scala:128) > at > app//kafka.api.PlaintextConsumerTest.testSeek(PlaintextConsumerTest.scala:616) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > at > app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204) > at > app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:142) > at > app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.lambda$execute$2(TestTemplateTestDescriptor.java:110) > at > java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > at > java.base@11.0.16.1/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > at > java.base@11.0.16.1/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > at > java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > at > java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at > java.base@11.0.16.1/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base@11.0.16.1/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) > at > java.base@11.0.16.1/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > at > java.base@11.0.16.1/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > at > java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > at > java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at > java.base@11.0.16.1/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274) > at > java.base@11.0.16.1/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) > at > java.base@11.0.16.1/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > at > java.base@11.0.16.1/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > at > java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > at > java.base@11.0.16.1/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at > java.base@11.0.16.1/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base@11.0.16.1/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) > at > app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:110) > at > app//org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:44) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > at java.base@11.0.16.1/java.util.ArrayList.forEach(ArrayList.java:1541) > at > app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > at java.base@11.0.16.1/java.util.ArrayList.forEach(ArrayList.java:1541) > at > app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > at > app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) > at > app//org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) > at > app//org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) > at > app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) > at > app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) > at > app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) > at > app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) > at > app//org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) > at > app//org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) > at > app//org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) > at > app//org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86) > at > org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:118) > at > org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:93) > at > org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:88) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:62) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.stop(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) > at > org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) > at > org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113) > at > org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65) > at > app//worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69) > at > app//worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74) > Standard Output > [2023-11-28 22:32:42,436] WARN maxCnxns is not configured, using default > value 0. (org.apache.zookeeper.server.ServerCnxnFactory:309) > [2023-11-28 22:32:42,554] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition > __consumer_offsets-0. This error may be returned transiently when the > partition is being created or deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) > [2023-11-28 22:32:42,554] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition > __consumer_offsets-0. This error may be returned transiently when the > partition is being created or deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) > [2023-11-28 22:32:42,658] WARN [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-0. > This error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) > [2023-11-28 22:32:42,659] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. > This error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) > [2023-11-28 22:32:42,659] WARN [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. > This error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) > [2023-11-28 22:32:44,062] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1029971141, epoch=2), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:44,062] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=258532250, epoch=2), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:44,282] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Close timed out with 1 pending requests to coordinator, > terminating client connections > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1123) > [2023-11-28 22:32:44,285] WARN [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=luVxKKl9Tf2TGcoaTSC0lA, > fetchOffset=3, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[2], lastFetchedEpoch=Optional[2])}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1140846877, epoch=4), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:44,285] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={topic-1=PartitionData(topicId=0CftS7C6T3GRKzHC-gaTFQ, > fetchOffset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[1], lastFetchedEpoch=Optional.empty)}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1798269664, epoch=1), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, > dirs={/tmp/kafka-3842282715338511459: EMPTY}) > [2023-11-28 22:32:44,534] WARN [QuorumController id=1000] Performing > controller activation. The metadata log appears to be empty. Appending 1 > bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from > bootstrap source 'test harness'. Setting the ZK migration state to NONE since > this is a de-novo KRaft cluster. > (org.apache.kafka.controller.QuorumController:108) > [2023-11-28 22:32:45,499] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@56ac7b1a > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 22:32:45,499] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@1e56d84 > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 22:32:45,504] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@403f1279 > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 22:32:46,513] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=695135585, epoch=4), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:46,513] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=282103315, epoch=3), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:46,646] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not > be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:46,748] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not > be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:47,049] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not > be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:47,147] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) > could not be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:47,513] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:47,513] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:47,514] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:47,514] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:47,551] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not > be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:47,651] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) > could not be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:48,153] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) > could not be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:48,455] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not > be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:48,514] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:48,514] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:48,514] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:48,515] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:48,655] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) > could not be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:49,361] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not > be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:49,515] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:49,515] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:49,516] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:49,516] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:49,660] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) > could not be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:50,264] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not > be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:50,516] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:50,664] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) > could not be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:51,274] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not > be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:51,517] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:51,517] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:51,517] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:51,518] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:51,669] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) > could not be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:52,276] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 0 (localhost/127.0.1.1:45311) could not > be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:52,518] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:52,518] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:52,518] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:52,519] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:52,579] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 2147483647 (localhost/127.0.0.1:45311) > could not be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:53,180] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 0 (localhost/127.0.0.1:45311) could not > be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:53,470] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Connection to node 2147483647 (localhost/127.0.1.1:45311) > could not be established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Connection to node 0 (localhost/127.0.0.1:45311) could not be > established. Node may not be available. > (org.apache.kafka.clients.NetworkClient:814) > [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=695135585, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:53,519] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=1, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0]), > topic-0=PartitionData(topicId=7A33RtBNRAGq8zZA1lJJOQ, fetchOffset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0], > lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, > replaced=, metadata=(sessionId=282103315, epoch=INITIAL), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to localhost:45311 (id: 0 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:54,084] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Close timed out with 1 pending requests to coordinator, > terminating client connections > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1123) > [2023-11-28 22:32:54,087] WARN [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=QpUQmO21Qba-R-yCKvgZ-g, > fetchOffset=4, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[1], lastFetchedEpoch=Optional[1])}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=359597478, epoch=18), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:32:54,416] WARN [NodeToControllerChannelManager id=1000 > name=registration] Attempting to close NetworkClient that has already been > closed. (org.apache.kafka.clients.NetworkClient:667) > [2023-11-28 22:32:54,418] ERROR [QuorumController id=1000] Cancelling > deferred write event maybeFenceReplicas because the event queue is now > closed. (org.apache.kafka.controller.QuorumController:1297) > metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, > dirs={/tmp/kafka-6888200266425032707: EMPTY}) > [2023-11-28 22:32:54,443] WARN [QuorumController id=1000] Performing > controller activation. The metadata log appears to be empty. Appending 1 > bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from > bootstrap source 'test harness'. Setting the ZK migration state to NONE since > this is a de-novo KRaft cluster. > (org.apache.kafka.controller.QuorumController:108) > [2023-11-28 22:32:55,399] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@6c76155d > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 22:32:55,400] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@1937c8e > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 22:32:55,401] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@371497a > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at org.apache.kafka.queue.KafkaEventQueue$EventCont > ...[truncated 2797622 chars]... > java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 22:44:22,187] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] Close timed out with 1 pending requests to coordinator, > terminating client connections > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1123) > [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={tsomec-1=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, > fetchOffset=1000, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=152301008, epoch=9), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={tsomec-1=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, > fetchOffset=1000, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1438523841, epoch=9), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={tsomec-0=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, > fetchOffset=1000, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=450423079, epoch=23), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={tsomec-0=PartitionData(topicId=dKdOos2USI2FdR8JhU-KwQ, > fetchOffset=1000, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1790334620, epoch=23), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=hNV3D7sjRmCoKFr8M5cIDA, > fetchOffset=4, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1882472249, epoch=44), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:44:22,191] WARN [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={__consumer_offsets-0=PartitionData(topicId=hNV3D7sjRmCoKFr8M5cIDA, > fetchOffset=4, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=941512031, epoch=44), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 22:44:22,542] WARN [NodeToControllerChannelManager id=1000 > name=registration] Attempting to close NetworkClient that has already been > closed. (org.apache.kafka.clients.NetworkClient:667) > [2023-11-28 22:44:22,545] ERROR [QuorumController id=1000] Cancelling > deferred write event maybeFenceReplicas because the event queue is now > closed. (org.apache.kafka.controller.QuorumController:1297) > [2023-11-28 23:19:48,189] WARN maxCnxns is not configured, using default > value 0. (org.apache.zookeeper.server.ServerCnxnFactory:309) > [2023-11-28 23:19:48,536] WARN [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-0. > This error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) > [2023-11-28 23:19:48,537] WARN [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-0. > This error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) > [2023-11-28 23:19:48,538] WARN [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. > This error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) > [2023-11-28 23:19:48,538] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition topic-1. > This error may be returned transiently when the partition is being created or > deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread:70) > [2023-11-28 23:19:50,559] WARN [Producer clientId=ConsumerTestProducer] > delivery.timeout.ms should be equal to or larger than linger.ms + > request.timeout.ms. Setting it to 2147483647. > (org.apache.kafka.clients.producer.KafkaProducer:560) > [2023-11-28 23:19:51,807] WARN [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=958284730, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:51,807] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=121456605, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:51,807] WARN [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1490435564, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:51,808] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1567850327, epoch=6), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:51,808] WARN [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1555119328, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:51,808] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=351277526, epoch=6), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, > dirs={/tmp/kafka-16453547074389008049: EMPTY}) > [2023-11-28 23:19:52,093] WARN [QuorumController id=1000] Performing > controller activation. The metadata log appears to be empty. Appending 1 > bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from > bootstrap source 'test harness'. Setting the ZK migration state to NONE since > this is a de-novo KRaft cluster. > (org.apache.kafka.controller.QuorumController:108) > [2023-11-28 23:19:53,080] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@502f739a > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 23:19:53,081] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@503838ca > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 23:19:53,081] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@1439aa21 > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 23:19:53,825] WARN [Producer clientId=ConsumerTestProducer] > delivery.timeout.ms should be equal to or larger than linger.ms + > request.timeout.ms. Setting it to 2147483647. > (org.apache.kafka.clients.producer.KafkaProducer:560) > [2023-11-28 23:19:54,853] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=145239054, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:54,853] WARN [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1189011807, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:54,853] WARN [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1943161650, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:54,854] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=428781265, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:55,017] WARN [NodeToControllerChannelManager id=1000 > name=registration] Attempting to close NetworkClient that has already been > closed. (org.apache.kafka.clients.NetworkClient:667) > [2023-11-28 23:19:55,019] ERROR [QuorumController id=1000] Cancelling > deferred write event maybeFenceReplicas because the event queue is now > closed. (org.apache.kafka.controller.QuorumController:1297) > metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, > dirs={/tmp/kafka-4036458965452388226: EMPTY}) > [2023-11-28 23:19:55,050] WARN [QuorumController id=1000] Performing > controller activation. The metadata log appears to be empty. Appending 1 > bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from > bootstrap source 'test harness'. Setting the ZK migration state to NONE since > this is a de-novo KRaft cluster. > (org.apache.kafka.controller.QuorumController:108) > [2023-11-28 23:19:56,031] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@6b05363a > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 23:19:56,033] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@504c619e > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 23:19:56,033] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@74796fb4 > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 23:19:56,871] WARN [Producer clientId=ConsumerTestProducer] > delivery.timeout.ms should be equal to or larger than linger.ms + > request.timeout.ms. Setting it to 2147483647. > (org.apache.kafka.clients.producer.KafkaProducer:560) > [2023-11-28 23:19:57,912] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=513660765, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=462250570, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=49478967, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1136363756, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:57,913] WARN [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1394899890, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:57,914] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=820589906, epoch=5), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:19:58,216] WARN [NodeToControllerChannelManager id=1000 > name=registration] Attempting to close NetworkClient that has already been > closed. (org.apache.kafka.clients.NetworkClient:667) > [2023-11-28 23:19:58,218] ERROR [QuorumController id=1000] Cancelling > deferred write event maybeFenceReplicas because the event queue is now > closed. (org.apache.kafka.controller.QuorumController:1297) > metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, > dirs={/tmp/kafka-2823784908060096373: EMPTY}) > [2023-11-28 23:19:58,244] WARN [QuorumController id=1000] Performing > controller activation. The metadata log appears to be empty. Appending 1 > bootstrap record(s) in metadata transaction at metadata.version 3.7-IV1 from > bootstrap source 'test harness'. Setting the ZK migration state to NONE since > this is a de-novo KRaft cluster. > (org.apache.kafka.controller.QuorumController:108) > [2023-11-28 23:19:59,395] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@4726f78d > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 23:19:59,396] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@66f23734 > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 23:19:59,396] ERROR Unexpected error handling > org.apache.kafka.server.AssignmentsManager$DispatchEvent@423b55e3 > (org.apache.kafka.server.AssignmentsManager:117) > java.lang.IllegalStateException: Cannot enqueue a request if the request > thread is not running > at > kafka.server.NodeToControllerRequestThread.enqueue(NodeToControllerChannelManager.scala:309) > at > kafka.server.NodeToControllerChannelManagerImpl.sendRequest(NodeToControllerChannelManager.scala:239) > at > org.apache.kafka.server.AssignmentsManager$DispatchEvent.run(AssignmentsManager.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829) > [2023-11-28 23:20:05,547] WARN [Producer clientId=ConsumerTestProducer] > delivery.timeout.ms should be equal to or larger than linger.ms + > request.timeout.ms. Setting it to 2147483647. > (org.apache.kafka.clients.producer.KafkaProducer:560) > [2023-11-28 23:20:09,432] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] The fetch buffer was already closed > (org.apache.kafka.clients.consumer.internals.FetchBuffer:263) > [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=365803585, epoch=14), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=1978360363, epoch=14), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=2059879385, epoch=14), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 0 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:20:09,434] WARN [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, > isolationLevel=READ_UNCOMMITTED, removed=, replaced=, > metadata=(sessionId=2110384598, epoch=14), rackId=) > (kafka.server.ReplicaFetcherThread:72) > java.io.IOException: Connection to 1 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100) > at > kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) > at > kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:317) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:131) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:130) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at > kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) > at > org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) > [2023-11-28 23:20:09,617] WARN [NodeToControllerChannelManager id=1000 > name=registration] Attempting to close NetworkClient that has already been > closed. (org.apache.kafka.clients.NetworkClient:667) > [2023-11-28 23:20:09,619] ERROR [QuorumController id=1000] Cancelling > deferred write event maybeFenceReplicas because the event queue is now > closed. (org.apache.kafka.controller.QuorumController:1297)``` -- This message was sent by Atlassian Jira (v8.20.10#820010)