Another unfortunate thing about ConsumerRebalanceListener is that to do meaningful work in the callback, you need a reference to the consumer that invoked it. That reference isn't passed to the callback, so the listener implementation has to hold its own reference to the consumer. That seems to make it unnecessarily awkward to serialize the listener or give it a zero-arg constructor.
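Kamal's listener (quoted below) seeks on every assignment, which is the rebalance concern raised in the reply to it. Here is a minimal sketch of one way to narrow that. It assumes the goal is to seek only on the first non-empty assignment a consumer instance receives, and to rely on committed offsets after that. The decision lives in a hypothetical `InitialSeekPolicy` class, which is not part of the Kafka API. It is generic in the partition type so it compiles without the Kafka client.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical helper (not part of the Kafka client API) that decides which
 * partitions a rebalance listener should seek to the beginning.
 *
 * It returns every assigned partition on the first non-empty assignment this
 * consumer instance receives, and nothing on later rebalances, where the
 * consumer is expected to resume from committed offsets instead.
 *
 * In real use P would be org.apache.kafka.common.TopicPartition; it is kept
 * generic so the sketch compiles without the Kafka client on the classpath.
 * No locking is used because rebalance callbacks run on the thread calling poll().
 */
final class InitialSeekPolicy<P> {
    private boolean initialSeekDone = false;

    /** Returns the partitions to seek now; empty once the initial seek has been handed out. */
    List<P> partitionsToSeek(Collection<P> assigned) {
        // An empty first assignment (e.g. more group members than partitions)
        // does not use up the one-time seek.
        if (initialSeekDone || assigned.isEmpty()) {
            return Collections.emptyList();
        }
        initialSeekDone = true;
        return new ArrayList<>(assigned);
    }
}
```

Inside `onPartitionsAssigned`, the listener would compute `policy.partitionsToSeek(partitions)`. It would call `consumer.seekToBeginning(toSeek.toArray(new TopicPartition[0]))` only when that list is non-empty, because in 0.9 calling `seekToBeginning()` with no partitions seeks all currently assigned partitions. The listener still needs its own reference to `consumer` to make that call, so the awkwardness described above remains.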
On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
> I thought about ConsumerRebalanceListener, but seeking to the
> beginning any time there's a rebalance for whatever reason is not
> necessarily the same thing as seeking to the beginning before first
> starting the consumer.
>
> On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
>> Cody,
>>
>> Use a ConsumerRebalanceListener to achieve that:
>>
>>     ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>>
>>         @Override
>>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>>         }
>>
>>         @Override
>>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>>             consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>>         }
>>     };
>>
>>     consumer.subscribe(topics, listener);
>>
>> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> That suggestion doesn't work, for pretty much the same reason - at the
>>> time poll is first called, there is no reset policy and no committed
>>> offset, so NoOffsetForPartitionException is thrown.
>>>
>>> I feel like the underlying problem isn't so much that seekToEnd needs
>>> special case behavior. It's more that topic metadata fetches,
>>> consumer position fetches, and message fetches are all lumped together
>>> under a single poll() call, with no way to do them individually if
>>> necessary.
>>>
>>> What does "work" in this situation is to just catch the exception
>>> (which leaves the consumer in a state where topics are assigned) and
>>> then seek. But that is not exactly an elegant interface.
>>>
>>>     import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
>>>
>>>     consumer.subscribe(topics)
>>>     try {
>>>       consumer.poll(0)
>>>     } catch {
>>>       // expected on the first poll: no committed offset and no reset policy
>>>       case _: NoOffsetForPartitionException =>
>>>     }
>>>     consumer.seekToBeginning()
>>>     consumer.poll(0)
>>>
>>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>> > Hi Cody,
>>> >
>>> > The problem with that code is calling `seekToBeginning()` right after
>>> > `subscribe(topics)`.
>>> >
>>> > Since the `subscribe` call is lazily evaluated, by the time
>>> > `seekToBeginning()` is called no partition is assigned yet, and hence it
>>> > is effectively a no-op.
>>> >
>>> > Try
>>> >
>>> >     consumer.subscribe(topics)
>>> >     consumer.poll(0) // get assigned partitions
>>> >     consumer.seekToBeginning()
>>> >     consumer.poll(0)
>>> >
>>> > to see if that works.
>>> >
>>> > I think this is a valid issue that could be fixed in the new consumer:
>>> > when seekToEnd/seekToBeginning is called with no parameters before any
>>> > assignment has been made, it could do the coordination behind the
>>> > scenes. That would, however, change the behavior of these functions,
>>> > since they would no longer always be lazily evaluated.
>>> >
>>> > Guozhang
>>> >
>>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>> >
>>> >> Using the 0.9 consumer, I would like to start consuming at the
>>> >> beginning or end, without specifying auto.offset.reset.
>>> >>
>>> >> This does not seem to be possible:
>>> >>
>>> >>     import scala.collection.JavaConverters._
>>> >>     import org.apache.kafka.clients.consumer.KafkaConsumer
>>> >>     import org.apache.kafka.common.serialization.StringDeserializer
>>> >>
>>> >>     val kafkaParams = Map[String, Object](
>>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>>> >>       "key.deserializer" -> classOf[StringDeserializer],
>>> >>       "value.deserializer" -> classOf[StringDeserializer],
>>> >>       "group.id" -> "example",
>>> >>       "auto.offset.reset" -> "none"
>>> >>     ).asJava
>>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>>> >>     consumer.subscribe(topics)
>>> >>     consumer.seekToBeginning()
>>> >>     consumer.poll(0)
>>> >>
>>> >> Results in:
>>> >>
>>> >>     Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
>>> >>       at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>>> >>       at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>>> >>       at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>>> >>       at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>>> >>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>> >>       at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>>> >>
>>> >> I'm assuming this is because, at the time seekToBeginning() is called,
>>> >> subscriptions.assignedPartitions isn't populated. But polling in
>>> >> order to assign TopicPartitions results in an error, which creates a
>>> >> chicken-and-egg situation.
>>> >>
>>> >> I don't want to set auto.offset.reset, because I want a hard error if
>>> >> the offsets are out of range at any other time during consumption.
>>> >>
>>> >
>>> > --
>>> > -- Guozhang
>>>