[ https://issues.apache.org/jira/browse/KAFKA-15804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Greg Harris updated KAFKA-15804: -------------------------------- Description: This exception is thrown during the RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic test in zk mode: {noformat} org.apache.kafka.common.config.ConfigException: You have to delete all topics with the property remote.storage.enable=true before disabling tiered storage cluster-wide at org.apache.kafka.storage.internals.log.LogConfig.validateRemoteStorageOnlyIfSystemEnabled(LogConfig.java:566) at kafka.log.LogManager.updateTopicConfig(LogManager.scala:956) at kafka.server.TopicConfigHandler.updateLogConfig(ConfigHandler.scala:73) at kafka.server.TopicConfigHandler.processConfigChanges(ConfigHandler.scala:94) at kafka.server.ZkConfigManager.$anonfun$startup$4(ZkConfigManager.scala:176) at kafka.server.ZkConfigManager.$anonfun$startup$4$adapted(ZkConfigManager.scala:175) at scala.collection.immutable.Map$Map2.foreach(Map.scala:360) at kafka.server.ZkConfigManager.$anonfun$startup$1(ZkConfigManager.scala:175) at kafka.server.ZkConfigManager.$anonfun$startup$1$adapted(ZkConfigManager.scala:166) at scala.collection.immutable.HashMap.foreach(HashMap.scala:1115) at kafka.server.ZkConfigManager.startup(ZkConfigManager.scala:166) at kafka.server.KafkaServer.startup(KafkaServer.scala:575) at kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356) at kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352) at scala.collection.immutable.List.foreach(List.scala:333) at kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352) at kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146) at kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319) at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53) at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111) at kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat} This leak only occurs for this one test in the RemoteTopicCrudTest; all other tests including the kraft-mode version do not exhibit a leaked socket. Here is where the ServerSocket is instantiated: {noformat} at java.base/java.nio.channels.ServerSocketChannel.open(ServerSocketChannel.java:113) at kafka.network.Acceptor.openServerSocket(SocketServer.scala:724) at kafka.network.Acceptor.<init>(SocketServer.scala:608) at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:454) at kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270) at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249) at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175) at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) at scala.collection.AbstractIterable.foreach(Iterable.scala:933) at kafka.network.SocketServer.<init>(SocketServer.scala:175) at kafka.server.KafkaServer.startup(KafkaServer.scala:344) at kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356) at kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352) at scala.collection.immutable.List.foreach(List.scala:333) at kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352) at kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146) at kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319) at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53) at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111) at kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat} And the associated DataPlaneAcceptor: {noformat} at java.base/java.nio.channels.Selector.open(Selector.java:295) at kafka.network.Acceptor.<init>(SocketServer.scala:598) at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:454) at kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270) at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249) at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175) at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) at scala.collection.AbstractIterable.foreach(Iterable.scala:933) at kafka.network.SocketServer.<init>(SocketServer.scala:175) at kafka.server.KafkaServer.startup(KafkaServer.scala:344) {noformat} And two Processors: {noformat} at java.base/java.nio.channels.Selector.open(Selector.java:295) at org.apache.kafka.common.network.Selector.<init>(Selector.java:159) at kafka.network.Processor.createSelector(SocketServer.scala:995) at kafka.network.Processor.<init>(SocketServer.scala:973) at kafka.network.Acceptor.newProcessor(SocketServer.scala:879) at kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:849) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) at kafka.network.Acceptor.addProcessors(SocketServer.scala:848) at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:523) at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:251) at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175) at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) at scala.collection.AbstractIterable.foreach(Iterable.scala:933) at kafka.network.SocketServer.<init>(SocketServer.scala:175) at kafka.server.KafkaServer.startup(KafkaServer.scala:344){noformat} I found this while investigating the causes of recent build failures where Gradle is unable to connect to the test runners. I don't believe this bug has any impact on end-users, but may have an impact on users running an embedded kafka in a test environment with a long-lived JVM. I have seen other tests leaking sockets/leaving files open, so the RemoteTopicCrudTest is not unique. It just happens to be the first test i've looked into. was: This exception is thrown during the RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic test in zk mode: {noformat} org.apache.kafka.common.config.ConfigException: You have to delete all topics with the property remote.storage.enable=true before disabling tiered storage cluster-wide org.apache.kafka.storage.internals.log.LogConfig.validateRemoteStorageOnlyIfSystemEnabled(LogConfig.java:566) kafka.log.LogManager.updateTopicConfig(LogManager.scala:956) kafka.server.TopicConfigHandler.updateLogConfig(ConfigHandler.scala:73) kafka.server.TopicConfigHandler.processConfigChanges(ConfigHandler.scala:94) kafka.server.ZkConfigManager.$anonfun$startup$4(ZkConfigManager.scala:176) kafka.server.ZkConfigManager.$anonfun$startup$4$adapted(ZkConfigManager.scala:175) scala.collection.immutable.Map$Map2.foreach(Map.scala:360) kafka.server.ZkConfigManager.$anonfun$startup$1(ZkConfigManager.scala:175) kafka.server.ZkConfigManager.$anonfun$startup$1$adapted(ZkConfigManager.scala:166) scala.collection.immutable.HashMap.foreach(HashMap.scala:1115) kafka.server.ZkConfigManager.startup(ZkConfigManager.scala:166) kafka.server.KafkaServer.startup(KafkaServer.scala:575) kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356) kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352) scala.collection.immutable.List.foreach(List.scala:333) kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352) kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146) kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319) org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53) org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111) kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat} This leak only occurs for this one test in the RemoteTopicCrudTest; all other tests including the kraft-mode version do not exhibit a leaked socket. Here is where the ServerSocket is instantiated: {noformat} at java.base/java.nio.channels.ServerSocketChannel.open(ServerSocketChannel.java:113) at kafka.network.Acceptor.openServerSocket(SocketServer.scala:724) at kafka.network.Acceptor.<init>(SocketServer.scala:608) at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:454) at kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270) at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249) at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175) at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) at scala.collection.AbstractIterable.foreach(Iterable.scala:933) at kafka.network.SocketServer.<init>(SocketServer.scala:175) at kafka.server.KafkaServer.startup(KafkaServer.scala:344) at kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356) at kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352) at scala.collection.immutable.List.foreach(List.scala:333) at kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352) at kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146) at kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319) at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53) at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111) at kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat} And the associated DataPlaneAcceptor: {noformat} at java.base/java.nio.channels.Selector.open(Selector.java:295) at kafka.network.Acceptor.<init>(SocketServer.scala:598) at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:454) at kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270) at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249) at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175) at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) at scala.collection.AbstractIterable.foreach(Iterable.scala:933) at kafka.network.SocketServer.<init>(SocketServer.scala:175) at kafka.server.KafkaServer.startup(KafkaServer.scala:344) {noformat} And two Processors: {noformat} at java.base/java.nio.channels.Selector.open(Selector.java:295) at org.apache.kafka.common.network.Selector.<init>(Selector.java:159) at kafka.network.Processor.createSelector(SocketServer.scala:995) at kafka.network.Processor.<init>(SocketServer.scala:973) at kafka.network.Acceptor.newProcessor(SocketServer.scala:879) at kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:849) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) at kafka.network.Acceptor.addProcessors(SocketServer.scala:848) at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:523) at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:251) at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175) at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) at scala.collection.AbstractIterable.foreach(Iterable.scala:933) at kafka.network.SocketServer.<init>(SocketServer.scala:175) at kafka.server.KafkaServer.startup(KafkaServer.scala:344){noformat} I found this while investigating the causes of recent build failures where Gradle is unable to connect to the test runners. I don't believe this bug has any impact on end-users, but may have an impact on users running an embedded kafka in a test environment with a long-lived JVM. > Broker leaks ServerSocketChannel when exception is thrown from > ZkConfigManager during startup > --------------------------------------------------------------------------------------------- > > Key: KAFKA-15804 > URL: https://issues.apache.org/jira/browse/KAFKA-15804 > Project: Kafka > Issue Type: Bug > Components: core, Tiered-Storage, unit tests > Affects Versions: 3.6.0 > Reporter: Greg Harris > Priority: Minor > > This exception is thrown during the > RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic > test in zk mode: > {noformat} > org.apache.kafka.common.config.ConfigException: You have to delete all topics > with the property remote.storage.enable=true before disabling tiered storage > cluster-wide > at > org.apache.kafka.storage.internals.log.LogConfig.validateRemoteStorageOnlyIfSystemEnabled(LogConfig.java:566) > at kafka.log.LogManager.updateTopicConfig(LogManager.scala:956) > at > kafka.server.TopicConfigHandler.updateLogConfig(ConfigHandler.scala:73) > at > kafka.server.TopicConfigHandler.processConfigChanges(ConfigHandler.scala:94) > at > kafka.server.ZkConfigManager.$anonfun$startup$4(ZkConfigManager.scala:176) > at > kafka.server.ZkConfigManager.$anonfun$startup$4$adapted(ZkConfigManager.scala:175) > at scala.collection.immutable.Map$Map2.foreach(Map.scala:360) > at > kafka.server.ZkConfigManager.$anonfun$startup$1(ZkConfigManager.scala:175) > at > kafka.server.ZkConfigManager.$anonfun$startup$1$adapted(ZkConfigManager.scala:166) > at scala.collection.immutable.HashMap.foreach(HashMap.scala:1115) > at kafka.server.ZkConfigManager.startup(ZkConfigManager.scala:166) > at kafka.server.KafkaServer.startup(KafkaServer.scala:575) > at > kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356) > at > kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352) > at scala.collection.immutable.List.foreach(List.scala:333) > at > kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352) > at > kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146) > at > kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319) > at > org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53) > at > org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) > at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111) > at > kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat} > This leak only occurs for this one test in the RemoteTopicCrudTest; all other > tests including the kraft-mode version do not exhibit a leaked socket. > Here is where the ServerSocket is instantiated: > {noformat} > at > java.base/java.nio.channels.ServerSocketChannel.open(ServerSocketChannel.java:113) > at kafka.network.Acceptor.openServerSocket(SocketServer.scala:724) > at kafka.network.Acceptor.<init>(SocketServer.scala:608) > at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:454) > at > kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270) > at > kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249) > at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175) > at > kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) > at scala.collection.AbstractIterable.foreach(Iterable.scala:933) > at kafka.network.SocketServer.<init>(SocketServer.scala:175) > at kafka.server.KafkaServer.startup(KafkaServer.scala:344) > at > kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356) > at > kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352) > at scala.collection.immutable.List.foreach(List.scala:333) > at > kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352) > at > kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146) > at > kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319) > at > org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53) > at > org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) > at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111) > at > kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat} > And the associated DataPlaneAcceptor: > {noformat} > at java.base/java.nio.channels.Selector.open(Selector.java:295) > at kafka.network.Acceptor.<init>(SocketServer.scala:598) > at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:454) > at > kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270) > at > kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249) > at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175) > at > kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) > at scala.collection.AbstractIterable.foreach(Iterable.scala:933) > at kafka.network.SocketServer.<init>(SocketServer.scala:175) > at kafka.server.KafkaServer.startup(KafkaServer.scala:344) > {noformat} > And two Processors: > {noformat} > at java.base/java.nio.channels.Selector.open(Selector.java:295) > at org.apache.kafka.common.network.Selector.<init>(Selector.java:159) > at kafka.network.Processor.createSelector(SocketServer.scala:995) > at kafka.network.Processor.<init>(SocketServer.scala:973) > at kafka.network.Acceptor.newProcessor(SocketServer.scala:879) > at > kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:849) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) > at kafka.network.Acceptor.addProcessors(SocketServer.scala:848) > at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:523) > at > kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:251) > at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175) > at > kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) > at scala.collection.AbstractIterable.foreach(Iterable.scala:933) > at kafka.network.SocketServer.<init>(SocketServer.scala:175) > at kafka.server.KafkaServer.startup(KafkaServer.scala:344){noformat} > I found this while investigating the causes of recent build failures where > Gradle is unable to connect to the test runners. I don't believe this bug has > any impact on end-users, but may have an impact on users running an embedded > kafka in a test environment with a long-lived JVM. > I have seen other tests leaking sockets/leaving files open, so the > RemoteTopicCrudTest is not unique. It just happens to be the first test i've > looked into. -- This message was sent by Atlassian Jira (v8.20.10#820010)