Re: Best way to commit offset on demand

2016-01-04 Thread tao xiao
Thanks for the detailed explanation. By 'technically commit offsets without
joining group' I assume you mean that I can call assign instead of
subscribe on the consumer, which bypasses the join process.
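
Something like this is what I have in mind (a rough sketch against the 0.9
Java client -- the topic, partition and offset values are made up, and per
your caveat it is only safe when the group has no active members):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group-to-reset");  // the group whose offsets we overwrite
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("some-topic", 0);
// assign() skips the group join that subscribe() would trigger
consumer.assign(Collections.singletonList(tp));
// write the desired starting offset for the group, then get out
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
consumer.close();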

The reason we put the offset reset outside of the consumer process is that
we can keep the consumer code as generic as possible, since the offset reset
process is not needed by all consumers.

On Tue, 5 Jan 2016 at 11:18 Jason Gustafson  wrote:

> Ah, that makes sense if you have to wait to join the group. I think you
> could technically commit offsets without joining if you were sure that the
> group was dead (i.e. all consumers had either left the group cleanly or
> their session timeout expired). But if there are still active members, then
> yeah, you have to join the group. Clearly you have to be a little careful
> in this case if an active consumer is still trying to read data (it won't
> necessarily see the fresh offset commits and could even overwrite them),
> but I assume you're handling this.
>
> Creating a new instance each time you want to do this seems viable to me
> (and likely how we'd end up implementing the command line utility anyway).
> The overhead is just a couple TCP connections. It's probably as good (or as
> bad) as any other approach. The join latency seems unavoidable if you can't
> be sure that the group is dead since we do not allow non-group members to
> commit offsets by design. Any tool we write will be up against the same
> restriction. We might be able to think of a way to bypass it, but that
> sounds dangerous.
>
> Out of curiosity, what's the advantage in your use case to setting offsets
> out-of-band? I would probably consider options for moving it into the
> consumer process.
>
> -Jason
>
> On Mon, Jan 4, 2016 at 6:20 PM, tao xiao  wrote:
>
> > Jason,
> >
> > It normally takes a couple of seconds sometimes it takes longer to join a
> > group if the consumer didn't shutdown gracefully previously.
> >
> > My use case is to have a command/tool to call to reset offset for a list
> > of partitions and a particular consumer group before the consumer is
> > started or wait until the offset reaches a given number before the
> > consumer can be closed. I think
> > https://issues.apache.org/jira/browse/KAFKA-3059 fits my use case. But
> > for now I need to find out a workaround until this feature is
> > implemented.
> >
> > For offset reset one way I can think of is to create a consumer with the
> > same group id that I want to reset the offset for. Then commit the offset
> > for the particular partitions and close the consumer. Is this solution
> > viable?
> >
> > On Tue, 5 Jan 2016 at 09:56 Jason Gustafson  wrote:
> >
> > > Hey Tao,
> > >
> > > Interesting that you're seeing a lot of overhead constructing the new
> > > consumer instance each time. Granted it does have to fetch topic
> > > metadata and lookup the coordinator, but I wouldn't have expected that
> > > to be a big problem. How long is it typically taking?
> > >
> > > -Jason
> > >
> > > On Mon, Jan 4, 2016 at 3:26 AM, Marko Bonaći <marko.bon...@sematext.com>
> > > wrote:
> > >
> > > > How are you consuming those topics?
> > > >
> > > > IF: I assume you have a consumer, so why not commit from within that
> > > > consumer, after you process the message (whatever "process" means to
> > > > you).
> > > >
> > > > ELSE: couldn't you have a dedicated consumer for offset commit
> > > > requests that you don't shut down between requests?
> > > >
> > > > FINALLY: tell us more about your use case.
> > > >
> > > > Marko Bonaći
> > > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > > Solr & Elasticsearch Support
> > > > Sematext | Contact
> > > >
> > > > On Mon, Jan 4, 2016 at 12:18 PM, tao xiao  wrote:
> > > >
> > > > > Hi team,
> > > > >
> > > > > I have a scenario where I want to write new offset for a list of
> > > > > topics on demand. The list of topics is unknown until runtime and
> > > > > the interval between each commit is undetermined. what would be the
> > > > > best way to do so?
> > > > >
> > > > > One way I can think of is to create a new consumer and call
> > > > > commitSync(offsets) every time I want to commit. But it seems taking
> > > > > too much time to bootstrap the consumer. is there a lighter way to
> > > > > achieve this?
> > > > >
> > > >
> > >
> >
>


Topic Deletion Issues

2016-01-04 Thread Brenden Cobb
I might have sent this recently, but was not able to receive mail from
this list (fixed)
---

Hello-

We have a use case where we're trying to create a topic, delete it, then
recreate it with the same topic name.

Running into inconsistent results.

Creating the topic:
/opt/kafka/bin/kafka-topics.sh --create --partitions 3
--replication-factor 3 --topic test-01 --zookeeper
zoo01:2181,zoo02:2181,zoo03:2181

Delete:
/opt/kafka/bin/kafka-topics.sh --delete --topic test-01 --zookeeper
zoo01:2181,zoo02:2181,zoo03:2181
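
(Note: our understanding is that --delete only does anything when every
broker is started with topic deletion enabled; 0.8.2 defaults it to off.
The relevant server.properties line:)

# without this, 0.8.2 only ever marks a topic for deletion
delete.topic.enable=true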

Repeat creation.

The results are inconsistent. Executing the above several times can be
successful, then sporadically we get caught in "topic marked for
deletion" and it does not clear.

This appears to be a Zookeeper issue of sorts as the logs will show:
[2015-12-30 22:32:32,946] WARN Conditional update of path
/brokers/topics/test-01/partitions/0/state with data
{"controller_epoch":21,"leader":2,"version":1,"leader_epoch":1,"isr":[2,0,1]}
and expected version 1 failed due to
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode
= NoNode for /brokers/topics/test-01/partitions/0/state
(kafka.utils.ZkUtils$)

In this instance no subdirectories exist beyond /brokers/topics/test-01

I'd like to know if this is a common occurrence, and why the Zookeeper
node isn't "fully" created, as Kafka deletion seems stuck without the
expected node path.

We are using Kafka 0.8.2. Purging is also an option (if achievable
programmatically); we just need to make sure there are no messages
left when resuming the producer.

Appreciate any info/guidance.

Thanks,
BC


Re: Best way to commit offset on demand

2016-01-04 Thread Jason Gustafson
Ah, that makes sense if you have to wait to join the group. I think you
could technically commit offsets without joining if you were sure that the
group was dead (i.e. all consumers had either left the group cleanly or
their session timeout expired). But if there are still active members, then
yeah, you have to join the group. Clearly you have to be a little careful
in this case if an active consumer is still trying to read data (it won't
necessarily see the fresh offset commits and could even overwrite them),
but I assume you're handling this.

Creating a new instance each time you want to do this seems viable to me
(and likely how we'd end up implementing the command line utility anyway).
The overhead is just a couple TCP connections. It's probably as good (or as
bad) as any other approach. The join latency seems unavoidable if you can't
be sure that the group is dead since we do not allow non-group members to
commit offsets by design. Any tool we write will be up against the same
restriction. We might be able to think of a way to bypass it, but that
sounds dangerous.

Out of curiosity, what's the advantage in your use case to setting offsets
out-of-band? I would probably consider options for moving it into the
consumer process.

-Jason

On Mon, Jan 4, 2016 at 6:20 PM, tao xiao  wrote:

> Jason,
>
> It normally takes a couple of seconds sometimes it takes longer to join a
> group if the consumer didn't shutdown gracefully previously.
>
> My use case is to have a command/tool to call to reset offset for a list of
> partitions and a particular consumer group before the consumer is started
> or wait until the offset reaches a given number before the consumer can be
> closed. I think https://issues.apache.org/jira/browse/KAFKA-3059 fits my
> use case. But for now I need to find out a workaround until this feature is
> implemented.
>
> For offset reset one way I can think of is to create a consumer with the
> same group id that I want to reset the offset for. Then commit the offset
> for the particular partitions and close the consumer. Is this solution
> viable?
>
> On Tue, 5 Jan 2016 at 09:56 Jason Gustafson  wrote:
>
> > Hey Tao,
> >
> > Interesting that you're seeing a lot of overhead constructing the new
> > consumer instance each time. Granted it does have to fetch topic metadata
> > and lookup the coordinator, but I wouldn't have expected that to be a big
> > problem. How long is it typically taking?
> >
> > -Jason
> >
> > On Mon, Jan 4, 2016 at 3:26 AM, Marko Bonaći 
> > wrote:
> >
> > > How are you consuming those topics?
> > >
> > > IF: I assume you have a consumer, so why not commit from within that
> > > consumer, after you process the message (whatever "process" means to
> > > you).
> > >
> > > ELSE: couldn't you have a dedicated consumer for offset commit requests
> > > that you don't shut down between requests?
> > >
> > > FINALLY: tell us more about your use case.
> > >
> > > Marko Bonaći
> > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > Solr & Elasticsearch Support
> > > Sematext  | Contact
> > > 
> > >
> > > On Mon, Jan 4, 2016 at 12:18 PM, tao xiao  wrote:
> > >
> > > > Hi team,
> > > >
> > > > I have a scenario where I want to write new offset for a list of
> > > > topics on demand. The list of topics is unknown until runtime and the
> > > > interval between each commit is undetermined. what would be the best
> > > > way to do so?
> > > >
> > > > One way I can think of is to create a new consumer and call
> > > > commitSync(offsets) every time I want to commit. But it seems taking
> > > > too much time to bootstrap the consumer. is there a lighter way to
> > > > achieve this?
> > > >
> > >
> >
>


Re: 0.8.2 How do I suppress INFO Closing socket connection to

2016-01-04 Thread tao xiao
You can bump the log level to warn for a particular class

log4j.logger.kafka.network.Processor=WARN
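
For context, that line goes into the broker's config/log4j.properties. A
minimal fragment (a sketch -- appender names assumed to follow the stock
file, keep your existing ones):

# keep the broker at INFO overall, but quiet the chatty network processor
log4j.rootLogger=INFO, stdout
log4j.logger.kafka.network.Processor=WARN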

On Tue, 5 Jan 2016 at 08:33 Dillian Murphey  wrote:

> Constant spam of this INFO on my log.
>
> [2016-01-05 00:31:15,887] INFO Closing socket connection to /10.9.255.67.
> (kafka.network.Processor)
> [2016-01-05 00:31:15,917] INFO Closing socket connection to /10.9.255.67.
> (kafka.network.Processor)
>
> Anyone know what I need to modify (assuming log4j.properties) to disable
> this but still be informed of issues?
>
> Thank you!
>


Re: Best way to commit offset on demand

2016-01-04 Thread tao xiao
Jason,

It normally takes a couple of seconds; sometimes it takes longer to join a
group if the consumer didn't shut down gracefully previously.

My use case is to have a command/tool to call to reset offset for a list of
partitions and a particular consumer group before the consumer is started
or wait until the offset reaches a given number before the consumer can be
closed. I think https://issues.apache.org/jira/browse/KAFKA-3059 fits my
use case. But for now I need to find out a workaround until this feature is
implemented.

For offset reset one way I can think of is to create a consumer with the
same group id that I want to reset the offset for. Then commit the offset
for the particular partitions and close the consumer. Is this solution
viable?

On Tue, 5 Jan 2016 at 09:56 Jason Gustafson  wrote:

> Hey Tao,
>
> Interesting that you're seeing a lot of overhead constructing the new
> consumer instance each time. Granted it does have to fetch topic metadata
> and lookup the coordinator, but I wouldn't have expected that to be a big
> problem. How long is it typically taking?
>
> -Jason
>
> On Mon, Jan 4, 2016 at 3:26 AM, Marko Bonaći 
> wrote:
>
> > How are you consuming those topics?
> >
> > IF: I assume you have a consumer, so why not commit from within that
> > consumer, after you process the message (whatever "process" means to
> > you).
> >
> > ELSE: couldn't you have a dedicated consumer for offset commit requests
> > that you don't shut down between requests?
> >
> > FINALLY: tell us more about your use case.
> >
> > Marko Bonaći
> > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > Solr & Elasticsearch Support
> > Sematext  | Contact
> > 
> >
> > On Mon, Jan 4, 2016 at 12:18 PM, tao xiao  wrote:
> >
> > > Hi team,
> > >
> > > I have a scenario where I want to write new offset for a list of topics
> > > on demand. The list of topics is unknown until runtime and the interval
> > > between each commit is undetermined. what would be the best way to do
> > > so?
> > >
> > > One way I can think of is to create a new consumer and call
> > > commitSync(offsets) every time I want to commit. But it seems taking
> > > too much time to bootstrap the consumer. is there a lighter way to
> > > achieve this?
> > >
> >
>


Re: Best way to commit offset on demand

2016-01-04 Thread Jason Gustafson
Hey Tao,

Interesting that you're seeing a lot of overhead constructing the new
consumer instance each time. Granted it does have to fetch topic metadata
and lookup the coordinator, but I wouldn't have expected that to be a big
problem. How long is it typically taking?

-Jason

On Mon, Jan 4, 2016 at 3:26 AM, Marko Bonaći 
wrote:

> How are you consuming those topics?
>
> IF: I assume you have a consumer, so why not commit from within that
> consumer, after you process the message (whatever "process" means to you).
>
> ELSE: couldn't you have a dedicated consumer for offset commit requests
> that you don't shut down between requests?
>
> FINALLY: tell us more about your use case.
>
> Marko Bonaći
> Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> Solr & Elasticsearch Support
> Sematext  | Contact
> 
>
> On Mon, Jan 4, 2016 at 12:18 PM, tao xiao  wrote:
>
> > Hi team,
> >
> > I have a scenario where I want to write new offset for a list of topics
> > on demand. The list of topics is unknown until runtime and the interval
> > between each commit is undetermined. what would be the best way to do so?
> >
> > One way I can think of is to create a new consumer and call
> > commitSync(offsets) every time I want to commit. But it seems taking too
> > much time to bootstrap the consumer. is there a lighter way to achieve
> > this?
> >
>


Re: Best way to commit offset on demand

2016-01-04 Thread tao xiao
My use case is to reset offset to a certain number for a particular
consumer group before I start the consumer so that I can control where to
start consuming. It is not ideal to put the offset reset logic inside the
consumer application code, as this is an out-of-band process and part
of our operational procedure. I'd like to keep the consumer as generic as
possible. I am looking for a solution where I can reset/fetch the offset for a
list of topic partitions and a particular consumer group which I can invoke
at any time as a standalone procedure.

On Mon, 4 Jan 2016 at 19:27 Marko Bonaći  wrote:

> How are you consuming those topics?
>
> IF: I assume you have a consumer, so why not commit from within that
> consumer, after you process the message (whatever "process" means to you).
>
> ELSE: couldn't you have a dedicated consumer for offset commit requests
> that you don't shut down between requests?
>
> FINALLY: tell us more about your use case.
>
> Marko Bonaći
> Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> Solr & Elasticsearch Support
> Sematext  | Contact
> 
>
> On Mon, Jan 4, 2016 at 12:18 PM, tao xiao  wrote:
>
> > Hi team,
> >
> > I have a scenario where I want to write new offset for a list of topics
> > on demand. The list of topics is unknown until runtime and the interval
> > between each commit is undetermined. what would be the best way to do so?
> >
> > One way I can think of is to create a new consumer and call
> > commitSync(offsets) every time I want to commit. But it seems taking too
> > much time to bootstrap the consumer. is there a lighter way to achieve
> > this?
> >
>


Re: stable release?

2016-01-04 Thread Gwen Shapira
First, I think the reason 0.8.2.2 is listed as stable and 0.9.0.0 as latest
is mostly due to oversight.
0.9.0.0 is stable. Some of the new APIs are considered unstable, but that
has no bearing on a simple upgrade of the brokers.

Regarding issues, you can see what we fixed for 0.9.0.1:
https://github.com/apache/kafka/commits/0.9.0

Mostly minor things and mostly to the new APIs (KafkaConnect and
KafkaConsumer).

Gwen

On Mon, Jan 4, 2016 at 9:48 AM, Jason Rosenberg  wrote:

> All,
>
> I see that 0.8.2.2 is still listed as the 'stable release', while 0.9.0.0
> is the 'latest release', for kafka.
>
> At what point do we expect 0.9.X to become 'stable'?  Will it be 0.9.0.1?
>
> Also, I assume more than a few of us have upgraded to 0.9.0.0 for
> production environments, any reports so far of any issues?
>
> Thanks,
>
> Jason
>


Find current kafka memory usage

2016-01-04 Thread Dillian Murphey
I was running out of heap space for my Kafka broker. Is there a way I can
check how much memory Kafka is using, so I can alert myself when it is
approaching the max heap size? The default is 1GB.
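
So far the best I have is sampling the JVM directly (a sketch, assuming the
standard JDK tools are available on the broker host; kafka.Kafka is the
broker's main class):

# find the broker JVM and print heap utilization every 5 seconds
PID=$(jcmd | grep kafka.Kafka | awk '{print $1}')
jstat -gcutil "$PID" 5000   # E and O columns = eden/old gen usage in percent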

Thanks


0.8.2 How do I suppress INFO Closing socket connection to

2016-01-04 Thread Dillian Murphey
Constant spam of this INFO on my log.

[2016-01-05 00:31:15,887] INFO Closing socket connection to /10.9.255.67.
(kafka.network.Processor)
[2016-01-05 00:31:15,917] INFO Closing socket connection to /10.9.255.67.
(kafka.network.Processor)

Anyone know what I need to modify (assuming log4j.properties) to disable
this but still be informed of issues?

Thank you!


Re: Minimal KafkaConsumer in Scala fails compilation with `could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]`

2016-01-04 Thread Robert Metzger
Hi Peter,

The problem is that you have both the DataSet and DataStream package imports.
Remove the DataSet API import (import org.apache.flink.api.scala._)
to make the example work.
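
For reference, the top of the file would then look like this (a sketch based
on your example below, with only the batch-API wildcard removed):

// import org.apache.flink.api.scala._  <- removed: this batch-API wildcard
// clashes with the streaming-API implicits
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082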

On Sun, Dec 20, 2015 at 3:20 PM, Peter Vandenabeele 
wrote:

> Hi,
>
> I am trying to write a minimal Kafka consumer in Scala and got
> this far:
>
> ➜  scala git:(kafka_exp_001) ✗ cat KafkaConsumer.scala
> package io.allthingsdata.kafkaConsumer
>
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
> import org.apache.flink.api.common.typeinfo._
> //import org.apache.flink.streaming.api.windowing.time.Time
>
> object KafkaConsumer {
>   def main(args: Array[String]) {
>
> // set up the execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> val valueDeserializer = new SimpleStringSchema()
> val props = new java.util.Properties()
>
> // create a Kafka Consumer
> val kafkaConsumer: FlinkKafkaConsumer082[String] =
>   new FlinkKafkaConsumer082(
> "Topic1",
> valueDeserializer,
> props
>   )
>
> // get input data
> val messageStream: DataStream[String] = env.addSource(kafkaConsumer)
> // supply typeInfo ?
>
> // do something with it
> val messages = messageStream.
>   map ( s => "Kafka and Flink say: $s" )
>
> // execute and print result
> messages.print()
>   }
> }
>
> /*  based on this Java example code
> ParameterTool parameterTool = ParameterTool.fromArgs(args);
>
> DataStream<String> messageStream = env
>   .addSource(new FlinkKafkaConsumer082<>(
> parameterTool.getRequired("topic"),
> new SimpleStringSchema(),
> parameterTool.getProperties()));
>
> Once a DataStream is created, you can transform it as you like. For
> example, let us pad every word with a fixed prefix, and print to stdout:
>
> messageStream
>   .rebalance()
>   .map ( s -> "Kafka and Flink says: " + s)
>   .print();
> */
>
>
> When trying to compile in sbt I get these error messages:
>
> ```
> > compile
> [info] Compiling 1 Scala source to
>
> /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
> [error]
>
> /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28:
> could not find implicit value for evidence parameter of type
> org.apache.flink.api.common.typeinfo.TypeInformation[String]
> [error] val messageStream: DataStream[String] =
> env.addSource(kafkaConsumer) // supply typeInfo ?
> [error]  ^
> [error]
>
> /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32:
> could not find implicit value for evidence parameter of type
> org.apache.flink.api.common.typeinfo.TypeInformation[String]
> [error]   map ( s => "Kafka and Flink say: $s" )
> [error]   ^
> [error] two errors found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 0 s, completed Dec 19, 2015 5:11:56 PM
> ```
>
> When inspecting DataStreamSource addSource, I read:
>
> /**
>  * Adds a data source with a custom type information thus opening a
>  * {@link DataStream}. Only in very special cases does the user need to
>  * support type information. Otherwise use
>  * {@link
> #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
>  *
>
>
> I did try to supply a `BasicTypeInfo.STRING_TYPE_INFO` as typeInfo
> argument, but that does not solve it.
>
> When trying:
>
> `val messageStream: DataStream[String] = env.addSource(kafkaConsumer,
> BasicTypeInfo.STRING_TYPE_INFO) // supply typeInfo ?`
>
> I get:
>
> > compile
> [info] Compiling 1 Scala source to
>
> /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
> [error]
>
> /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28:
> overloaded method value addSource with alternatives:
> [error]   [T](function:
>
> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T]
> => Unit)(implicit evidence$17: scala.reflect.ClassTag[T], implicit
> evidence$18:
>
> org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
> 
> [error]   [T](function:
> org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit
> evidence$15: scala.reflect.ClassTag[T], implicit evidence$16:
>
> org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
> [error]  cannot be applied to
> (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082[String],
> org.apache.flink.api.common.typeinfo.BasicTypeInfo[String])
> [error] val messageStream: DataStream[String] =
> env.addSource(kafkaConsumer, BasicTypeInfo.STRING_TYPE_INFO) // supply
> ty

stable release?

2016-01-04 Thread Jason Rosenberg
All,

I see that 0.8.2.2 is still listed as the 'stable release', while 0.9.0.0
is the 'latest release', for kafka.

At what point do we expect 0.9.X to become 'stable'?  Will it be 0.9.0.1?

Also, I assume more than a few of us have upgraded to 0.9.0.0 for
production environments, any reports so far of any issues?

Thanks,

Jason


Re: How to reset a consumer-group's offset in kafka 0.9?

2016-01-04 Thread Stevo Slavić
IMO just a command/tool to commit an offset for a given consumer group and
topic should work (partition could be an optional parameter; if it is not
provided, commit the same offset for all partitions of the given topic). Then
one can commit offset -1 for a given consumer group and topic, effectively
making the offset invalid (outside of range).

On Mon, Jan 4, 2016 at 6:00 PM, Guozhang Wang  wrote:

> What you really want is to remove the committed offsets in Kafka servers,
> not the consumer group registry metadata (e.g. such as partition assignment
> information, etc), which will not be done immediately after the group is
> removed, but only after the offsets expiration has elapsed.
>
> We can add this feature as an admin command in future versions.
>
> Guozhang
>
>
> On Mon, Jan 4, 2016 at 2:12 AM, Stevo Slavić  wrote:
>
> > Created related ticket https://issues.apache.org/jira/browse/KAFKA-3057
> > Maybe request for extra operations support/tools could be a separate
> > ticket.
> >
> > On Mon, Jan 4, 2016 at 10:48 AM, Han JU  wrote:
> >
> > > Thanks a lot Guozhang!
> > > So currently there's no way to delete a consumer group in with the new
> > > consumer API? Do you plan to add it in the next versions?
> > >
> > >
> > > 2015-12-31 20:09 GMT+01:00 Guozhang Wang :
> > >
> > > > Hello Han,
> > > >
> > > > 1. As Marko mentioned you can use "seek" in the 0.9 Java consumer to
> > > > reset your consuming offsets. Or if you are stopping the consumer
> > > > between your test runs you can also commit() with offset 0 before you
> > > > closing your consumer at the end of each test.
> > > >
> > > > 2. In the 0.9 Java consumer both the consumer registry information
> > > > and the offsets are stored in Kafka servers instead of on the ZK, you
> > > > can find a more detailed description and motivation of this change
> > > > here:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
> > > >
> > > > So the "AdminUtils.deleteConsumerGroupInZK" will not help in removing
> > > > the consumer registry information.
> > > >
> > > > 3. We have deprecated kafka.tools.ConsumerOffsetChecker in 0.9, see
> > > > "deprecations in 0.9.0" here:
> > > >
> > > > http://kafka.apache.org/documentation.html#upgrade_9_breaking
> > > >
> > > > Instead you can try to use bin/kafka-consumer-groups.sh
> > > > (kafka.admin.ConsumerGroupCommand).
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Dec 30, 2015 at 9:10 AM, Han JU  wrote:
> > > >
> > > > > Hi Marko,
> > > > >
> > > > > Yes we're currently using this on our production kafka 0.8. But it
> > > > > does not seem to work with the new consumer API in 0.9.
> > > > > To answer my own question about deleting consumer group in new
> > > > > consumer API, it seems that it's currently not possible with the
> > > > > new consumer API (there's no delete related method in `AdminClient`
> > > > > of the new consumer API).
> > > > >
> > > > >
> > > > > 2015-12-30 17:02 GMT+01:00 Marko Bonaći :
> > > > >
> > > > > > If you want to monitor offset (ZK or Kafka based), try with
> > > > > > QuantFind's Kafka Offset Monitor.
> > > > > > If you use Docker, it's easy as:
> > > > > >
> > > > > > docker run -p 8080:8080 -e ZK=zk_hostname:2181
> > > > > > jpodeszwik/kafka-offset-monitor
> > > > > > and then opening browser to dockerhost:8080.
> > > > > >
> > > > > > If not in the Docker mood, use instructions here:
> > > > > > https://github.com/quantifind/KafkaOffsetMonitor
> > > > > >
> > > > > > Marko Bonaći
> > > > > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > > > > Solr & Elasticsearch Support
> > > > > > Sematext | Contact
> > > > > >
> > > > > > On Wed, Dec 30, 2015 at 12:54 PM, Han JU  wrote:
> > > > > >
> > > > > > > Thanks guys. The `seek` seems a solution. But it's more
> > > > > > > cumbersome than in 0.8 because I have to plug in some extra
> > > > > > > code in my consumer abstractions rather than simply deleting a
> > > > > > > zk node.
> > > > > > > And one more question: where does kafka 0.9 stores the
> > > > > > > consumer-group information? In fact I also tried to delete the
> > > > > > > consumer group but the `AdminUtils.deleteConsumerGroupInZK` does
> > > > > > > not seem to work in 0.9. And also `bin/kafka-run-class.sh
> > > > > > > kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181
> > > > > > > --group group-name` seems broken.
> > > > > > >
> > > > > > > Thanks!
> > > > > > >
> > > > > > > 2015-12-29 16:46 GMT+01:00 Marko Bonaći <marko.bon...@sematext.com>:
> > > > > > >
> > > > > > > > I was refering to Dana Powers's answer in the link I posted
> > > > > > > > (to use a client API). You can find an example here:

Re: How to reset a consumer-group's offset in kafka 0.9?

2016-01-04 Thread Gwen Shapira
Created this to follow up: https://issues.apache.org/jira/browse/KAFKA-3059


On Mon, Jan 4, 2016 at 9:21 AM, Stevo Slavić  wrote:

> IMO just command/tool to commit offset for given consumer group and topic
> should work (maybe partition can be optional parameter, if partition is not
> provided commit same offset for all partitions of given topic). Then one
> can commit offset -1 for given consumer group and given topic, effectively
> making offset invalid (outside of range).
>
> On Mon, Jan 4, 2016 at 6:00 PM, Guozhang Wang  wrote:
>
> > What you really want is to remove the committed offsets in Kafka servers,
> > not the consumer group registry metadata (e.g. such as partition
> > assignment information, etc), which will not be done immediately after
> > the group is removed, but only after the offsets expiration has elapsed.
> >
> > We can add this feature as an admin command in future versions.
> >
> > Guozhang
> >
> >
> > On Mon, Jan 4, 2016 at 2:12 AM, Stevo Slavić  wrote:
> >
> > > Created related ticket https://issues.apache.org/jira/browse/KAFKA-3057
> > > Maybe request for extra operations support/tools could be a separate
> > > ticket.
> > >
> > > On Mon, Jan 4, 2016 at 10:48 AM, Han JU  wrote:
> > >
> > > > Thanks a lot Guozhang!
> > > > So currently there's no way to delete a consumer group in with the
> > > > new consumer API? Do you plan to add it in the next versions?
> > > >
> > > >
> > > > 2015-12-31 20:09 GMT+01:00 Guozhang Wang :
> > > >
> > > > > Hello Han,
> > > > >
> > > > > 1. As Marko mentioned you can use "seek" in the 0.9 Java consumer
> > > > > to reset your consuming offsets. Or if you are stopping the consumer
> > > > > between your test runs you can also commit() with offset 0 before
> > > > > you closing your consumer at the end of each test.
> > > > >
> > > > > 2. In the 0.9 Java consumer both the consumer registry information
> > > > > and the offsets are stored in Kafka servers instead of on the ZK,
> > > > > you can find a more detailed description and motivation of this
> > > > > change here:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
> > > > >
> > > > > So the "AdminUtils.deleteConsumerGroupInZK" will not help in
> > > > > removing the consumer registry information.
> > > > >
> > > > > 3. We have deprecated kafka.tools.ConsumerOffsetChecker in 0.9, see
> > > > > "deprecations in 0.9.0" here:
> > > > >
> > > > > http://kafka.apache.org/documentation.html#upgrade_9_breaking
> > > > >
> > > > > Instead you can try to use bin/kafka-consumer-groups.sh
> > > > > (kafka.admin.ConsumerGroupCommand).
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Wed, Dec 30, 2015 at 9:10 AM, Han JU  wrote:
> > > > >
> > > > > > Hi Marko,
> > > > > >
> > > > > > Yes we're currently using this on our production kafka 0.8. But
> > > > > > it does not seem to work with the new consumer API in 0.9.
> > > > > > To answer my own question about deleting consumer group in new
> > > > > > consumer API, it seems that it's currently not possible with the
> > > > > > new consumer API (there's no delete related method in
> > > > > > `AdminClient` of the new consumer API).
> > > > > >
> > > > > >
> > > > > > 2015-12-30 17:02 GMT+01:00 Marko Bonaći <marko.bon...@sematext.com>:
> > > > > >
> > > > > > > If you want to monitor offset (ZK or Kafka based), try with
> > > > > > > QuantFind's Kafka Offset Monitor.
> > > > > > > If you use Docker, it's easy as:
> > > > > > >
> > > > > > > docker run -p 8080:8080 -e ZK=zk_hostname:2181
> > > > > > > jpodeszwik/kafka-offset-monitor
> > > > > > > and then opening browser to dockerhost:8080.
> > > > > > >
> > > > > > > If not in the Docker mood, use instructions here:
> > > > > > > https://github.com/quantifind/KafkaOffsetMonitor
> > > > > > >
> > > > > > > Marko Bonaći
> > > > > > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > > > > > Solr & Elasticsearch Support
> > > > > > > Sematext | Contact
> > > > > > >
> > > > > > > On Wed, Dec 30, 2015 at 12:54 PM, Han JU <ju.han.fe...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thanks guys. The `seek` seems a solution. But it's more
> > > > > > > > cumbersome than in 0.8 because I have to plug in some extra
> > > > > > > > code in my consumer abstractions rather than simply deleting
> > > > > > > > a zk node.
> > > > > > > > And one more question: where does kafka 0.9 stores the
> > > > > > > > consumer-group information? In fact I also tried to delete the
> > > > > > > > consumer group but the `AdminUtils.deleteConsumerGroupInZK`
> > > > > > > > does not seem to work in 0.9. And also `bin/kafka-run-class.sh
> > > > > > > > kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181
> > > > > > > > --group group-name` seems broken.

Re: How to reset a consumer-group's offset in kafka 0.9?

2016-01-04 Thread Guozhang Wang
What you really want is to remove the committed offsets in Kafka servers,
not the consumer group registry metadata (e.g. such as partition assignment
information, etc), which will not be done immediately after the group is
removed, but only after the offsets expiration has elapsed.
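
(For reference, the broker settings that govern this -- the names are the 0.9
configs and the values shown here are their defaults:)

# committed offsets of a dead group are purged only after this window
offsets.retention.minutes=1440
# how often the expiration check runs
offsets.retention.check.interval.ms=600000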

We can add this feature as an admin command in future versions.

Guozhang


On Mon, Jan 4, 2016 at 2:12 AM, Stevo Slavić  wrote:

> Created related ticket https://issues.apache.org/jira/browse/KAFKA-3057
> Maybe request for extra operations support/tools could be a separate
> ticket.
>
> On Mon, Jan 4, 2016 at 10:48 AM, Han JU  wrote:
>
> > Thanks a lot Guozhang!
> > So currently there's no way to delete a consumer group in with the new
> > consumer API? Do you plan to add it in the next versions?
> >
> >
> > 2015-12-31 20:09 GMT+01:00 Guozhang Wang :
> >
> > > Hello Han,
> > >
> > > 1. As Marko mentioned you can use "seek" in the 0.9 Java consumer to
> > > reset your consuming offsets. Or if you are stopping the consumer
> > > between your test runs you can also commit() with offset 0 before you
> > > closing your consumer at the end of each test.
> > >
> > > 2. In the 0.9 Java consumer both the consumer registry information and
> > > the offsets are stored in Kafka servers instead of on the ZK, you can
> > > find a more detailed description and motivation of this change here:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
> > >
> > > So the "AdminUtils.deleteConsumerGroupInZK" will not help in removing
> > > the consumer registry information.
> > >
> > > 3. We have deprecated kafka.tools.ConsumerOffsetChecker in 0.9, see
> > > "deprecations in 0.9.0" here:
> > >
> > > http://kafka.apache.org/documentation.html#upgrade_9_breaking
> > >
> > > Instead you can try to use bin/kafka-consumer-groups.sh
> > > (kafka.admin.ConsumerGroupCommand).
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Dec 30, 2015 at 9:10 AM, Han JU  wrote:
> > >
> > > > Hi Marko,
> > > >
> > > > Yes we're currently using this on our production kafka 0.8. But it
> > > > does not seem to work with the new consumer API in 0.9.
> > > > To answer my own question about deleting consumer group in new
> > > > consumer API, it seems that it's currently not possible with the new
> > > > consumer API (there's no delete related method in `AdminClient` of
> > > > the new consumer API).
> > > >
> > > >
> > > > 2015-12-30 17:02 GMT+01:00 Marko Bonaći :
> > > >
> > > > > If you want to monitor offset (ZK or Kafka based), try with
> > > > > QuantFind's Kafka Offset Monitor.
> > > > > If you use Docker, it's easy as:
> > > > >
> > > > > docker run -p 8080:8080 -e ZK=zk_hostname:2181
> > > > > jpodeszwik/kafka-offset-monitor
> > > > > and then opening browser to dockerhost:8080.
> > > > >
> > > > > If not in the Docker mood, use instructions here:
> > > > > https://github.com/quantifind/KafkaOffsetMonitor
> > > > >
> > > > > Marko Bonaći
> > > > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > > > Solr & Elasticsearch Support
> > > > > Sematext | Contact
> > > > >
> > > > > On Wed, Dec 30, 2015 at 12:54 PM, Han JU  wrote:
> > > > >
> > > > > > Thanks guys. The `seek` seems a solution. But it's more cumbersome
> > > > > > than in 0.8 because I have to plug in some extra code in my
> > > > > > consumer abstractions rather than simply deleting a zk node.
> > > > > > And one more question: where does kafka 0.9 stores the
> > > > > > consumer-group information? In fact I also tried to delete the
> > > > > > consumer group but the `AdminUtils.deleteConsumerGroupInZK` does
> > > > > > not seem to work in 0.9. And also `bin/kafka-run-class.sh
> > > > > > kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181
> > > > > > --group group-name` seems broken.
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > 2015-12-29 16:46 GMT+01:00 Marko Bonaći <marko.bon...@sematext.com>:
> > > > > >
> > > > > > > I was refering to Dana Powers's answer in the link I posted (to
> > > > > > > use a client API). You can find an example here:
> > > > > > >
> > > > > > > http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > > >
> > > > > > > Marko Bonaći
> > > > > > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > > > > > Solr & Elasticsearch Support
> > > > > > > Sematext | Contact
> > > > > > >
> > > > > > > On Tue, Dec 29, 2015 at 4:41 PM, Stevo Slavić <ssla...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Then I guess @Before test, explicitly commit offset of 0.
> > > > > > > >
> > > > > > > > There doesn't seem to be a tool for committing offset, only
> > > > > > > > for checking/fetching current offset (see
> > > > > > > > http://kafka.apache.org/documentation.html#operations )

Re: Consumer - Failed to find leader

2016-01-04 Thread Ismael Juma
Prabhu, were you able to get this to work in the end?

Ismael


Re: Best way to commit offset on demand

2016-01-04 Thread Marko Bonaći
How are you consuming those topics?

IF: I assume you have a consumer, so why not commit from within that
consumer, after you process the message (whatever "process" means to you).

ELSE: couldn't you have a dedicated consumer for offset commit requests
that you don't shut down between requests?

FINALLY: tell us more about your use case.

Marko Bonaći
Monitoring | Alerting | Anomaly Detection | Centralized Log Management
Solr & Elasticsearch Support
Sematext  | Contact


On Mon, Jan 4, 2016 at 12:18 PM, tao xiao  wrote:

> Hi team,
>
> I have a scenario where I want to write new offset for a list of topics on
> demand. The list of topics is unknown until runtime and the interval
> between each commit is undetermined. what would be the best way to do so?
>
> One way I can think of is to create a new consumer and call
> commitSync(offsets) every time I want to commit. But it seems taking too
> much time to bootstrap the consumer. is there a lighter way to achieve
> this?
>


Best way to commit offset on demand

2016-01-04 Thread tao xiao
Hi team,

I have a scenario where I want to write new offset for a list of topics on
demand. The list of topics is unknown until runtime and the interval
between each commit is undetermined. what would be the best way to do so?

One way I can think of is to create a new consumer and call
commitSync(offsets) every time I want to commit. But it seems taking too
much time to bootstrap the consumer. is there a lighter way to achieve
this?


Re: How to reset a consumer-group's offset in kafka 0.9?

2016-01-04 Thread Stevo Slavić
Created related ticket https://issues.apache.org/jira/browse/KAFKA-3057
Maybe request for extra operations support/tools could be a separate ticket.

On Mon, Jan 4, 2016 at 10:48 AM, Han JU  wrote:

> Thanks a lot Guozhang!
> So currently there's no way to delete a consumer group in with the new
> consumer API? Do you plan to add it in the next versions?
>
>
> 2015-12-31 20:09 GMT+01:00 Guozhang Wang :
>
> > Hello Han,
> >
> > 1. As Marko mentioned you can use "seek" in the 0.9 Java consumer to
> > reset your consuming offsets. Or if you are stopping the consumer
> > between your test runs you can also commit() with offset 0 before you
> > closing your consumer at the end of each test.
> >
> > 2. In the 0.9 Java consumer both the consumer registry information and
> > the offsets are stored in Kafka servers instead of on the ZK, you can
> > find a more detailed description and motivation of this change here:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
> >
> > So the "AdminUtils.deleteConsumerGroupInZK" will not help in removing the
> > consumer registry information.
> >
> > 3. We have deprecated kafka.tools.ConsumerOffsetChecker in 0.9, see
> > "deprecations in 0.9.0" here:
> >
> > http://kafka.apache.org/documentation.html#upgrade_9_breaking
> >
> > Instead you can try to use bin/kafka-consumer-groups.sh
> > (kafka.admin.ConsumerGroupCommand).
> >
> > Guozhang
> >
> >
> > On Wed, Dec 30, 2015 at 9:10 AM, Han JU  wrote:
> >
> > > Hi Marko,
> > >
> > > Yes we're currently using this on our production kafka 0.8. But it does
> > > not seem to work with the new consumer API in 0.9.
> > > To answer my own question about deleting consumer group in new consumer
> > > API, it seems that it's currently not possible with the new consumer
> > > API (there's no delete related method in `AdminClient` of the new
> > > consumer API).
> > >
> > >
> > > 2015-12-30 17:02 GMT+01:00 Marko Bonaći :
> > >
> > > > If you want to monitor offset (ZK or Kafka based), try with
> > > > QuantFind's Kafka Offset Monitor.
> > > > If you use Docker, it's easy as:
> > > >
> > > > docker run -p 8080:8080 -e ZK=zk_hostname:2181
> > > > jpodeszwik/kafka-offset-monitor
> > > > and then opening browser to dockerhost:8080.
> > > >
> > > > If not in the Docker mood, use instructions here:
> > > > https://github.com/quantifind/KafkaOffsetMonitor
> > > >
> > > > Marko Bonaći
> > > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > > Solr & Elasticsearch Support
> > > > Sematext | Contact
> > > >
> > > > On Wed, Dec 30, 2015 at 12:54 PM, Han JU  wrote:
> > > >
> > > > > Thanks guys. The `seek` seems a solution. But it's more cumbersome
> > > > > than in 0.8 because I have to plug in some extra code in my consumer
> > > > > abstractions rather than simply deleting a zk node.
> > > > > And one more question: where does kafka 0.9 stores the
> > > > > consumer-group information? In fact I also tried to delete the
> > > > > consumer group but the `AdminUtils.deleteConsumerGroupInZK` does not
> > > > > seem to work in 0.9. And also `bin/kafka-run-class.sh
> > > > > kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181
> > > > > --group group-name` seems broken.
> > > > >
> > > > > Thanks!
> > > > >
> > > > > 2015-12-29 16:46 GMT+01:00 Marko Bonaći :
> > > > >
> > > > > > I was refering to Dana Powers's answer in the link I posted (to
> > > > > > use a client API). You can find an example here:
> > > > > >
> > > > > > http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > > >
> > > > > > Marko Bonaći
> > > > > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > > > > Solr & Elasticsearch Support
> > > > > > Sematext | Contact
> > > > > >
> > > > > > On Tue, Dec 29, 2015 at 4:41 PM, Stevo Slavić  wrote:
> > > > > >
> > > > > > > Then I guess @Before test, explicitly commit offset of 0.
> > > > > > >
> > > > > > > There doesn't seem to be a tool for committing offset, only for
> > > > > > > checking/fetching current offset (see
> > > > > > > http://kafka.apache.org/documentation.html#operations )
> > > > > > >
> > > > > > > On Tue, Dec 29, 2015 at 4:35 PM, Han JU <ju.han.fe...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Stevo,
> > > > > > > >
> > > > > > > > But by deleting and recreating the topic, do I remove also the
> > > > > > > > messages ingested?
> > > > > > > > My use case is that I ingest prepared messages once and run
> > > > > > > > consumer tests multiple times, between each test run I reset
> > > > > > > > the consumer group's offset so that each run starts from the
> > > > > > > > beginning and consumers all the messages.

Re: How to reset a consumer-group's offset in kafka 0.9?

2016-01-04 Thread Han JU
Thanks a lot Guozhang!
So currently there's no way to delete a consumer group with the new
consumer API? Do you plan to add it in the next versions?


2015-12-31 20:09 GMT+01:00 Guozhang Wang :

> Hello Han,
>
> 1. As Marko mentioned you can use "seek" in the 0.9 Java consumer to reset
> your consuming offsets. Or if you are stopping the consumer between your
> test runs you can also commit() with offset 0 before you closing your
> consumer at the end of each test.
>
> 2. In the 0.9 Java consumer both the consumer registry information and the
> offsets are stored in Kafka servers instead of on the ZK, you can find a
> more detailed description and motivation of this change here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
>
> So the "AdminUtils.deleteConsumerGroupInZK" will not help in removing the
> consumer registry information.
>
> 3. We have deprecated kafka.tools.ConsumerOffsetChecker in 0.9, see
> "deprecations in 0.9.0" here:
>
> http://kafka.apache.org/documentation.html#upgrade_9_breaking
>
> Instead you can try to use bin/kafka-consumer-groups.sh
> (kafka.admin.ConsumerGroupCommand).
>
> Guozhang
>
>
>
> On Wed, Dec 30, 2015 at 9:10 AM, Han JU  wrote:
>
> > Hi Marko,
> >
> > Yes we're currently using this on our production kafka 0.8. But it does
> > not seem to work with the new consumer API in 0.9.
> > To answer my own question about deleting consumer group in new consumer
> > API, it seems that it's currently not possible with the new consumer API
> > (there's no delete related method in `AdminClient` of the new consumer
> > API).
> >
> >
> > 2015-12-30 17:02 GMT+01:00 Marko Bonaći :
> >
> > > If you want to monitor offset (ZK or Kafka based), try with QuantFind's
> > > Kafka Offset Monitor.
> > > If you use Docker, it's easy as:
> > >
> > > docker run -p 8080:8080 -e ZK=zk_hostname:2181
> > > jpodeszwik/kafka-offset-monitor
> > > and then opening browser to dockerhost:8080.
> > >
> > > If not in the Docker mood, use instructions here:
> > > https://github.com/quantifind/KafkaOffsetMonitor
> > >
> > > Marko Bonaći
> > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > Solr & Elasticsearch Support
> > > Sematext | Contact
> > >
> > > On Wed, Dec 30, 2015 at 12:54 PM, Han JU  wrote:
> > >
> > > > Thanks guys. The `seek` seems a solution. But it's more cumbersome
> > > > than in 0.8 because I have to plug in some extra code in my consumer
> > > > abstractions rather than simply deleting a zk node.
> > > > And one more question: where does kafka 0.9 stores the consumer-group
> > > > information? In fact I also tried to delete the consumer group but
> > > > the `AdminUtils.deleteConsumerGroupInZK` does not seem to work in 0.9.
> > > > And also `bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
> > > > --zookeeper localhost:2181 --group group-name` seems broken.
> > > >
> > > > Thanks!
> > > >
> > > > 2015-12-29 16:46 GMT+01:00 Marko Bonaći :
> > > >
> > > > > I was refering to Dana Powers's answer in the link I posted (to
> > > > > use a client API). You can find an example here:
> > > > >
> > > > > http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > >
> > > > > Marko Bonaći
> > > > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > > > Solr & Elasticsearch Support
> > > > > Sematext | Contact
> > > > >
> > > > > On Tue, Dec 29, 2015 at 4:41 PM, Stevo Slavić  wrote:
> > > > >
> > > > > > Then I guess @Before test, explicitly commit offset of 0.
> > > > > >
> > > > > > There doesn't seem to be a tool for committing offset, only for
> > > > > > checking/fetching current offset (see
> > > > > > http://kafka.apache.org/documentation.html#operations )
> > > > > >
> > > > > > On Tue, Dec 29, 2015 at 4:35 PM, Han JU  wrote:
> > > > > >
> > > > > > > Hi Stevo,
> > > > > > >
> > > > > > > But by deleting and recreating the topic, do I remove also the
> > > > > > > messages ingested?
> > > > > > > My use case is that I ingest prepared messages once and run
> > > > > > > consumer tests multiple times, between each test run I reset the
> > > > > > > consumer group's offset so that each run starts from the
> > > > > > > beginning and consumers all the messages.
> > > > > > >
> > > > > > > 2015-12-29 16:19 GMT+01:00 Stevo Slavić :
> > > > > > >
> > > > > > > > Have you considered deleting and recreating topic used in
> > > > > > > > test?
> > > > > > > > Once topic is clean, read/poll once - any committed offset
> > > > > > > > should be outside of the range, and consumer should reset
> > > > > > > > offset.
> > > > > > > >
> > > > > > > > On Tue, Dec 29, 2015 at 4:11 PM, Han JU <ju.han.fe...@gmail.com> wrote:
> > > > > > >