My team encountered a similar issue yesterday. We are running a topology in Storm which uses storm-kafka-client and hence should store the consumer offsets in Kafka by default. The consumer-group set in the topology is not being listed in the --list command, but shows up when we execute this:
*./kafka-consumer-groups.sh --new-consumer --group <consumer-group-name> --bootstrap-server <kafka-broker-endpoint>:9092 --describe* Also, the result of this command had the CONSUMER-ID, HOST and CLIENT-ID columns empty for Kafka 0.10.2.1. We cross-checked that the offsets were being written to Kafka by consuming the messages from the __consumer_offsets topic and parsing them. Eric, can you point to the Kafka bug that you came across which addresses this issue? On Mon, May 7, 2018 at 3:44 PM Eric Hokanson <[email protected]> wrote: > I've had some more time to look into this and think it must be a bug in > Kafka. > > I spun up my topology in a clean Docker environment and the offsets went > to Kafka fine. Going back to my production Kafka I changed the group.id > in my test topology and spun it back up. Oddly, I noticed it was > processing and tracking lag fine in the UI but didn't see offsets in either > Kafka or Zookeeper. In Kafka when I do a --list I don't see the group.id > among all my others but if I ask for it explicitly the Kafka tool crashes > with: > > Error while executing consumer group command Group test-storm-group with > protocol type '' is not a valid consumer group > > Googling around revealed that this might be a bug in Kafka when a > non-standard partitioner is in use in the consumer. So I think the offsets > really are going to Kafka, I just can't see them using the tools. I think > we'll move toward upgrading Kafka to the latest v1.1 in the coming months > and see what happens. Thanks everyone. > > On Sat, May 5, 2018 at 5:56 AM, Stig Rohde Døssing <[email protected]> > wrote: > >> Okay, that's odd. I think it would be helpful to confirm whether it's a >> problem in the Storm code or a problem with your Kafka setups. Could you >> try to start a KafkaConsumer manually with a new group id and commit an >> offset using either commitSync or commitAsync, and verify that the >> committed offset ends up in Kafka and not Zookeeper?
Maybe also try doing >> the same with the consumer group you're using for the topology (after >> backing up the previous offset so you can restore). >> >> Alexandre, your configuration looks good to me. >> >> https://gist.github.com/srdo/e2a75d45a227f1a43e96cd38ab7194d3 has a code >> snippet that will set up a consumer to read and commit messages. If you run >> this code (and point to your own Kafka url and topic), you should see the >> "consumer-test" group show up in the output from kafka-consumer-groups. >> Please let me know whether it shows up for you. It would also be helpful if >> you'd try with one of the group ids you're using in the topologies. >> >> 2018-05-04 23:08 GMT+02:00 Alexandre Vermeerbergen < >> [email protected]>: >> >>> Hello Stig, >>> >>> Yes, we set the consumer group id for our Kafka spouts. >>> Here's a first example of the way we create our Kafka spouts: >>> >>> KafkaSpoutConfig<String, String> kafkaSpoutConfig = >>> KafkaSpoutConfig.builder(supConfig.kafka_broker_hosts_str, myKafkaTopic) >>> .setProp(ConsumerConfig.GROUP_ID_CONFIG, myConsumerId) >>> .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST) >>> .setRecordTranslator(new PodSyncNewPodsKafkaRecordTranslator()) >>> .build(); >>> >>> KafkaSpout<String, String> NewPodsFromKafkaSpout = new >>> KafkaSpout<String, String>(kafkaSpoutConfig); >>> >>> and another one: >>> >>> Builder<String, ?> builder = KafkaSpoutConfig >>> .builder(supConfig.kafka_broker_hosts_str, myTopic1, myTopic2) >>> .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) >>> .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) >>> .setRecordTranslator(new RawEventKafkaRecordTranslator(true)); >>> builder.setProp(kafkaConsumerProp) >>> .setTupleTrackingEnforced(true) >>> .setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE) >>> .setProp(ConsumerConfig.GROUP_ID_CONFIG, consumerId) >>> .setFirstPollOffsetStrategy(strategy);
>>> IRichSpout spout = builder.build(); >>> >>> Are we using the right way to set the consumer group id? >>> >>> Best regards, >>> Alexandre Vermeerbergen >>> >>> 2018-05-04 22:46 GMT+02:00 Stig Rohde Døssing <[email protected]>: >>> >>>> There are a couple of other lines that may commit depending on the >>>> processing guarantee, >>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L391 >>>> for AT_MOST_ONCE and >>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L294 >>>> for NO_GUARANTEE, but basically we always use the KafkaConsumer commit* >>>> methods to commit offsets. >>>> >>>> 2018-05-04 22:13 GMT+02:00 Alexandre Vermeerbergen < >>>> [email protected]>: >>>> >>>>> Hello Stig, >>>>> >>>>> I have not checked where Storm Kafka client 1.2.x commits offsets, but >>>>> I'm pretty sure they are not committed to the Kafka brokers (hence Eric's >>>>> mention of Zookeeper seems realistic to me), because I have a Kafka lag >>>>> probe (based on Kafka APIs, not on Storm Kafka tooling) which isn't able >>>>> to find the consumers corresponding to the Kafka spouts of my topologies. >>>>> >>>>> I reported it, but I find this situation a pity; we should be able to >>>>> monitor Kafka lag using standard solutions, shouldn't we? >>>>> >>>>> And it's puzzling to me that the page you quote ( >>>>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map- ) >>>>> says "This commits offsets to Kafka" when my experience (and Eric's; it's >>>>> good not to be alone) tells us that it's not the case. >>>>> >>>>> Could it be that this was an unintentional regression in Storm Kafka >>>>> 1.2.x, and can we hope to have offsets committed to the Kafka brokers?
>>>>> >>>>> Best regards, >>>>> Alexandre Vermeerbergen >>>>> >>>>> 2018-05-04 21:51 GMT+02:00 Eric Hokanson <[email protected] >>>>> >: >>>>> >>>>>> This would be the regular KafkaSpout, i.e.: new >>>>>> KafkaSpout<>(KafkaSpoutConfig.builder("k10server:9092", "topic") >>>>>> .setProp(p.getComponentProperties("KafkaConsumer")).setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE).build()); >>>>>> >>>>>> The offsets end up in the standard old pre-v0.10 location: >>>>>> /consumers/group.id/offsets/ >>>>>> >>>>>> On Fri, May 4, 2018 at 1:41 PM, Stig Rohde Døssing <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> Are we talking about the regular KafkaSpout or the Trident spout? >>>>>>> >>>>>>> The regular KafkaSpout uses the KafkaConsumer class under the hood, >>>>>>> and commits offsets via the commitSync method >>>>>>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-java.util.Map-. >>>>>>> Could you elaborate on what happens in your case, e.g. where in >>>>>>> Zookeeper >>>>>>> are the offsets ending up? >>>>>>> >>>>>>> 2018-05-04 19:25 GMT+02:00 Eric Hokanson < >>>>>>> [email protected]>: >>>>>>> >>>>>>>> We're working on upgrading our Storm cluster from v1.0.X to >>>>>>>> v1.2.1. We're taking advantage of this upgrade by moving to a newer >>>>>>>> Kafka >>>>>>>> v0.10.2 server from our older v0.8 server and using the built-in new >>>>>>>> Storm >>>>>>>> Kafka spout versus a custom Kafka spout we had before. We've got >>>>>>>> everything up and working now on Storm 1.2.1 except for the fact that >>>>>>>> Storm >>>>>>>> insists that the Kafka offsets should be written to Zookeeper instead >>>>>>>> of to >>>>>>>> Kafka like they should be on newer consumers.
I've made sure we're >>>>>>>> using the latest 1.2.1 storm-kafka-client and I've tried various versions of the >>>>>>>> Kafka client including the latest v1.1.0 but I can't spot any place where >>>>>>>> this can be specified. What am I missing? >>>>>>>> >>>>>>>> -- >>>>>>>> Eric >>>>>>> >>>>>> >>>>>> -- >>>>>> *Eric Hokanson* >>>>>> Sr Software Engineer | *Return Path* >>>>>> w | 303-999-3270 >>>>>> m | 970-412-2728 >>>>>> [email protected] >>>>> >>>> >>> >> > -- Thanks, Srishty Agrawal
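For anyone hitting this later, here is a sketch of the CLI checks discussed in this thread. The broker endpoint and group name are placeholders; the --new-consumer flag and the formatter class name are for the 0.10.x tooling and differ on newer Kafka versions, so treat this as a starting point rather than exact commands:

```shell
# List all consumer groups whose offsets are stored in Kafka. Per this
# thread, a group committed by storm-kafka-client may be missing here
# on 0.10.x when its stored protocol type is empty.
./kafka-consumer-groups.sh --new-consumer \
  --bootstrap-server <kafka-broker-endpoint>:9092 --list

# Describe the group explicitly; this can show offsets (possibly with
# empty CONSUMER-ID/HOST/CLIENT-ID columns) or crash with
# "protocol type '' is not a valid consumer group" even when the
# group is absent from --list.
./kafka-consumer-groups.sh --new-consumer \
  --bootstrap-server <kafka-broker-endpoint>:9092 \
  --group <consumer-group-name> --describe

# Cross-check by reading the internal offsets topic directly, as the
# original poster did. The formatter class is the 0.10.x name; it moved
# to kafka.coordinator.group.GroupMetadataManager in 0.11+.
./kafka-console-consumer.sh \
  --bootstrap-server <kafka-broker-endpoint>:9092 \
  --topic __consumer_offsets --from-beginning \
  --consumer-property exclude.internal.topics=false \
  --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
```

If commits appear in __consumer_offsets but the group is invisible to --list, the offsets are reaching Kafka and the problem is in the tooling, which matches Eric's conclusion above.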
