Another unfortunate thing about ConsumerRebalanceListener is that in
order to do meaningful work in the callback, you need a reference to
the consumer that called it.  But that reference isn't provided to the
callback, which means the listener implementation needs to hold a
reference to the consumer.  Seems like this makes it unnecessarily
awkward to serialize the listener or give it a zero-arg constructor.
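
For example, you end up writing something like this (just a sketch;
the class name is mine, targeting the 0.9 varargs seekToBeginning):

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    public class SeekToBeginningListener implements ConsumerRebalanceListener {
        // The callbacks only receive the partition collection, not the
        // consumer, so the consumer has to be injected here -- which rules
        // out a zero-arg constructor and makes serialization awkward.
        private final Consumer<?, ?> consumer;

        public SeekToBeginningListener(Consumer<?, ?> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // nothing to do on revocation
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
        }
    }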

On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:
> I thought about ConsumerRebalanceListener, but seeking to the
> beginning any time there's a rebalance for whatever reason is not
> necessarily the same thing as seeking to the beginning before first
> starting the consumer.
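>
> You could approximate "only before first start" with a one-shot flag,
> e.g. (sketch, untested; needs java.util.concurrent.atomic.AtomicBoolean
> in addition to the usual consumer imports, and assumes `consumer` is in
> scope):
>
>     final AtomicBoolean initialSeekDone = new AtomicBoolean(false);
>     ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>         @Override
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         }
>
>         @Override
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // Seek only on the very first assignment, not on every rebalance.
>             if (initialSeekDone.compareAndSet(false, true)) {
>                 consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>             }
>         }
>     };
>
> But that's piling one workaround on top of another.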
>
> On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:
>> Cody,
>>
>> Use ConsumerRebalanceListener to achieve that,
>>
>> ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>>
>>     @Override
>>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>>         // nothing to do on revocation here
>>     }
>>
>>     @Override
>>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>>         // 0.9 seekToBeginning takes varargs, hence the toArray
>>         consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
>>     }
>> };
>>
>> consumer.subscribe(topics, listener);
>>
>> On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> That suggestion doesn't work, for pretty much the same reason: at the
>>> time poll is first called, there is no reset policy and no committed
>>> offset, so NoOffsetForPartitionException is thrown.
>>>
>>> I feel like the underlying problem isn't so much that seekToEnd needs
>>> special-case behavior.  It's more that topic metadata fetches,
>>> consumer position fetches, and message fetches are all lumped together
>>> under a single poll() call, with no way to do them individually if
>>> necessary.
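>>>
>>> In other words, something like this hypothetical split (ensureAssignment
>>> is not an actual 0.9 API, it's just to illustrate what's missing):
>>>
>>>     consumer.subscribe(topics);
>>>     consumer.ensureAssignment();   // hypothetical: coordination/metadata only
>>>     consumer.seekToBeginning();    // position management only
>>>     ConsumerRecords<String, String> records = consumer.poll(0);  // message fetch only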
>>>
>>> What does "work" in this situation is to just catch the exception
>>> (which leaves the consumer in a state where topics are assigned) and
>>> then seek.  But that is not exactly an elegant interface.
>>>
>>>     consumer.subscribe(topics)
>>>     try {
>>>       consumer.poll(0)
>>>     } catch {
>>>       // swallow the NoOffsetForPartitionException; poll has already
>>>       // assigned partitions by the time it throws
>>>       case x: Throwable =>
>>>     }
>>>     consumer.seekToBeginning()
>>>     consumer.poll(0)
>>>
>>>
>>>
>>>
>>> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>> > Hi Cody,
>>> >
>>> > The problem with that code is in `subscribe(topics)` followed by
>>> > `seekToBeginning()`.
>>> >
>>> > Since the `subscribe` call is lazily evaluated, by the time
>>> > `seekToBeginning()` is called no partitions are assigned yet, and hence
>>> > it is effectively a no-op.
>>> >
>>> > Try
>>> >
>>> >     consumer.subscribe(topics)
>>> >     consumer.poll(0);  // get assigned partitions
>>> >     consumer.seekToBeginning()
>>> >     consumer.poll(0)
>>> >
>>> > to see if that works.
>>> >
>>> > I think it is a valid issue that could be fixed in the new consumer:
>>> > upon calling seekToEnd/seekToBeginning with no parameters while no
>>> > assignment has been done yet, do the coordination behind the scenes. It
>>> > would, though, change the behavior of these functions, as they would no
>>> > longer always be lazily evaluated.
>>> >
>>> >
>>> > Guozhang
>>> >
>>> >
>>> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>> >
>>> >> Using the 0.9 consumer, I would like to start consuming at the
>>> >> beginning or end, without specifying auto.offset.reset.
>>> >>
>>> >> This does not seem to be possible:
>>> >>
>>> >>     val kafkaParams = Map[String, Object](
>>> >>       "bootstrap.servers" -> conf.getString("kafka.brokers"),
>>> >>       "key.deserializer" -> classOf[StringDeserializer],
>>> >>       "value.deserializer" -> classOf[StringDeserializer],
>>> >>       "group.id" -> "example",
>>> >>       "auto.offset.reset" -> "none"
>>> >>     ).asJava
>>> >>     val topics = conf.getString("kafka.topics").split(",").toList.asJava
>>> >>     val consumer = new KafkaConsumer[String, String](kafkaParams)
>>> >>     consumer.subscribe(topics)
>>> >>     consumer.seekToBeginning()
>>> >>     consumer.poll(0)
>>> >>
>>> >>
>>> >> Results in:
>>> >>
>>> >> Exception in thread "main"
>>> >> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
>>> >> Undefined offset with no reset policy for partition: testtwo-4
>>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>>> >>         at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
>>> >>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>> >>         at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
>>> >>
>>> >>
>>> >> I'm assuming this is because, at the time seekToBeginning() is called,
>>> >> subscriptions.assignedPartitions isn't populated.  But polling in
>>> >> order to get TopicPartitions assigned results in an error, which
>>> >> creates a chicken-and-egg situation.
>>> >>
>>> >> I don't want to set auto.offset.reset, because I want a hard error if
>>> >> the offsets are out of range at any other time during consumption.
>>> >>
>>> >
>>> >
>>> >
>>> > --
>>> > -- Guozhang
>>>
