Hi,

I would try the following:

(1) Check that Kafka is running and configured correctly, i.e. the topic is available and a partition is assigned.

(2) Downgrade the Kafka client to the version used with 2.6.4. As there were no actual code changes on the Storm side, replacing the libraries should be straightforward. Check whether the error also occurs with the older Kafka client library.

(3) Switch to the Kafka/Storm Trident implementation from before the changes of the PR were applied and see whether the error also occurs with the unchanged code. You can simply grab the code from version 2.6.4, put it in your own namespace, and adjust your code to use the copied classes. Afterwards, check whether it happens again.

This should give a clearer picture of what is actually going wrong.

Another alternative would be to provide a small reproducer, so people have a chance to debug.

Regards,
Richard

> On 12.03.2025 at 17:56, Dabalos, Rahmat Peter via user <[email protected]> wrote:
>
> Hi Richard,
>
> As you suggested that I verify that this is not a setup issue, I have tried removing all parallelism and the workers on the Trident topology.
>
> This was before:
> <image003.png>
> and this is the config without any parallelism:
> <image001.png>
> and in this setup the topology works; as you can see, the indexing is now working:
> <image002.png>
>
> We did not make any code changes except upgrading the Storm client from 2.6.4 to 2.8.0, and we don’t know what else to modify to get past these errors:
> 2025-03-10 17:45:43.862 o.a.s.u.Utils Thread-16-spout-message-indexer-consumer-executor[21, 21] [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>     at org.apache.storm.executor.Executor.acceptTupleAction(Executor.java:305) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.Executor.accept(Executor.java:292) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.Utils$1.run(Utils.java:398) [storm-client-2.8.0.jar:2.8.0]
>     at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
> Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>     at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:608) ~[stormjar.jar:dev-3.0.8.9.8]
>     at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:596) ~[stormjar.jar:dev-3.0.8.9.8]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[stormjar.jar:dev-3.0.8.9.8]
>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitBatchNew(KafkaTridentSpoutEmitter.java:178) ~[stormjar.jar:dev-3.0.8.9.8]
>     at org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.emitBatchNew(KafkaTridentOpaqueSpoutEmitter.java:45) ~[stormjar.jar:dev-3.0.8.9.8]
>     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:166) ~[storm-client-2.8.0.jar:2.8.0]
>
> Do you have any suggestions on what we need to try?
>
> Best regards,
> Rahm Dabalos
>
> From: Richard Zowalla <[email protected]>
> Date: Tuesday, March 11, 2025 at 11:41 PM
> To: [email protected] <[email protected]>
> Subject: Re: Upgrading to 2.8.0
>
> The Kafka classic consumer is contained in org.apache.kafka:kafka-clients:3.9.0, which is a dependency of storm-kafka-client.
>
> On 11.03.2025 at 16:04, Dabalos, Rahmat Peter via user <[email protected]> wrote:
>
> Hi Richard,
>
> Would you know why there is a line calling ClassicKafkaConsumer.poll in the stack trace?
>
> 2025-03-10 17:45:43.862 o.a.s.u.Utils Thread-16-spout-message-indexer-consumer-executor[21, 21] [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>     at org.apache.storm.executor.Executor.acceptTupleAction(Executor.java:305) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.Executor.accept(Executor.java:292) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.Utils$1.run(Utils.java:398) [storm-client-2.8.0.jar:2.8.0]
>     at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
> Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>     at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:608) ~[stormjar.jar:dev-3.0.8.9.8]
>     at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:596) ~[stormjar.jar:dev-3.0.8.9.8]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[stormjar.jar:dev-3.0.8.9.8]
>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitBatchNew(KafkaTridentSpoutEmitter.java:178) ~[stormjar.jar:dev-3.0.8.9.8]
>     at org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.emitBatchNew(KafkaTridentOpaqueSpoutEmitter.java:45) ~[stormjar.jar:dev-3.0.8.9.8]
>     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:166) ~[storm-client-2.8.0.jar:2.8.0]
>
> Looking at my classpath and dependencies, I don’t see the class ClassicKafkaConsumer.
>
> Best regards,
> Rahm
>
> From: Richard Zowalla <[email protected]>
> Date: Thursday, March 6, 2025 at 6:32 PM
> To: [email protected] <[email protected]>
> Subject: Re: Upgrading to 2.8.0
>
> An alternative way to verify that this is not a side effect of the change would be to just grab the code from before the change, put it into your own package namespace, and use that.
>
> On 06.03.2025 at 11:29, Richard Zowalla <[email protected]> wrote:
>
> Hi,
>
> Disclaimer: I am neither a Kafka nor a Trident user.
>
> I am not aware of any configuration-related changes from the PR mentioned.
>
> This sounds more like a setup issue, so maybe first verify that everything is set up as expected.
>
> Regards,
> Richard
>
> On 06.03.2025 at 10:47, Dabalos, Rahmat Peter via user <[email protected]> wrote:
>
> Hi Richard,
>
> Yup, indeed our Kafka client was not updated.
> We tried to update it, and now it fails with the following error:
> Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>     at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:608) ~[stormjar.jar:dev-3.0.8.9.0]
>     at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:596) ~[stormjar.jar:dev-3.0.8.9.0]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[stormjar.jar:dev-3.0.8.9.0]
>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitBatchNew(KafkaTridentSpoutEmitter.java:178) ~[stormjar.jar:dev-3.0.8.9.0]
>     at org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.emitBatchNew(KafkaTridentOpaqueSpoutEmitter.java:45) ~[stormjar.jar:dev-3.0.8.9.0]
>     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:166) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:77) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:235) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.Executor.acceptTupleAction(Executor.java:298) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.Executor.accept(Executor.java:292) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.Utils$1.run(Utils.java:398) ~[storm-client-2.8.0.jar:2.8.0]
>
> Is there a new way of configuring the brokers and the topics?
>
> Also, regarding the change to the Trident spouts, should we also change how we initialize them?
> <image001.png>
>
> Best regards,
> Rahm
>
> From: Richard Zowalla <[email protected]>
> Date: Wednesday, March 5, 2025 at 4:09 AM
> To: Dabalos, Rahmat Peter via user <[email protected]>
> Subject: Re: Upgrading to 2.8.0
>
> Hi,
>
> There was a Kafka client update in version 2.7.1 (if I recall correctly):
> https://archive.apache.org/dist/storm/apache-storm-2.7.1/RELEASE_NOTES.html
>
> Additionally, there was a change to the Kafka Trident spouts here:
> https://github.com/apache/storm/pull/3679
>
> It might be a good idea to first verify that you're using the correct versions, as a NoSuchMethodError could indicate you're running an
> older version of the Kafka client library in your shaded jar.
>
> Regards,
> r
>
> On Tuesday, 04.03.2025 at 16:35 +0000, Dabalos, Rahmat Peter wrote:
> > Hello,
> >
> > Good day. We are updating our Storm topologies and are now experiencing an error upon startup. The version we are coming from is 2.6.4, and we want to upgrade to the latest, 2.8.0. Do you have a guide for this migration? And are there changes that we need to make in our code before migrating?
> >
> > For your reference, here are the errors we encountered:
> > 1. java.lang.NoSuchMethodError: 'org.apache.kafka.clients.consumer.ConsumerRecords org.apache.kafka.clients.consumer.Consumer.poll(java.time.Duration)'
> >     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitBatchNew(KafkaTridentSpoutEmitter.java:178) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.emitBatchNew(KafkaTridentOpaqueSpoutEmitter.java:45) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:166) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:77) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:235) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.Executor.acceptTupleAction(Executor.java:298) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.Executor.accept(Executor.java:292) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.utils.Utils$1.run(Utils.java:398) [storm-client-2.8.0.jar:2.8.0]
> >     at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
> > 2. 2025-03-04 09:13:19.527 o.a.k.c.n.SslTransportLayer ShutdownHook-shutdownFunc [WARN] Failed to send SSL Close message
> > java.io.IOException: Unexpected status returned by SSLEngine.wrap, expected CLOSED, received OK. Will not send close message to peer.
> >     at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:158) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:663) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:59) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.network.Selector.doClose(Selector.java:584) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.network.Selector.close(Selector.java:575) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.network.Selector.close(Selector.java:541) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.network.Selector.close(Selector.java:250) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:506) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.close(ConsumerNetworkClient.java:439) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:71) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1614) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1574) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1550) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.close(KafkaTridentSpoutEmitter.java:377) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.close(KafkaTridentOpaqueSpoutEmitter.java:67) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.close(OpaquePartitionedTridentSpoutExecutor.java:218) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.trident.spout.TridentSpoutExecutor.cleanup(TridentSpoutExecutor.java:84) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.trident.topology.TridentBoltExecutor.cleanup(TridentBoltExecutor.java:253) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.ExecutorShutdown.shutdown(ExecutorShutdown.java:120) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.daemon.worker.Worker.shutdown(Worker.java:518) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.utils.Utils.lambda$addShutdownHookWithDelayedForceKill$1(Utils.java:357) ~[storm-client-2.8.0.jar:2.8.0]
> >     at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
> >
> > 2025-03-04 09:13:20.376 o.a.s.u.Utils Thread-20-b-0-message-spout-deserializer-validator-storm-event-extractor-executor[7, 7] [ERROR] Async loop died!
> > java.lang.IllegalStateException: Timer is not active
> >     at org.apache.storm.StormTimer.checkActive(StormTimer.java:159) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.StormTimer.scheduleMs(StormTimer.java:85) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.StormTimer.schedule(StormTimer.java:65) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.StormTimer.schedule(StormTimer.java:69) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.StormTimer.scheduleRecurring(StormTimer.java:107) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.Executor.setupTicks(Executor.java:517) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor.init(BoltExecutor.java:132) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor.call(BoltExecutor.java:138) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor.call(BoltExecutor.java:54) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.utils.Utils$1.run(Utils.java:393) [storm-client-2.8.0.jar:2.8.0]
> >     at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
> >
> > Best regards,
> > Rahm Dabalos
> > -----------------------------------------------------------------
> > ATTENTION:
> > The information in this e-mail is confidential and only meant for the intended recipient.
> > If you are not the intended recipient, don't use or disclose it in any way. Please let the sender know and delete the message immediately.
> > -----------------------------------------------------------------
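
A side note on the classpath questions raised in this thread (the NoSuchMethodError on Consumer.poll(java.time.Duration) and the ClassicKafkaConsumer class that did not seem to be on the classpath): one way to see which jar a class is actually loaded from is a small reflection check like the sketch below. It uses only the JDK; the class name ClasspathCheck and the helpers locate and hasMethod are hypothetical names made up for illustration and are not part of Storm or Kafka. It is only meaningful when run with the same classpath as the topology, for example with the shaded stormjar.jar on the classpath.

```java
import java.security.CodeSource;
import java.time.Duration;

public class ClasspathCheck {

    /** Returns where a class was loaded from, or a marker if it cannot be loaded. */
    public static String locate(String className) {
        try {
            // initialize=false: only load the class, do not run static initializers
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source
            return src == null ? "<bootstrap/JDK>" : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not found>";
        }
    }

    /** Returns true if the class is loadable and has a public method with these parameter types. */
    public static boolean hasMethod(String className, String method, Class<?>... params) {
        try {
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            cls.getMethod(method, params);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack traces in this thread
        String consumer = "org.apache.kafka.clients.consumer.Consumer";
        String classic = "org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer";

        System.out.println(consumer + " -> " + locate(consumer));
        System.out.println(classic + " -> " + locate(classic));
        System.out.println("Consumer.poll(Duration) present: "
                + hasMethod(consumer, "poll", Duration.class));
    }
}
```

If the first line points at an unexpected jar, or poll(Duration) is reported as missing, that would be consistent with an older Kafka client being bundled into the shaded jar; it does not by itself explain the "not subscribed to any topics" error.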
