[ https://issues.apache.org/jira/browse/KAFKA-15804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784630#comment-17784630 ]

Greg Harris commented on KAFKA-15804:
-------------------------------------

I believe what is happening is:

1. The SocketServer is created.
2. The exception is thrown from the dynamicConfigManager.
3. SocketServer.enableRequestProcessing is never called, so the SocketServer is never started.
4. Because the SocketServer is never started, its main runnable never exits, and so the ServerSocketChannel is never closed.

I think that when SocketServer.beginShutdown()/shutdown() is called without enableRequestProcessing() ever having been called, the SocketServer should still close these resources.

> Broker leaks ServerSocketChannel when exception is thrown from ZkConfigManager during startup
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15804
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15804
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, unit tests
>    Affects Versions: 3.6.0
>            Reporter: Greg Harris
>            Priority: Minor
>
> This exception is thrown during the RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic test in zk mode:
> {noformat}
> org.apache.kafka.common.config.ConfigException: You have to delete all topics with the property remote.storage.enable=true before disabling tiered storage cluster-wide
>         at org.apache.kafka.storage.internals.log.LogConfig.validateRemoteStorageOnlyIfSystemEnabled(LogConfig.java:566)
>         at kafka.log.LogManager.updateTopicConfig(LogManager.scala:956)
>         at kafka.server.TopicConfigHandler.updateLogConfig(ConfigHandler.scala:73)
>         at kafka.server.TopicConfigHandler.processConfigChanges(ConfigHandler.scala:94)
>         at kafka.server.ZkConfigManager.$anonfun$startup$4(ZkConfigManager.scala:176)
>         at kafka.server.ZkConfigManager.$anonfun$startup$4$adapted(ZkConfigManager.scala:175)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:360)
>         at kafka.server.ZkConfigManager.$anonfun$startup$1(ZkConfigManager.scala:175)
>         at kafka.server.ZkConfigManager.$anonfun$startup$1$adapted(ZkConfigManager.scala:166)
>         at scala.collection.immutable.HashMap.foreach(HashMap.scala:1115)
>         at kafka.server.ZkConfigManager.startup(ZkConfigManager.scala:166)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:575)
>         at kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356)
>         at kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352)
>         at scala.collection.immutable.List.foreach(List.scala:333)
>         at kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352)
>         at kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146)
>         at kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319)
>         at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53)
>         at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)
>         at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)
>         at kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat}
> This leak only occurs for this one test in the RemoteTopicCrudTest; all other tests, including the KRaft-mode version, do not exhibit a leaked socket.
> Here is where the ServerSocketChannel is instantiated:
> {noformat}
>         at java.base/java.nio.channels.ServerSocketChannel.open(ServerSocketChannel.java:113)
>         at kafka.network.Acceptor.openServerSocket(SocketServer.scala:724)
>         at kafka.network.Acceptor.<init>(SocketServer.scala:608)
>         at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:454)
>         at kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270)
>         at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249)
>         at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175)
>         at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175)
>         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
>         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
>         at kafka.network.SocketServer.<init>(SocketServer.scala:175)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:344)
>         at kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356)
>         at kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352)
>         at scala.collection.immutable.List.foreach(List.scala:333)
>         at kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352)
>         at kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146)
>         at kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319)
>         at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53)
>         at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)
>         at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)
>         at kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat}
> And the associated DataPlaneAcceptor:
> {noformat}
>         at java.base/java.nio.channels.Selector.open(Selector.java:295)
>         at kafka.network.Acceptor.<init>(SocketServer.scala:598)
>         at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:454)
>         at kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270)
>         at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249)
>         at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175)
>         at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175)
>         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
>         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
>         at kafka.network.SocketServer.<init>(SocketServer.scala:175)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:344)
> {noformat}
> And two Processors:
> {noformat}
>         at java.base/java.nio.channels.Selector.open(Selector.java:295)
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:159)
>         at kafka.network.Processor.createSelector(SocketServer.scala:995)
>         at kafka.network.Processor.<init>(SocketServer.scala:973)
>         at kafka.network.Acceptor.newProcessor(SocketServer.scala:879)
>         at kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:849)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>         at kafka.network.Acceptor.addProcessors(SocketServer.scala:848)
>         at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:523)
>         at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:251)
>         at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175)
>         at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175)
>         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
>         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
>         at kafka.network.SocketServer.<init>(SocketServer.scala:175)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:344){noformat}
> I found this while investigating the causes of recent build failures where 
> Gradle is unable to connect to the test runners. I don't believe this bug has 
> any impact on end-users, but may have an impact on users running an embedded 
> kafka in a test environment with a long-lived JVM.
> I have seen other tests leaking sockets and leaving files open, so the RemoteTopicCrudTest is not unique; it just happens to be the first test I've looked into.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
