[ https://issues.apache.org/jira/browse/KAFKA-17203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reassigned KAFKA-17203:
---------------------------------------

    Assignee: PoAn Yang

> StreamThread leaking producer instances
> ---------------------------------------
>
>                 Key: KAFKA-17203
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17203
>             Project: Kafka
>          Issue Type: Test
>          Components: streams
>    Affects Versions: 3.9.0
>            Reporter: Greg Harris
>            Assignee: PoAn Yang
>            Priority: Minor
>              Labels: newbie
>
> When running
> EosIntegrationTest.shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled
> with the KAFKA-15845 leak testing extension, I observed that this test
> appears to consistently leak StreamsProducers. The producer is
> instantiated here:
> {noformat}
> This test contains a resource leak. Close the resources, or open a KAFKA ticket and annotate this class with @LeakTestingExtension.IgnoreAll("KAFKA-XYZ")
> org.opentest4j.AssertionFailedError: This test contains a resource leak. Close the resources, or open a KAFKA ticket and annotate this class with @LeakTestingExtension.IgnoreAll("KAFKA-XYZ")
>     at org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:98)
>     at org.apache.kafka.common.network.LeakTestingExtension$All.afterAll(LeakTestingExtension.java:123)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
> Caused by: org.opentest4j.AssertionFailedError: Leak check failed
>     at org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:89)
>     at org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:96)
>     ... 2 more
>     Suppressed: org.opentest4j.AssertionFailedError: AbstractSelector instances left open
>         at org.apache.kafka.common.utils.PredicateLeakTester.lambda$start$0(PredicateLeakTester.java:94)
>         at org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:86)
>         ... 3 more
>         Suppressed: java.lang.Exception: Opened sun.nio.ch.KQueueSelectorImpl
>             at org.apache.kafka.common.utils.PredicateLeakTester.open(PredicateLeakTester.java:63)
>             at org.apache.kafka.common.network.NetworkContextLeakTester$RecordingSelectorProvider.openSelector(NetworkContextLeakTester.java:135)
>             at org.apache.kafka.common.network.TestNetworkContext$SelectorProviderDecorator.openSelector(TestNetworkContext.java:166)
>             at org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
>             at org.apache.kafka.common.network.Selector.<init>(Selector.java:213)
>             at org.apache.kafka.common.network.Selector.<init>(Selector.java:225)
>             at org.apache.kafka.common.network.Selector.<init>(Selector.java:229)
>             at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:225)
>             at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:163)
>             at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:526)
>             at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:465)
>             at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
>             at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39)
>             at org.apache.kafka.streams.processor.internals.StreamsProducer.<init>(StreamsProducer.java:142)
>             at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createRecordCollector(ActiveTaskCreator.java:196)
>             at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createActiveTask(ActiveTaskCreator.java:265)
>             at org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createTasks(ActiveTaskCreator.java:176)
>             at org.apache.kafka.streams.processor.internals.TaskManager.createNewTasks(TaskManager.java:441)
>             at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:390)
>             at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1559)
>             at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:327)
>             at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:416)
>             at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:504)
>             at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:415)
>             at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:511)
>             at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.updateAssignmentMetadataIfNeeded(LegacyKafkaConsumer.java:653)
>             at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:612)
>             at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:592)
>             at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
>             at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1276)
>             at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1224)
>             at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:957)
>             at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:712)
>             at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> {noformat}
> The producer appears to remain unclosed even after kafkaStreams.close() and
> CLUSTER.stop() are called.
> I've seen this appear in other suites; I singled out this one test/suite
> arbitrarily.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)