Other than constantly using a new group id (and making a mess of
zookeeper) or deleting the info for the group from zookeeper, is there
any way to start from the beginning of the queue?  It looks like this
can be done from the underlying scala code, but I can't find anything in
the Java API.

- Adam

On 3/7/14, 5:40 PM, Neha Narkhede wrote:
> From reading
> around it looked like setting "autooffset.reset" = "smallest" would do
> this, however I'm not actually seeing that behavior.
> 
> The reason is that a consumer actually consults this config only if it
> doesn't find a previous offset stored for its group in zookeeper. So, it
> will respect this config only on the group's first startup and not on a subsequent run,
> unless you delete the group information from zookeeper before starting the
> consumer. That is the reason you see the right behavior with a new group.id.
> 
> Thanks,
> Neha
> 
> 
> On Fri, Mar 7, 2014 at 3:54 PM, Adam Phelps <a...@opendns.com> wrote:
> 
>> I've been trying to write a test consumer in Java for a new use of our
>> Kafka cluster (currently used solely with Storm), however this use needs
>> to always start from the earliest offset in the topic.  From reading
>> around it looked like setting "autooffset.reset" = "smallest" would do
>> this, however I'm not actually seeing that behavior.  So far the only
>> way I've managed to force it to start from the beginning is using a new
>> groupid with each run, which does not seem like an optimal method.
>>
>> Am I doing this incorrectly, or is there some other means of doing this
>> from the Java API?
>>
>>     public KafkaUpdateSource(String zkQuorum, String topic, String group) {
>>         Properties props = new Properties();
>>         props.put("zk.connect", zkQuorum);
>>         props.put("zk.connectiontimeout.ms", "100000");
>>
>>         if (group != null) {
>>             props.put("groupid", group);
>>         } else {
>>             props.put("groupid", "test_group");
>>         }
>>         props.put("zk.synctime.ms", "200");
>>         props.put("autocommit.interval.ms", "1000");
>>         props.put("consumer.timeout.ms", "1000");
>>
>>         // XXX: For some reason the "start from smallest offset" option
>>         // doesn't seem to work
>>         props.put("autooffset.reset", "smallest");
>>
>>         ConsumerConnector consumer =
>>             Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>>         topicCountMap.put(topic, new Integer(1));
>>         iterator =
>>             consumer.createMessageStreams(topicCountMap).get(topic).get(0).iterator();
>>     }
>>
>> - Adam
>>
> 
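To make the behavior Neha describes concrete, here is a minimal sketch of the decision as I understand it from her explanation. It is a toy model and not Kafka code: the class OffsetResetSketch and its methods are hypothetical names, and an in-memory map stands in for the group offsets that the real consumer keeps in zookeeper. The only point is that the reset policy is consulted when no offset is stored for the group, and ignored otherwise.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these names come from the Kafka API.
class OffsetResetSketch {
    // Stand-in for the per-group offsets the real consumer keeps in zookeeper.
    private final Map<String, Long> storedOffsets = new HashMap<>();
    private final long earliestOffset;
    private final long latestOffset;

    OffsetResetSketch(long earliestOffset, long latestOffset) {
        this.earliestOffset = earliestOffset;
        this.latestOffset = latestOffset;
    }

    // The reset policy is consulted only when the group has no stored offset.
    long startingOffset(String group, String resetPolicy) {
        Long stored = storedOffsets.get(group);
        if (stored != null) {
            return stored;
        }
        return "smallest".equals(resetPolicy) ? earliestOffset : latestOffset;
    }

    // Models the consumer committing its position for a group.
    void commit(String group, long offset) {
        storedOffsets.put(group, offset);
    }

    // Models deleting the group's information before a restart.
    void deleteGroup(String group) {
        storedOffsets.remove(group);
    }

    public static void main(String[] args) {
        OffsetResetSketch topic = new OffsetResetSketch(0L, 100L);
        // First run: nothing stored, so the policy decides.
        System.out.println(topic.startingOffset("test_group", "smallest"));
        topic.commit("test_group", 42L);
        // Later run: the stored offset wins and the policy is ignored.
        System.out.println(topic.startingOffset("test_group", "smallest"));
        topic.deleteGroup("test_group");
        // After deleting the group info, the policy applies again.
        System.out.println(topic.startingOffset("test_group", "smallest"));
    }
}
```

In this model, a brand-new group id and a deleted group look the same to startingOffset, which matches the two workarounds mentioned in the thread.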
