Currently, the only way to restart from the beginning of the queue is by
deleting the previous checkpoint for the group. The reason is that in real
production deployments, a consumer application can go through process
restarts or other interruptions but the expectation is that it can start
reading data from where it left off. Reading from the start is an admin
operation in the current version of Kafka.

We are addressing this in the new consumer APIs by adding enough hooks to
be able to rewind consumption on the fly.

Thanks,
Neha


On Mon, Mar 10, 2014 at 1:17 PM, Adam Phelps <a...@opendns.com> wrote:

> Other than constantly using a new group id (and making a mess of
> zookeeper) or deleting the info for the group from zookeeper, is there
> any way to start from the beginning of the queue?  It looks like this
> can be done from the underlying scala code, but I can't find anything in
> the Java API.
>
> - Adam
>
> On 3/7/14, 5:40 PM, Neha Narkhede wrote:
> > From reading
> > around it looked like setting "autooffset.reset" = "smallest" would do
> > this, however I'm not actually seeing that behavior.
> >
> > The reason is that a consumer actually consults this config only if it
> > doesn't find a previous offset stored for it's group in zookeeper. So, it
> > will respect this config only on startup and not on a subsequent run,
> > unless you delete the group information from zookeeper before starting
> the
> > consumer. That is the reason you see the right behavior with a new
> group.id.
> >
> > Thanks,
> > Neha
> >
> >
> > On Fri, Mar 7, 2014 at 3:54 PM, Adam Phelps <a...@opendns.com> wrote:
> >
> >> I've been trying to write a test consumer in Java for a new use of our
> >> Kafka cluster (currently used solely with Storm), however this use needs
> >> to always start from the earliest offset in the topic.  From reading
> >> around it looked like setting "autooffset.reset" = "smallest" would do
> >> this, however I'm not actually seeing that behavior.  So far the only
> >> way I've managed to force it to start from the beginning is using a new
> >> groupid with each run, which does not seem like an optimal method.
> >>
> >> Am I don't this incorrectly, or is there some other means of doing this
> >> from the Java API?
> >>
> >>     public KafkaUpdateSource(String zkQuorum, String topic, String
> group) {
> >>         Properties props = new Properties();
> >>         props.put("zk.connect", zkQuorum);
> >>         props.put("zk.connectiontimeout.ms", "100000");
> >>
> >>         if (group != null) {
> >>             props.put("groupid", group);
> >>         } else {
> >>             props.put("groupid", "test_group");
> >>         }
> >>         props.put("zk.synctime.ms", "200");
> >>         props.put("autocommit.interval.ms", "1000");
> >>         props.put("consumer.timeout.ms", "1000");
> >>
> >>         // XXX: For some reason the "start from smallest offset" option
> >> doesn't
> >>         // seem to work
> >>         props.put("autooffset.reset", "smallest");
> >>
> >>         ConsumerConnector consumer =
> >> Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
> >>         Map<String, Integer> topicCountMap = new HashMap<String,
> >> Integer>();
> >>         topicCountMap.put(topic, new Integer(1));
> >>         iterator =
> >>
> consumer.createMessageStreams(topicCountMap).get(topic).get(0).iterator();
> >>     }
> >>
> >> - Adam
> >>
> >
>
>

Reply via email to