Hi Richard,
Would you know why in the stack trace, there is a line calling
ClassicKafkaConsumer.poll?
2025-03-10 17:45:43.862 o.a.s.u.Utils
Thread-16-spout-message-indexer-consumer-executor[21, 21] [ERROR] Async loop
died!
java.lang.RuntimeException: java.lang.IllegalStateException: Consumer is not
subscribed to any topics or assigned any partitions
at
org.apache.storm.executor.Executor.acceptTupleAction(Executor.java:305)
~[storm-client-2.8.0.jar:2.8.0]
at org.apache.storm.executor.Executor.accept(Executor.java:292)
~[storm-client-2.8.0.jar:2.8.0]
at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113)
~[storm-client-2.8.0.jar:2.8.0]
at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89)
~[storm-client-2.8.0.jar:2.8.0]
at
org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154)
~[storm-client-2.8.0.jar:2.8.0]
at
org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140)
~[storm-client-2.8.0.jar:2.8.0]
at org.apache.storm.utils.Utils$1.run(Utils.java:398)
[storm-client-2.8.0.jar:2.8.0]
at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any
topics or assigned any partitions
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:608)
~[stormjar.jar:dev-3.0.8.9.8]
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:596)
~[stormjar.jar:dev-3.0.8.9.8]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
~[stormjar.jar:dev-3.0.8.9.8]
at
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitBatchNew(KafkaTridentSpoutEmitter.java:178)
~[stormjar.jar:dev-3.0.8.9.8]
at
org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.emitBatchNew(KafkaTridentOpaqueSpoutEmitter.java:45)
~[stormjar.jar:dev-3.0.8.9.8]
at
org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:166)
~[storm-client-2.8.0.jar:2.8.0]
looking at my claspath and dependencies, I don’t see the class
ClassicKafkaConsumer.
Best regards,
Rahm
From: Richard Zowalla <[email protected]>
Date: Thursday, March 6, 2025 at 6:32 PM
To: [email protected] <[email protected]>
Subject: Re: Upgrading to 2.8.0
An alternative to verify that his is not a side effect of the change would be
to just grab the code from before the change, put that into your own package
namespace and use that one.
Am 06.03.2025 um 11:29 schrieb Richard Zowalla <[email protected]>:
Hi,
Disclaimer: I am not a Kafka nor Trident user.
I am not aware of any configuration related changes from the PR mentioned.
Sounds more like a setup issue, so maybe first verify that this is setup as
expected.
Gruß
Richard
Am 06.03.2025 um 10:47 schrieb Dabalos, Rahmat Peter via user
<[email protected]>:
Hi Richard,
yup, indeed our kafka client is not updated. We tried to update it, now it has
the following error:
Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any
topics or assigned any partitions
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:608)
~[stormjar.jar:dev-3.0.8.9.0]
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:596)
~[stormjar.jar:dev-3.0.8.9.0]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
~[stormjar.jar:dev-3.0.8.9.0]
at
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitBatchNew(KafkaTridentSpoutEmitter.java:178)
~[stormjar.jar:dev-3.0.8.9.0]
at
org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.emitBatchNew(KafkaTridentOpaqueSpoutEmitter.java:45)
~[stormjar.jar:dev-3.0.8.9.0]
at
org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:166)
~[storm-client-2.8.0.jar:2.8.0]
at
org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:77)
~[storm-client-2.8.0.jar:2.8.0]
at
org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:235)
~[storm-client-2.8.0.jar:2.8.0]
at
org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212)
~[storm-client-2.8.0.jar:2.8.0]
at
org.apache.storm.executor.Executor.acceptTupleAction(Executor.java:298)
~[storm-client-2.8.0.jar:2.8.0]
at org.apache.storm.executor.Executor.accept(Executor.java:292)
~[storm-client-2.8.0.jar:2.8.0]
at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113)
~[storm-client-2.8.0.jar:2.8.0]
at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89)
~[storm-client-2.8.0.jar:2.8.0]
at
org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154)
~[storm-client-2.8.0.jar:2.8.0]
at
org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140)
~[storm-client-2.8.0.jar:2.8.0]
at org.apache.storm.utils.Utils$1.run(Utils.java:398)
~[storm-client-2.8.0.jar:2.8.0]
is there a new way of configuring the brokers and the topics?
Also for the change related to the trident spouts, should we also change how we
initialize it?
<image001.png>
Best regards,
Rahm
From: Richard Zowalla <[email protected]<mailto:[email protected]>>
Date: Wednesday, March 5, 2025 at 4:09 AM
To: Dabalos, Rahmat Peter via user
<[email protected]<mailto:[email protected]>>
Subject: Re: Upgrading to 2.8.0
Hi,
There was a Kafka Client update in version 2.7.1 (if I recall
correctly):
https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Farchive.apache.org%2Fdist%2Fstorm%2Fapache-storm-2.7.1%2FRELEASE_NOTES.html&data=05%7C02%7Crahmat.peter.dabalos%40ing.com%7Cb471916dcc1a490e945708dd5b586930%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638767157471557884%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=YbNhIMgbpZNpswo4lW37aZ5XKMN4I4DGjYjTVxuQJGg%3D&reserved=0<https://archive.apache.org/dist/storm/apache-storm-2.7.1/RELEASE_NOTES.html>
Additionally, there was a change to the KafkaTrident Spouts here:
https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fstorm%2Fpull%2F3679&data=05%7C02%7Crahmat.peter.dabalos%40ing.com%7Cb471916dcc1a490e945708dd5b586930%7C587b6ea13db94fe1a9d785d4c64ce5cc%7C0%7C0%7C638767157471577149%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=LNi6K7Mpp1SxT3HxMQGtah4ACvBbLDZFfTDPNNH9yL8%3D&reserved=0<https://github.com/apache/storm/pull/3679>
It might be a good idea to first verify if you're using the correct
versions, as a NoSuchMethodError could indicate you're running an older
version of the Kafka client library in your shade.
Gruß
r
Am Dienstag, dem 04.03.2025 um 16:35 +0000 schrieb Dabalos, Rahmat
Peter:
> Hello,
>
> Good day, as we are now updating our Storm topologies, we are now
> experiencing an error upon startup. The version we are coming from is
> 2.6.4, and we want to upgrade it to the latest 2.8.0. Do you have a
> guide regarding this migration? And if there are changes that we need
> to do in our code before migration?
>
> For your references here are the errors we encountered:
> 1. java.lang.NoSuchMethodError:
> 'org.apache.kafka.clients.consumer.ConsumerRecords
> org.apache.kafka.clients.consumer.Consumer.poll(java.time.Duration)'\
> at
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitBat
> chNew(KafkaTridentSpoutEmitter.java:178) ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.e
> mitBatchNew(KafkaTridentOpaqueSpoutEmitter.java:45)
> ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$
> Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:166)
> ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSp
> outExecutor.java:77) ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.trident.topology.TridentBoltExecutor.execute(Trident
> BoltExecutor.java:235) ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecuto
> r.java:212) ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.executor.Executor.acceptTupleAction(Executor.java:29
> 8) ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.executor.Executor.accept(Executor.java:292) ~[storm-
> client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-
> client-2.8.0.jar:2.8.0]\
> at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89)
> ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:
> 154) ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:
> 140) ~[storm-client-2.8.0.jar:2.8.0]\
> at org.apache.storm.utils.Utils$1.run(Utils.java:398) [storm-
> client-2.8.0.jar:2.8.0]\
> at java.base/java.lang.Thread.run(Thread.java:840) [?:?]\
> 2. 2025-03-04 09:13:19.527 o.a.k.c.n.SslTransportLayer ShutdownHook-
> shutdownFunc [WARN] Failed to send SSL Close message\
> java.io.IOException: Unexpected status returned by SSLEngine.wrap,
> expected CLOSED, received OK. Will not send close message to peer.\
> at
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportL
> ayer.java:158) ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.common.utils.Utils.closeAll(Utils.java:663)
> ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:
> 59) ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.common.network.Selector.doClose(Selector.java:584)
> ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.common.network.Selector.close(Selector.java:575)
> ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.common.network.Selector.close(Selector.java:541)
> ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.common.network.Selector.close(Selector.java:250)
> ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:506)
> ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clo
> se(ConsumerNetworkClient.java:439) ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:71
> ) ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.j
> ava:1614) ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.j
> ava:1574) ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.j
> ava:1550) ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.close(K
> afkaTridentSpoutEmitter.java:377) ~[stormjar.jar:dev-3.0.8.8]\
> at
> org.apache.storm.kafka.spout.trident.KafkaTridentOpaqueSpoutEmitter.c
> lose(KafkaTridentOpaqueSpoutEmitter.java:67) ~[stormjar.jar:dev-
> 3.0.8.8]\
> at
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$
> Emitter.close(OpaquePartitionedTridentSpoutExecutor.java:218)
> ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.trident.spout.TridentSpoutExecutor.cleanup(TridentSp
> outExecutor.java:84) ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.trident.topology.TridentBoltExecutor.cleanup(Trident
> BoltExecutor.java:253) ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.executor.ExecutorShutdown.shutdown(ExecutorShutdown.
> java:120) ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.daemon.worker.Worker.shutdown(Worker.java:518)
> ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.utils.Utils.lambda$addShutdownHookWithDelayedForceKi
> ll$1(Utils.java:357) ~[storm-client-2.8.0.jar:2.8.0]\
> at java.base/java.lang.Thread.run(Thread.java:840) [?:?]\
> \
> \'97\
> 2025-03-04 09:13:20.376 o.a.s.u.Utils Thread-20-b-0-message-spout-
> deserializer-validator-storm-event-extractor-executor[7, 7] [ERROR]
> Async loop died!\
> java.lang.IllegalStateException: Timer is not active\
> at
> org.apache.storm.StormTimer.checkActive(StormTimer.java:159) ~[storm-
> client-2.8.0.jar:2.8.0]\
> at org.apache.storm.StormTimer.scheduleMs(StormTimer.java:85)
> ~[storm-client-2.8.0.jar:2.8.0]\
> at org.apache.storm.StormTimer.schedule(StormTimer.java:65)
> ~[storm-client-2.8.0.jar:2.8.0]\
> at org.apache.storm.StormTimer.schedule(StormTimer.java:69)
> ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.StormTimer.scheduleRecurring(StormTimer.java:107)
> ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.executor.Executor.setupTicks(Executor.java:517)
> ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.executor.bolt.BoltExecutor.init(BoltExecutor.java:13
> 2) ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.executor.bolt.BoltExecutor.call(BoltExecutor.java:13
> 8) ~[storm-client-2.8.0.jar:2.8.0]\
> at
> org.apache.storm.executor.bolt.BoltExecutor.call(BoltExecutor.java:54
> ) ~[storm-client-2.8.0.jar:2.8.0]\
> at org.apache.storm.utils.Utils$1.run(Utils.java:393) [storm-
> client-2.8.0.jar:2.8.0]\
> at java.base/java.lang.Thread.run(Thread.java:840) [?:?]\
>
>
> Best regards,
> Rahm Dabalos
> -----------------------------------------------------------------
> ATTENTION:
> The information in this e-mail is confidential and only meant for the
> intended recipient. If you are not the intended recipient, don't use
> or disclose it in any way. Please let the sender know and delete the
> message immediately.
> -----------------------------------------------------------------
-----------------------------------------------------------------
ATTENTION:
The information in this e-mail is confidential and only meant for the intended
recipient. If you are not the intended recipient, don't use or disclose it in
any way. Please let the sender know and delete the message immediately.
-----------------------------------------------------------------
-----------------------------------------------------------------
ATTENTION:
The information in this e-mail is confidential and only meant for the intended
recipient. If you are not the intended recipient, don't use or disclose it in
any way. Please let the sender know and delete the message immediately.
-----------------------------------------------------------------