[ https://issues.apache.org/jira/browse/KAFKA-17203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868995#comment-17868995 ]

Matthias J. Sax commented on KAFKA-17203:
-----------------------------------------

Thanks for your interest! Go for it!

> StreamThread leaking producer instances
> ---------------------------------------
>
>                 Key: KAFKA-17203
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17203
>             Project: Kafka
>          Issue Type: Test
>          Components: streams
>    Affects Versions: 3.9.0
>            Reporter: Greg Harris
>            Assignee: PoAn Yang
>            Priority: Minor
>              Labels: newbie
>
> When running
> EosIntegrationTest.shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled
> with the KAFKA-15845 leak testing extension, I observed that this test
> appears to consistently leak StreamsProducers. The leaked producer is
> instantiated here:
> {noformat}
> This test contains a resource leak. Close the resources, or open a KAFKA 
> ticket and annotate this class with 
> @LeakTestingExtension.IgnoreAll("KAFKA-XYZ")
> org.opentest4j.AssertionFailedError: This test contains a resource leak. 
> Close the resources, or open a KAFKA ticket and annotate this class with 
> @LeakTestingExtension.IgnoreAll("KAFKA-XYZ")
>     at 
> org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:98)
>     at 
> org.apache.kafka.common.network.LeakTestingExtension$All.afterAll(LeakTestingExtension.java:123)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
> Caused by: org.opentest4j.AssertionFailedError: Leak check failed
>     at 
> org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:89)
>     at 
> org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:96)
>     ... 2 more
>     Suppressed: org.opentest4j.AssertionFailedError: AbstractSelector 
> instances left open
>         at 
> org.apache.kafka.common.utils.PredicateLeakTester.lambda$start$0(PredicateLeakTester.java:94)
>         at 
> org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:86)
>         ... 3 more
>         Suppressed: java.lang.Exception: Opened sun.nio.ch.KQueueSelectorImpl
>             at 
> org.apache.kafka.common.utils.PredicateLeakTester.open(PredicateLeakTester.java:63)
>             at 
> org.apache.kafka.common.network.NetworkContextLeakTester$RecordingSelectorProvider.openSelector(NetworkContextLeakTester.java:135)
>             at 
> org.apache.kafka.common.network.TestNetworkContext$SelectorProviderDecorator.openSelector(TestNetworkContext.java:166)
>             at 
> org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
>             at 
> org.apache.kafka.common.network.Selector.<init>(Selector.java:213)
>             at 
> org.apache.kafka.common.network.Selector.<init>(Selector.java:225)
>             at 
> org.apache.kafka.common.network.Selector.<init>(Selector.java:229)
>             at 
> org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:225)
>             at 
> org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:163)
>             at 
> org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:526)
>             at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:465)
>             at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
>             at 
> org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39)
>             at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.<init>(StreamsProducer.java:142)
>             at 
> org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createRecordCollector(ActiveTaskCreator.java:196)
>             at 
> org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createActiveTask(ActiveTaskCreator.java:265)
>             at 
> org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createTasks(ActiveTaskCreator.java:176)
>             at 
> org.apache.kafka.streams.processor.internals.TaskManager.createNewTasks(TaskManager.java:441)
>             at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:390)
>             at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1559)
>             at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:327)
>             at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:416)
>             at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:504)
>             at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:415)
>             at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:511)
>             at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.updateAssignmentMetadataIfNeeded(LegacyKafkaConsumer.java:653)
>             at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:612)
>             at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:592)
>             at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
>             at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1276)
>             at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1224)
>             at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:957)
>             at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:712)
>             at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671){noformat}
> and appears to remain unclosed even after kafkaStreams.close() and
> CLUSTER.stop() are called.
> I've seen this appear in other suites; I singled out this one test/suite
> arbitrarily.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
