Hi,

Disclaimer: I am neither a Kafka nor a Trident user.
I am not aware of any configuration-related changes in the PR mentioned. This sounds more like a setup issue, so maybe first verify that everything is set up as expected.

Regards,
Richard

> On 06.03.2025 at 10:47, Dabalos, Rahmat Peter via user <[email protected]> wrote:
>
> Hi Richard,
>
> Yup, indeed our Kafka client was not updated. We tried to update it, and now it
> fails with the following error:
>
> Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>     at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:608) ~[stormjar.jar:dev-3.0.8.9.0]
>     at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:596) ~[stormjar.jar:dev-3.0.8.9.0]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[stormjar.jar:dev-3.0.8.9.0]
>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitBatchNew(KafkaTridentSpoutEmitter.java:178) ~[stormjar.jar:dev-3.0.8.9.0]
>     at org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.emitBatchNew(KafkaTridentOpaqueSpoutEmitter.java:45) ~[stormjar.jar:dev-3.0.8.9.0]
>     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:166) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:77) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:235) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.Executor.acceptTupleAction(Executor.java:298) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.Executor.accept(Executor.java:292) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.8.0.jar:2.8.0]
>     at org.apache.storm.utils.Utils$1.run(Utils.java:398) ~[storm-client-2.8.0.jar:2.8.0]
>
> Is there a new way of configuring the brokers and the topics?
>
> Also, for the change related to the Trident spouts, should we also change how
> we initialize it?
> <image001.png>
>
> Best regards,
> Rahm
>
> From: Richard Zowalla <[email protected]>
> Date: Wednesday, March 5, 2025 at 4:09 AM
> To: Dabalos, Rahmat Peter via user <[email protected]>
> Subject: Re: Upgrading to 2.8.0
>
> Hi,
>
> There was a Kafka Client update in version 2.7.1 (if I recall correctly):
> https://archive.apache.org/dist/storm/apache-storm-2.7.1/RELEASE_NOTES.html
>
> Additionally, there was a change to the KafkaTrident Spouts here:
> https://github.com/apache/storm/pull/3679
>
> It might be a good idea to first verify that you're using the correct
> versions, as a NoSuchMethodError could indicate you're running an older
> version of the Kafka client library in your shaded jar.
>
> Regards,
> r
>
> On Tuesday, 04.03.2025 at 16:35 +0000, Dabalos, Rahmat Peter wrote:
> > Hello,
> >
> > Good day. We are updating our Storm topologies and are now
> > experiencing an error upon startup. The version we are coming from is
> > 2.6.4, and we want to upgrade to the latest, 2.8.0. Do you have a
> > guide for this migration? And are there changes that we need
> > to make in our code before migrating?
> >
> > For your reference, here are the errors we encountered:
> > 1.
> > java.lang.NoSuchMethodError: 'org.apache.kafka.clients.consumer.ConsumerRecords org.apache.kafka.clients.consumer.Consumer.poll(java.time.Duration)'
> >     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitBatchNew(KafkaTridentSpoutEmitter.java:178) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.emitBatchNew(KafkaTridentOpaqueSpoutEmitter.java:45) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:166) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:77) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:235) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.Executor.acceptTupleAction(Executor.java:298) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.Executor.accept(Executor.java:292) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.utils.Utils$1.run(Utils.java:398) [storm-client-2.8.0.jar:2.8.0]
> >     at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
> > 2.
> > 2025-03-04 09:13:19.527 o.a.k.c.n.SslTransportLayer ShutdownHook-shutdownFunc [WARN] Failed to send SSL Close message
> > java.io.IOException: Unexpected status returned by SSLEngine.wrap, expected CLOSED, received OK. Will not send close message to peer.
> >     at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:158) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:663) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:59) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.network.Selector.doClose(Selector.java:584) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.network.Selector.close(Selector.java:575) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.network.Selector.close(Selector.java:541) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.common.network.Selector.close(Selector.java:250) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:506) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.close(ConsumerNetworkClient.java:439) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:71) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1614) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1574) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1550) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.close(KafkaTridentSpoutEmitter.java:377) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.close(KafkaTridentOpaqueSpoutEmitter.java:67) ~[stormjar.jar:dev-3.0.8.8]
> >     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.close(OpaquePartitionedTridentSpoutExecutor.java:218) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.trident.spout.TridentSpoutExecutor.cleanup(TridentSpoutExecutor.java:84) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.trident.topology.TridentBoltExecutor.cleanup(TridentBoltExecutor.java:253) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.ExecutorShutdown.shutdown(ExecutorShutdown.java:120) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.daemon.worker.Worker.shutdown(Worker.java:518) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.utils.Utils.lambda$addShutdownHookWithDelayedForceKill$1(Utils.java:357) ~[storm-client-2.8.0.jar:2.8.0]
> >     at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
> >
> > 2025-03-04 09:13:20.376 o.a.s.u.Utils Thread-20-b-0-message-spout-deserializer-validator-storm-event-extractor-executor[7, 7] [ERROR] Async loop died!
> > java.lang.IllegalStateException: Timer is not active
> >     at org.apache.storm.StormTimer.checkActive(StormTimer.java:159) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.StormTimer.scheduleMs(StormTimer.java:85) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.StormTimer.schedule(StormTimer.java:65) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.StormTimer.schedule(StormTimer.java:69) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.StormTimer.scheduleRecurring(StormTimer.java:107) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.Executor.setupTicks(Executor.java:517) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor.init(BoltExecutor.java:132) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor.call(BoltExecutor.java:138) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.executor.bolt.BoltExecutor.call(BoltExecutor.java:54) ~[storm-client-2.8.0.jar:2.8.0]
> >     at org.apache.storm.utils.Utils$1.run(Utils.java:393) [storm-client-2.8.0.jar:2.8.0]
> >     at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
> >
> > Best regards,
> > Rahm Dabalos
> > -----------------------------------------------------------------
> > ATTENTION:
> > The information in this e-mail is confidential and only meant for the
> > intended recipient. If you are not the intended recipient, don't use
> > or disclose it in any way. Please let the sender know and delete the
> > message immediately.
> > -----------------------------------------------------------------
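
P.S. Regarding the suggestion in the thread to verify which Kafka client version actually ends up in the shaded topology jar: a small reflection check along these lines might help. This is only a sketch under assumptions. `ClasspathCheck`, `describe` and `locationOf` are hypothetical names made up for illustration and are not part of Storm or Kafka; the code uses plain JDK reflection only. The class and method it probes in `main` are taken from the NoSuchMethodError quoted above.

```java
import java.security.CodeSource;
import java.time.Duration;
import java.util.Arrays;

/** Hypothetical diagnostic helper; not part of Storm or Kafka. */
final class ClasspathCheck {

    private ClasspathCheck() { }

    /** Returns the jar or directory a class was loaded from, if known. */
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        // Classes that ship with the JDK itself report no code source.
        return source == null ? "(JDK / bootstrap)" : String.valueOf(source.getLocation());
    }

    /** Reports whether a public method with the given signature is visible on the classpath. */
    static String describe(String className, String methodName, Class<?>... parameterTypes) {
        String signature = className + "." + methodName + Arrays.toString(parameterTypes);
        Class<?> clazz;
        try {
            clazz = Class.forName(className);
        } catch (ClassNotFoundException e) {
            return "CLASS MISSING " + signature;
        }
        try {
            clazz.getMethod(methodName, parameterTypes);
            return "FOUND " + signature + " in " + locationOf(clazz);
        } catch (NoSuchMethodException e) {
            // The class exists but lacks the method: likely an older library version.
            return "METHOD MISSING " + signature + " in " + locationOf(clazz);
        }
    }

    public static void main(String[] args) {
        // Signature taken from the NoSuchMethodError in the thread.
        System.out.println(describe(
                "org.apache.kafka.clients.consumer.Consumer", "poll", Duration.class));
    }
}
```

Run with the same classpath as the topology (for example with the shaded jar on `-cp`), the reported location should show which jar supplies the Kafka `Consumer` interface, and a METHOD MISSING result would point to an outdated client being bundled.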
