Re: reading the consumer offsets topic

2016-05-16 Thread Cliff Rhyne
Hi Tao,

Sorry for the delay.  Thanks for pointing out that property.  That was the
fix.
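
For the archives, the combination from this thread should look roughly like the
following on 0.9.x (the property file path and the zookeeper host are just
examples):

# config/consumer.properties
exclude.internal.topics=false

bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --consumer.config config/consumer.properties \
  --topic __consumer_offsets --from-beginning \
  --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

Quoting the --formatter argument keeps the shell from mangling the $ in the
inner class name.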

On Mon, May 9, 2016 at 6:00 PM, tao xiao  wrote:

> You need to enable internal topic in the consumer.properties
>
> exclude.internal.topics=false
>
> On Mon, 9 May 2016 at 12:42 Cliff Rhyne  wrote:
>
> > Thanks Todd and Tao.  I've tried those tricks but no luck.
> >
> > Just to add more info, this is the __consumer_offsets specific config
> that
> > is shown by the topic describe command:
> >
> >
> >
> Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=uncompressed
> >
> > On Mon, May 9, 2016 at 1:16 PM, tao xiao  wrote:
> >
> > > You can try this
> > >
> > > bin/kafka-console-consumer.sh --consumer.config
> > > config/consumer.properties --from-beginning
> > > --topic __consumer_offsets --zookeeper localhost:2181 --formatter
> > > "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
> > >
> > > On Mon, 9 May 2016 at 09:40 Todd Palino  wrote:
> > >
> > > > The GroupMetadataManager one should be working for you with 0.9. I
> > don’t
> > > > have a 0.9 KCC set up at the moment, so I’m using an 0.8 version
> where
> > > it’s
> > > > different (it’s the other class for me). The only thing I can offer
> now
> > > is
> > > > did you put quotes around the arg to --formatter so you don’t get
> weird
> > > > shell interference?
> > > >
> > > > -Todd
> > > >
> > > >
> > > > On Mon, May 9, 2016 at 8:18 AM, Cliff Rhyne 
> wrote:
> > > >
> > > > > Thanks, Todd.  It's still not working unfortunately.
> > > > >
> > > > > This results in nothing getting printed to the console and requires
> > > kill
> > > > -9
> > > > > in another window to stop (ctrl-c doesn't work):
> > > > >
> > > > > /kafka-console-consumer.sh --bootstrap-server localhost:9092
> > > --zookeeper
> > > > >  --topic __consumer_offsets --formatter
> > > > > kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter
> > > > >
> > > > > This results in a stack trace because it can't find the class:
> > > > >
> > > > > ./kafka-console-consumer.sh --bootstrap-server localhost:9092
> > > --zookeeper
> > > > >  --topic __consumer_offsets --formatter
> > > > > kafka.server.OffsetManager\$OffsetsMessageFormatter
> > > > >
> > > > > Exception in thread "main" java.lang.ClassNotFoundException:
> > > > > kafka.server.OffsetManager$OffsetsMessageFormatter
> > > > >
> > > > >
> > > > > I'm on 0.9.0.1. "broker-list" is invalid and zookeeper is required
> > > > > regardless of the bootstrap-server parameter.
> > > > >
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Cliff
> > > > >
> > > > > On Sun, May 8, 2016 at 7:35 PM, Todd Palino 
> > wrote:
> > > > >
> > > > > > It looks like you’re just missing the proper message formatter.
> Of
> > > > > course,
> > > > > > that largely depends on your version of the broker. Try:
> > > > > >
> > > > > > ./kafka-console-consumer.sh --broker-list localhost:9092 --topic
> > > > > > __consumer_offsets
> > > > > > --formatter
> > > > > kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter
> > > > > >
> > > > > >
> > > > > > If for some reason that doesn’t work, you can try
> > > > > > "kafka.server.OffsetManager\$OffsetsMessageFormatter” instead.
> > > > > >
> > > > > > -Todd
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Sun, May 8, 2016 at 1:26 PM, Cliff Rhyne 
> > > wrote:
> > > > > >
> > > > > > > I'm having difficulty reading the consumer offsets topic from
> the
> > > > > command
> > > > > > > line.  I try the following but it doesn't seem to work (along
> > with
> > > a
> > > > > few
> > > > > > >

Re: reading the consumer offsets topic

2016-05-09 Thread Cliff Rhyne
Thanks Todd and Tao.  I've tried those tricks but no luck.

Just to add more info, this is the __consumer_offsets specific config that
is shown by the topic describe command:

Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=uncompressed

On Mon, May 9, 2016 at 1:16 PM, tao xiao  wrote:

> You can try this
>
> bin/kafka-console-consumer.sh --consumer.config
> config/consumer.properties --from-beginning
> --topic __consumer_offsets --zookeeper localhost:2181 --formatter
> "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
>
> On Mon, 9 May 2016 at 09:40 Todd Palino  wrote:
>
> > The GroupMetadataManager one should be working for you with 0.9. I don’t
> > have a 0.9 KCC set up at the moment, so I’m using an 0.8 version where
> it’s
> > different (it’s the other class for me). The only thing I can offer now
> is
> > did you put quotes around the arg to --formatter so you don’t get weird
> > shell interference?
> >
> > -Todd
> >
> >
> > On Mon, May 9, 2016 at 8:18 AM, Cliff Rhyne  wrote:
> >
> > > Thanks, Todd.  It's still not working unfortunately.
> > >
> > > This results in nothing getting printed to the console and requires
> kill
> > -9
> > > in another window to stop (ctrl-c doesn't work):
> > >
> > > /kafka-console-consumer.sh --bootstrap-server localhost:9092
> --zookeeper
> > >  --topic __consumer_offsets --formatter
> > > kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter
> > >
> > > This results in a stack trace because it can't find the class:
> > >
> > > ./kafka-console-consumer.sh --bootstrap-server localhost:9092
> --zookeeper
> > >  --topic __consumer_offsets --formatter
> > > kafka.server.OffsetManager\$OffsetsMessageFormatter
> > >
> > > Exception in thread "main" java.lang.ClassNotFoundException:
> > > kafka.server.OffsetManager$OffsetsMessageFormatter
> > >
> > >
> > > I'm on 0.9.0.1. "broker-list" is invalid and zookeeper is required
> > > regardless of the bootstrap-server parameter.
> > >
> > >
> > > Thanks,
> > >
> > > Cliff
> > >
> > > On Sun, May 8, 2016 at 7:35 PM, Todd Palino  wrote:
> > >
> > > > It looks like you’re just missing the proper message formatter. Of
> > > course,
> > > > that largely depends on your version of the broker. Try:
> > > >
> > > > ./kafka-console-consumer.sh --broker-list localhost:9092 --topic
> > > > __consumer_offsets
> > > > --formatter
> > > kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter
> > > >
> > > >
> > > > If for some reason that doesn’t work, you can try
> > > > "kafka.server.OffsetManager\$OffsetsMessageFormatter” instead.
> > > >
> > > > -Todd
> > > >
> > > >
> > > >
> > > >
> > > > On Sun, May 8, 2016 at 1:26 PM, Cliff Rhyne 
> wrote:
> > > >
> > > > > I'm having difficulty reading the consumer offsets topic from the
> > > command
> > > > > line.  I try the following but it doesn't seem to work (along with
> a
> > > few
> > > > > related variants including specifying the zookeeper hosts):
> > > > >
> > > > > ./kafka-console-consumer.sh --broker-list localhost:9092 --topic
> > > > > __consumer_offsets
> > > > >
> > > > > Is there a special way to read the consumer offsets topic?
> > > > >
> > > > > Thanks,
> > > > > Cliff
> > > > >
> > > > > --
> > > > > Cliff Rhyne
> > > > > Software Engineering Manager
> > > > > e: crh...@signal.co
> > > > > signal.co
> > > > > 
> > > > >
> > > > > Cut Through the Noise
> > > > >
> > > > > This e-mail and any files transmitted with it are for the sole use
> of
> > > the
> > > > > intended recipient(s) and may contain confidential and privileged
> > > > > information. Any unauthorized use of this email is strictly
> > prohibited.
> > > > > ©2016 Signal. All rights reserved.
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > *—-*
> > > > *Todd Palino*
> > > > Staff Site Reliability Engineer
> > > > Data Infrastructure Streaming
> > > >
> > > >
> > > >
> > > > linkedin.com/in/toddpalino
> > > >
> > >
> > >
> > >
> > > --
> > > Cliff Rhyne
> > > Software Engineering Manager
> > > e: crh...@signal.co
> > > signal.co
> > > 
> > >
> > > Cut Through the Noise
> > >
> > > This e-mail and any files transmitted with it are for the sole use of
> the
> > > intended recipient(s) and may contain confidential and privileged
> > > information. Any unauthorized use of this email is strictly prohibited.
> > > ©2016 Signal. All rights reserved.
> > >
> >
> >
> >
> > --
> > *—-*
> > *Todd Palino*
> > Staff Site Reliability Engineer
> > Data Infrastructure Streaming
> >
> >
> >
> > linkedin.com/in/toddpalino
> >
>



-- 
Cliff Rhyne
Software Engineering Manager
e: crh...@signal.co
signal.co


Cut Through the Noise

This e-mail and any files transmitted with it are for the sole use of the
intended recipient(s) and may contain confidential and privileged
information. Any unauthorized use of this email is strictly prohibited.
©2016 Signal. All rights reserved.


Re: reading the consumer offsets topic

2016-05-09 Thread Cliff Rhyne
Thanks, Todd.  It's still not working unfortunately.

This results in nothing getting printed to the console and requires kill -9
in another window to stop (ctrl-c doesn't work):

/kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper
 --topic __consumer_offsets --formatter
kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter

This results in a stack trace because it can't find the class:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper
 --topic __consumer_offsets --formatter
kafka.server.OffsetManager\$OffsetsMessageFormatter

Exception in thread "main" java.lang.ClassNotFoundException:
kafka.server.OffsetManager$OffsetsMessageFormatter


I'm on 0.9.0.1. "broker-list" is invalid and zookeeper is required
regardless of the bootstrap-server parameter.


Thanks,

Cliff

On Sun, May 8, 2016 at 7:35 PM, Todd Palino  wrote:

> It looks like you’re just missing the proper message formatter. Of course,
> that largely depends on your version of the broker. Try:
>
> ./kafka-console-consumer.sh --broker-list localhost:9092 --topic
> __consumer_offsets
> --formatter kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter
>
>
> If for some reason that doesn’t work, you can try
> "kafka.server.OffsetManager\$OffsetsMessageFormatter” instead.
>
> -Todd
>
>
>
>
> On Sun, May 8, 2016 at 1:26 PM, Cliff Rhyne  wrote:
>
> > I'm having difficulty reading the consumer offsets topic from the command
> > line.  I try the following but it doesn't seem to work (along with a few
> > related variants including specifying the zookeeper hosts):
> >
> > ./kafka-console-consumer.sh --broker-list localhost:9092 --topic
> > __consumer_offsets
> >
> > Is there a special way to read the consumer offsets topic?
> >
> > Thanks,
> > Cliff
> >
> > --
> > Cliff Rhyne
> > Software Engineering Manager
> > e: crh...@signal.co
> > signal.co
> > 
> >
> > Cut Through the Noise
> >
> > This e-mail and any files transmitted with it are for the sole use of the
> > intended recipient(s) and may contain confidential and privileged
> > information. Any unauthorized use of this email is strictly prohibited.
> > ©2016 Signal. All rights reserved.
> >
>
>
>
> --
> *—-*
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>



-- 
Cliff Rhyne
Software Engineering Manager
e: crh...@signal.co
signal.co


Cut Through the Noise

This e-mail and any files transmitted with it are for the sole use of the
intended recipient(s) and may contain confidential and privileged
information. Any unauthorized use of this email is strictly prohibited.
©2016 Signal. All rights reserved.


reading the consumer offsets topic

2016-05-08 Thread Cliff Rhyne
I'm having difficulty reading the consumer offsets topic from the command
line.  I try the following but it doesn't seem to work (along with a few
related variants including specifying the zookeeper hosts):

./kafka-console-consumer.sh --broker-list localhost:9092 --topic
__consumer_offsets

Is there a special way to read the consumer offsets topic?

Thanks,
Cliff

-- 
Cliff Rhyne
Software Engineering Manager
e: crh...@signal.co
signal.co


Cut Through the Noise

This e-mail and any files transmitted with it are for the sole use of the
intended recipient(s) and may contain confidential and privileged
information. Any unauthorized use of this email is strictly prohibited.
©2016 Signal. All rights reserved.


Re: list of challenges encountered using 0.9.0.1

2016-05-06 Thread Cliff Rhyne
Thanks for the updates, Jason.  Let me know if you have questions about the use
cases that bring up any of these scenarios.

On Thu, May 5, 2016 at 7:34 PM, Jason Gustafson  wrote:

> Hey Cliff,
>
> Thanks for the feedback. A few select comments:
>
> 1. The new Java KafkaConsumer doesn’t have a method to return the high
> >watermark (last offset in the topic/partition's log).
>
>
> This is currently exposed in fetch responses, so we could add it to the
> ConsumerRecords object. In general we've so far avoided exposing offset
> APIs only because we haven't had time to think them through. My feeling is
> that the rest of the API is becoming stable, so this will likely become a
> major focus in the next release.
>
>2. Can’t connect using the Java client to just check status on topics
> >(committed offset for different consumer groups, high watermark, etc)
>
>
> This is definitely a gap. I think the idea in KIP-4 (which I'm really
> hoping will be completed in the next release) is to expose an AdminClient
> in kafka-clients which contains this kind of access.
>
>  3. kafka-consumer-groups.sh requires a member of the consumer group to
> >be connected and consuming or offset values won't be displayed
> > (artificial
> >prerequisite)
>
>
> Yes! I have felt this annoyance as well. I've been working on a patch for
> this problem, but I'm not sure if it can get into 0.10. The problem is
> basically that there is an inconsistency between how long we retain offsets
> and group metadata (such as members and assignments). Because of this, it's
> difficult to tell whether the state of the group has actually been removed.
>
>  6. Consumer group rebalances affect all consumers across all topics
> >within the consumer group including topics without a new subscriber.
>
>
> We've discussed a few options for partial rebalancing, but it's tough to
> come up with a proposal which doesn't add a lot of complication to the
> group management protocol. I'd love to see a solution for this as well, but
> I think we need a simple approach to get broad support. There is an active
> KIP for sticky partition assignment which might help somewhat with this
> problem. The basic idea would be to optimistically continue processing
> while a rebalance is taking place under the expectation that most of the
> partitions would continue to be owned by the consumer after the rebalance
> completes. We need to work through the usage though to see if this makes
> sense.
>
> 9. Heartbeat only on poll() causes problems when we have gaps in
> >consuming before committing (such as when we publish files and don’t
> > want
> >to commit until the publish is complete).  Supposedly position() will
> >perform a heartbeat too in addition to poll() (I haven’t verified this
> > but
> >heard it at the Kafka Summit), but it does add extra complexity to the
> >application.
>
>
> I think in general we've underestimated the number of use cases where it's
> difficult to put a bound on processing time. Although max.poll.records
> solves part of the problem (by making processing time more predictable),
> it's still difficult generally to figure out what this bound is. It's
> particularly a big problem for frameworks (such as Streams and Connect)
> where we don't directly control the processing time. I consider it very
> likely in the next iteration that we will either 1) add a background thread
> to the consumer for asynchronous heartbeating or 2) expose an API to make
> it easy for users to do the same thing.
>
>
> Thanks,
> Jason
>
> On Wed, May 4, 2016 at 1:43 PM, Cliff Rhyne  wrote:
>
> > While at the Kafka Summit I was asked to write up a list of challenges
> and
> > confusions my team encountered using Kafka.  We are using 0.9.0.1 and use
> > the new Java KafkaConsumer.
> >
> >
> >1. The new Java KafkaConsumer doesn’t have a method to return the high
> >    watermark (last offset in the topic/partition's log).
> >2. Can’t connect using the Java client to just check status on topics
> >(committed offset for different consumer groups, high watermark, etc)
> >3. kafka-consumer-groups.sh requires a member of the consumer group to
> >be connected and consuming or offset values won't be displayed
> > (artificial
> >prerequisite)
> >    4. Default config for tracking committed offsets is poor (commits should
> >    be effectively permanent and shouldn’t age out after 24 hours).
> >5. It should not be possible to set an offset.retention time <
> >log

list of challenges encountered using 0.9.0.1

2016-05-04 Thread Cliff Rhyne
While at the Kafka Summit I was asked to write up a list of challenges and
confusions my team encountered using Kafka.  We are using 0.9.0.1 and use
the new Java KafkaConsumer.


   1. The new Java KafkaConsumer doesn’t have a method to return the high
   watermark (last offset in the topic/partition's log).
   2. Can’t connect using the Java client to just check status on topics
   (committed offset for different consumer groups, high watermark, etc)
   3. kafka-consumer-groups.sh requires a member of the consumer group to
   be connected and consuming or offset values won't be displayed (artificial
   prerequisite)
   4. Default config for tracking committed offsets is poor (commits should
   be effectively permanent and shouldn’t age out after 24 hours).
   5. It should not be possible to set an offset.retention time <
   log.retention time.
   6. Consumer group rebalances affect all consumers across all topics
   within the consumer group including topics without a new subscriber.
   7. Changing the broker config requires a one-at-a-time rolling restart of
   the whole cluster; a "service kafka reload" would be nice.
   8. Console consumer still uses “old” consumer style configuration
   options (--zookeeper). This is a bit strange for anyone who has started
   using Kafka with version 0.9 or later, since the cli options don’t
   correspond to what you expect the consumer to need.
   9. Heartbeat only on poll() causes problems when we have gaps in
   consuming before committing (such as when we publish files and don’t want
   to commit until the publish is complete).  Supposedly position() will
   perform a heartbeat too in addition to poll() (I haven’t verified this but
   heard it at the Kafka Summit), but it does add extra complexity to the
   application.  (A rough workaround sketch follows.)
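
A rough sketch of one workaround sometimes suggested for item 9 (not something
we have verified, and not from this thread): pause every assigned partition,
keep calling poll() while the long publish step runs, then resume.  This
assumes the 0.9.x varargs pause()/resume() signatures; the helper class and the
stillWorking callback are hypothetical.

import java.util.Set;
import java.util.function.BooleanSupplier;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class HeartbeatHelper {
    // Keeps the consumer's group membership alive while a long, non-consuming
    // step (e.g. publishing a file) runs.  poll() with every partition paused
    // returns no records but still drives the heartbeat.
    static void heartbeatWhile(KafkaConsumer<?, ?> consumer, BooleanSupplier stillWorking) {
        Set<TopicPartition> assigned = consumer.assignment();
        TopicPartition[] parts = assigned.toArray(new TopicPartition[0]);
        consumer.pause(parts);                 // stop fetching, keep the session
        try {
            while (stillWorking.getAsBoolean()) {
                consumer.poll(100);            // heartbeats happen inside poll()
            }
        } finally {
            consumer.resume(parts);            // resume fetching afterwards
        }
        // Caveat: if a rebalance completes in the meantime, the new assignment
        // is not paused, so any records returned by poll() would need handling.
    }
}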


Thanks for listening,
Cliff Rhyne

-- 
Cliff Rhyne
Software Engineering Manager
e: crh...@signal.co
signal.co


Cut Through the Noise

This e-mail and any files transmitted with it are for the sole use of the
intended recipient(s) and may contain confidential and privileged
information. Any unauthorized use of this email is strictly prohibited.
©2016 Signal. All rights reserved.


Re: Kafka cluster performance degradation (Kafka 0.8.2.1)

2016-02-03 Thread Cliff Rhyne
Hi Leo,

We also have a fairly idle CPU for our brokers, but we aren't tracking
latency like you are.  We are mostly interested in overall throughput and
storage.  We are using compression and bundling which add to latency but
decrease storage usage.
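
As a rough illustration of that compression/bundling trade-off, the knobs
involved look something like this (values are only examples; this assumes the
new Java producer, while the older Scala producer uses compression.codec
instead):

compression.type=gzip     # we moved from snappy to gzip on 0.8.2.1 (see below)
batch.size=65536          # bigger batches compress better but add latency
linger.ms=50              # wait a little to fill batches (more bundling)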

Good luck,
Cliff

On Wed, Feb 3, 2016 at 10:55 AM, Clelio De Souza  wrote:

> Hi Cliff,
>
> Thanks for getting back to me on this. Sorry for the delay on replying to
> you, but I was away on holiday.
>
> I have been monitoring the Kafka brokers via JMX and on all nodes CPU usage
> was pretty much idle, under 1% as you said. I should have mentioned before
> that the Kafka cluster was not under heavy load at all. I have reserved a
> TEST Kafka cluster to benchmark the latency in our application. Since my
> last measurements, the latency on this cluster that I have setup on
> 18/01/2016 has increased a bit to ~ 25ms to 33ms (measurement taken today
> 03/02/2016), which indeed indicates the latency will incrementally
> degrade over time.
>
> We still haven't found out what may be causing the degradation of
> performance (i.e. latency). We are not using compression at all for our
> messages, but we decided to perform the latency tests against Kafka
> 0.9.0.0. I have bounced the whole cluster and started with Kafka 0.9.0.0 and
> initial latency tests have shown the latency has been kept the same around
> 25ms to 33ms. It will be interesting to monitor whether the latency will
> start degrading over time (in 1 week's time for instance). Hopefully, with
> Kafka 0.9.0.0 that won't happen.
>
> If you have other ideas what may be causing that, please, let me know. I
> appreciate it.
>
> Cheers,
> Leo
>
> On 21 January 2016 at 16:16, Cliff Rhyne  wrote:
>
> > Hi Leo,
> >
> > I'm not sure if this is the issue you're encountering, but this is what
> we
> > found when we went from 0.8.1.1 to 0.8.2.1.
> >
> > Snappy compression didn't work as expected.  Something in the library
> broke
> > compressing bundles of messages and each message was compressed
> > individually (which for us caused a lot of overhead).  Disk usage went
> way
> > up and CPU usage went incrementally up (still under 1%).  I didn't
> monitor
> > latency, it was well within the tolerances of our system.  We resolved
> this
> > issue by switching our compression to gzip.
> >
> > This issue is supposedly fixed in 0.9.0.0 but we haven't verified it yet.
> >
> > Cliff
> >
> > On Thu, Jan 21, 2016 at 4:04 AM, Clelio De Souza 
> > wrote:
> >
> > > Hi all,
> > >
> > >
> > > We are using Kafka in production and we have been facing some
> performance
> > > degradation of the cluster, apparently after the cluster is a bit
> "old".
> > >
> > >
> > > We have our production cluster which is up and running since 31/12/2015
> > and
> > > performance tests on our application measuring a full round trip of TCP
> > > packets and Kafka producing/consumption of data (3 hops in total for
> > every
> > > single TCP packet being sent, persisted and consumed in the other end).
> > The
> > > results for the production cluster shows a latency of ~ 130ms to 200ms.
> > >
> > >
> > > In our Test environment we have the very same software and
> specification
> > in
> > > AWS instances, i.e. Test environment as being a mirror of Prod. The
> Kafka
> > > cluster has been running in Test since 18/12/2015 and the same
> > performance
> > > tests (as described above) shows a increase of latency to ~ 800ms to
> > > 1000ms.
> > >
> > >
> > > We have just recently setup a new fresh Kafka cluster (on 18/01/2016)
> > > trying to get to the bottom of this performance degradation problem and
> > in
> > > the new Kafka cluster deployed in Test in replacement of the original
> > Test
> > > Kafka cluster we found a very small latency of ~ 10ms to 15ms.
> > >
> > >
> > > We are using Kafka 0.8.2.1 version for all those environment mentioned
> > > above. And the same cluster configuration has been setup on all of
> them,
> > as
> > > 3 brokers as m3.xlarge AWS instances. The amount of data and Kafka
> topics
> > > are roughly the same among those environments, therefore the
> performance
> > > degradation seems to be not directly related to the amount of data in
> the
> > > cluster. We suspect that something running inside of the Kafka cluster,
> > > such as repartitioning or log rentention (even though our 

Re: kafka-consumer-groups.sh doesn't work when consumers are off

2016-01-29 Thread Cliff Rhyne
Thanks for the added info.  For the mean time we'll rely on the older
ConsumerOffsetChecker for our monitoring tools.
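
For reference, the old checker can be run roughly like this (the group name is
an example, and the flag names are from memory, so check --help on your
version):

bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group my-group

or, equivalently,

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group my-group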

Thanks,
Cliff

On Fri, Jan 29, 2016 at 10:56 AM, Guozhang Wang  wrote:

> Tao,
>
> You are right, ConsumerOffsetChecker can still get offsets from the offset
> manager in Kafka.
>
> Guozhang
>
> On Thu, Jan 28, 2016 at 9:36 PM, tao xiao  wrote:
>
> > it first issues an offsetrequest to broker and check if offset is stored
> in
> > broker if not it will queries zk
> >
> >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala#L171
> >
> > On Fri, 29 Jan 2016 at 13:11 Guozhang Wang  wrote:
> >
> > > Tao,
> > >
> > > Hmm that is a bit weird since ConsumerOffsetChecker itself does not
> talk
> > to
> > > brokers at all, but only through ZK.
> > >
> > > Guozhang
> > >
> > > On Thu, Jan 28, 2016 at 6:07 PM, tao xiao 
> wrote:
> > >
> > > > Guozhang,
> > > >
> > > > The old ConsumerOffsetChecker works for new consumer too with offset
> > > stored
> > > > in Kafka. I tested it with mirror maker with new consumer enabled. it
> > is
> > > > able to show offset during mirror maker running and after its
> shutdown.
> > > >
> > > > On Fri, 29 Jan 2016 at 06:34 Guozhang Wang 
> wrote:
> > > >
> > > > > Once the offset is written to the log it is persistent and hence
> > should
> > > > > survive broker failures. And its retention policy is configurable.
> > > > >
> > > > > It may be a bit misleading in saying "in-memory cache" in my
> previous
> > > > > email: the brokers just keep the in-memory map of [group,
> partition]
> > ->
> > > > > latest_offset, while the offset commit history is kept in the log.
> > When
> > > > we
> > > > > delete the group, we remove the corresponding entry from memory map
> > and
> > > > put
> > > > > a tombstone into log as well so that the old offsets will be
> > compacted
> > > > > eventually according to compaction policy.
> > > > >
> > > > > The old ConsumerOffsetChecker only works for old consumer that
> stores
> > > > > offset in ZK.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Thu, Jan 28, 2016 at 1:43 PM, Cliff Rhyne 
> > wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > That looks like it might help but feels like there might be some
> > > gaps.
> > > > > > Would it be able to survive restarts of the kafka broker?  How
> long
> > > > would
> > > > > > it stay in the cache (and is that configurable)?  If it expires
> > from
> > > > the
> > > > > > cache, what's the cache-miss operation look like?  (yes, a lot of
> > > this
> > > > > > depends on the data still being in the logs to recover)
> > > > > >
> > > > > > In the mean time, can I rely on the deprecated
> > ConsumerOffsetChecker
> > > > > (which
> > > > > > looks at zookeeper) even though I'm using the new KafkaConsumer?
> > > > > >
> > > > > > Thanks,
> > > > > > Cliff
> > > > > >
> > > > > > On Thu, Jan 28, 2016 at 3:30 PM, Guozhang Wang <
> wangg...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi Cliff,
> > > > > > >
> > > > > > > Short answer to your question is it is just the current
> > > > implementation.
> > > > > > >
> > > > > > > The coordinator stores the offsets as messages in an internal
> > topic
> > > > and
> > > > > > > also keeps the latest offset values in in-memory. It answers
> > > > > > > ConsumerGroupRequest using its cached offset, and upon the
> > consumer
> > > > > group
> > > > > > > being removed since no member is alive already, it removed it
> > from
> > > > its
> > > > > > > in-memory cache and add a "tombstone" to the offset log as
> well.
> > > But
> > > > > the
> > > > >

Re: kafka-consumer-groups.sh doesn't work when consumers are off

2016-01-28 Thread Cliff Rhyne
Hi Guozhang,

That looks like it might help but feels like there might be some gaps.
Would it be able to survive restarts of the kafka broker?  How long would
it stay in the cache (and is that configurable)?  If it expires from the
cache, what's the cache-miss operation look like?  (yes, a lot of this
depends on the data still being in the logs to recover)

In the mean time, can I rely on the deprecated ConsumerOffsetChecker (which
looks at zookeeper) even though I'm using the new KafkaConsumer?

Thanks,
Cliff

On Thu, Jan 28, 2016 at 3:30 PM, Guozhang Wang  wrote:

> Hi Cliff,
>
> Short answer to your question is it is just the current implementation.
>
> The coordinator stores the offsets as messages in an internal topic and
> also keeps the latest offset values in in-memory. It answers
> ConsumerGroupRequest using its cached offset, and upon the consumer group
> being removed since no member is alive any more, it removes it from its
> in-memory cache and adds a "tombstone" to the offset log as well. But the
> offsets are still persistent as messages in the log, which will only be
> compacted after a while (this depends on the log compaction policy).
>
> There is a ticket open for improving on this scenario (
> https://issues.apache.org/jira/browse/KAFKA-2720) which lets the
> coordinator only "purge" dead groups periodically instead of
> immediately, and that may partially resolve your case.
>
> Guozhang
>
>
> On Thu, Jan 28, 2016 at 12:13 PM, Cliff Rhyne  wrote:
>
> > Just following up on this concern.  Is there a constraint that prevents
> > ConsumerGroupCommand from reporting offsets on a group if no members are
> > connected, or is this just the current implementation?
> >
> > Thanks,
> > Cliff
> >
> > On Mon, Jan 25, 2016 at 3:50 PM, Cliff Rhyne  wrote:
> >
> > > I'm running into a few challenges trying to evaluate offsets and lag
> > > (pending message count) in the new Java KafkaConsumer.  The old
> > > ConsumerOffsetChecker doesn't work anymore since the offsets aren't
> > stored
> > > in zookeeper after switching from the old consumer.  This would be
> fine,
> > > but the kafka-consumer-groups.sh command doesn't work if the consumers
> > are
> > > shut off.  This seems like an unnecessary limitation and is problematic
> > for
> > > troubleshooting / monitoring when the application is turned off (or
> while
> > > my application is running due to our stopping/starting consumers).
> > >
> > > Is there a constraint that I'm not aware of or is this something that
> > > could be changed?
> > >
> > > Thanks,
> > > Cliff
> > >
> > > --
> > > Cliff Rhyne
> > > Software Engineering Lead
> > > e: crh...@signal.co
> > > signal.co
> > > 
> > >
> > > Cut Through the Noise
> > >
> > > This e-mail and any files transmitted with it are for the sole use of
> the
> > > intended recipient(s) and may contain confidential and privileged
> > > information. Any unauthorized use of this email is strictly prohibited.
> > > ©2015 Signal. All rights reserved.
> > >
> >
> >
> >
> > --
> > Cliff Rhyne
> > Software Engineering Lead
> > e: crh...@signal.co
> > signal.co
> > 
> >
> > Cut Through the Noise
> >
> > This e-mail and any files transmitted with it are for the sole use of the
> > intended recipient(s) and may contain confidential and privileged
> > information. Any unauthorized use of this email is strictly prohibited.
> > ©2015 Signal. All rights reserved.
> >
>
>
>
> --
> -- Guozhang
>



-- 
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co


Cut Through the Noise

This e-mail and any files transmitted with it are for the sole use of the
intended recipient(s) and may contain confidential and privileged
information. Any unauthorized use of this email is strictly prohibited.
©2015 Signal. All rights reserved.


Re: kafka-consumer-groups.sh doesn't work when consumers are off

2016-01-28 Thread Cliff Rhyne
Just following up on this concern.  Is there a constraint that prevents
ConsumerGroupCommand from reporting offsets on a group if no members are
connected, or is this just the current implementation?

Thanks,
Cliff

On Mon, Jan 25, 2016 at 3:50 PM, Cliff Rhyne  wrote:

> I'm running into a few challenges trying to evaluate offsets and lag
> (pending message count) in the new Java KafkaConsumer.  The old
> ConsumerOffsetChecker doesn't work anymore since the offsets aren't stored
> in zookeeper after switching from the old consumer.  This would be fine,
> but the kafka-consumer-groups.sh command doesn't work if the consumers are
> shut off.  This seems like an unnecessary limitation and is problematic for
> troubleshooting / monitoring when the application is turned off (or while
> my application is running due to our stopping/starting consumers).
>
> Is there a constraint that I'm not aware of or is this something that
> could be changed?
>
> Thanks,
> Cliff
>
> --
> Cliff Rhyne
> Software Engineering Lead
> e: crh...@signal.co
> signal.co
> 
>
> Cut Through the Noise
>
> This e-mail and any files transmitted with it are for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. Any unauthorized use of this email is strictly prohibited.
> ©2015 Signal. All rights reserved.
>



-- 
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co


Cut Through the Noise

This e-mail and any files transmitted with it are for the sole use of the
intended recipient(s) and may contain confidential and privileged
information. Any unauthorized use of this email is strictly prohibited.
©2015 Signal. All rights reserved.


[jira] [Updated] (KAFKA-3161) Refactor Java client's use of the Properties class

2016-01-27 Thread Cliff Rhyne (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cliff Rhyne updated KAFKA-3161:
---
Description: 
The KafkaConsumer takes a Properties class for the config, but then instead of 
using its getProperty() function the class gets copied (which breaks the use 
of defaults).

One example is this from ConsumerConfig:

Properties newProperties = new Properties();
newProperties.putAll(properties);

Which could be re-written as:

Properties newProperties = new Properties(properties);

This is important because applications using the client library expect to be 
able to specify the default properties above.

(I'm not sure how to go about this, but I'm working on the change locally right 
now.  I'd like to assign it to myself but I guess I can't because I'm not on 
the contributor list).

  was:
The KafkaConsumer takes a Properties class for the config, but then instead of 
using its getProperty() function the class gets copied (which breaks the use 
of defaults).

One example is this from ConsumerConfig:

Properties newProperties = new Properties();
newProperties.putAll(properties);

Which could be re-written as:

Properties newProperties = new Properties(properties);

This is important because applications using the client library expect to be 
able to specify the default properties above.

(I'm not sure how to go about this, but I'm working on the change locally right 
now).


> Refactor Java client's use of the Properties class
> --
>
> Key: KAFKA-3161
> URL: https://issues.apache.org/jira/browse/KAFKA-3161
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.9.0.0
>Reporter: Cliff Rhyne
>
> The KafkaConsumer takes a Properties class for the config, but then instead 
> of using its getProperty() function the class gets copied (which breaks the 
> use of defaults).
> One example is this from ConsumerConfig:
> Properties newProperties = new Properties();
> newProperties.putAll(properties);
> Which could be re-written as:
> Properties newProperties = new Properties(properties);
> This is important because applications using the client library expect to be 
> able to specify the default properties above.
> (I'm not sure how to go about this, but I'm working on the change locally 
> right now.  I'd like to assign it to myself but I guess I can't because I'm 
> not on the contributor list).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
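
To illustrate the defaults problem described above, a minimal standalone sketch
(class and property names are illustrative, not taken from the Kafka code base):

import java.util.Properties;

public class PropertiesDefaultsDemo {
    public static void main(String[] args) {
        // Application-level defaults supplied via the Properties(defaults) constructor.
        Properties defaults = new Properties();
        defaults.setProperty("bootstrap.servers", "localhost:9092");

        Properties config = new Properties(defaults);  // wraps the defaults
        config.setProperty("group.id", "demo-group");  // explicit entry

        // getProperty() consults the defaults chain.
        System.out.println(config.getProperty("bootstrap.servers")); // localhost:9092

        // putAll() copies only the explicit entries, so the defaults are
        // silently lost -- this mirrors the copy done in ConsumerConfig.
        Properties copied = new Properties();
        copied.putAll(config);
        System.out.println(copied.getProperty("bootstrap.servers")); // null

        // Copying via the constructor keeps the defaults chain intact.
        Properties wrapped = new Properties(config);
        System.out.println(wrapped.getProperty("bootstrap.servers")); // localhost:9092
    }
}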


[jira] [Created] (KAFKA-3161) Refactor Java client's use of the Properties class

2016-01-27 Thread Cliff Rhyne (JIRA)
Cliff Rhyne created KAFKA-3161:
--

 Summary: Refactor Java client's use of the Properties class
 Key: KAFKA-3161
 URL: https://issues.apache.org/jira/browse/KAFKA-3161
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.9.0.0
Reporter: Cliff Rhyne


The KafkaConsumer takes a Properties class for the config, but then instead of 
using its getProperty() function the class gets copied (which breaks the use 
of defaults).

One example is this from ConsumerConfig:

Properties newProperties = new Properties();
newProperties.putAll(properties);

Which could be re-written as:

Properties newProperties = new Properties(properties);

This is important because applications using the client library expect to be 
able to specify the default properties above.

(I'm not sure how to go about this, but I'm working on the change locally right 
now).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


kafka-consumer-groups.sh doesn't work when consumers are off

2016-01-25 Thread Cliff Rhyne
I'm running into a few challenges trying to evaluate offsets and lag
(pending message count) in the new Java KafkaConsumer.  The old
ConsumerOffsetChecker doesn't work anymore since the offsets aren't stored
in zookeeper after switching from the old consumer.  This would be fine,
but the kafka-consumer-groups.sh command doesn't work if the consumers are
shut off.  This seems like an unnecessary limitation and is problematic for
troubleshooting / monitoring when the application is turned off (or while
my application is running due to our stopping/starting consumers).
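
For context, the describe call in question looks roughly like this (the group
name is an example; as far as I know, 0.9 needs the --new-consumer flag so the
tool reads the Kafka-stored offsets rather than zookeeper):

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 \
  --describe --group my-group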

Is there a constraint that I'm not aware of or is this something that could
be changed?

Thanks,
Cliff

-- 
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co


Cut Through the Noise

This e-mail and any files transmitted with it are for the sole use of the
intended recipient(s) and may contain confidential and privileged
information. Any unauthorized use of this email is strictly prohibited.
©2015 Signal. All rights reserved.


Re: Kafka cluster performance degradation (Kafka 0.8.2.1)

2016-01-21 Thread Cliff Rhyne
r
> socket.send.buffer.bytes=102400
>
> # The receive buffer (SO_RCVBUF) used by the socket server
> socket.receive.buffer.bytes=102400
>
> # The maximum size of a request that the socket server will accept
> (protection against OOM)
> socket.request.max.bytes=104857600
>
> # Log Basics #
>
> # A comma separated list of directories under which to store log files
> log.dirs=/data/kafka/logs
>
> # The default number of log partitions per topic. More partitions allow
> greater
> # parallelism for consumption, but this will also result in more files
> across
> # the brokers.
> num.partitions=8
>
> # The number of threads per data directory to be used for log recovery at
> startup and flushing at shutdown.
> # This value is recommended to be increased for installations with data
> dirs located in RAID array.
> num.recovery.threads.per.data.dir=1
>
> # Log Flush Policy
> #
>
> # Messages are immediately written to the filesystem but by default we only
> fsync() to sync
> # the OS cache lazily. The following configurations control the flush of
> data to disk.
> # There are a few important trade-offs here:
> #1. Durability: Unflushed data may be lost if you are not using
> replication.
> #2. Latency: Very large flush intervals may lead to latency spikes when
> the flush does occur as there will be a lot of data to flush.
> #3. Throughput: The flush is generally the most expensive operation,
> and a small flush interval may lead to excessive seeks.
> # The settings below allow one to configure the flush policy to flush data
> after a period of time or
> # every N messages (or both). This can be done globally and overridden on a
> per-topic basis.
>
> # The number of messages to accept before forcing a flush of data to disk
> #log.flush.interval.messages=1
>
> # The maximum amount of time a message can sit in a log before we force a
> flush
> #log.flush.interval.ms=1000
>
> # Log Retention Policy
> #
>
> # The following configurations control the disposal of log segments. The
> policy can
> # be set to delete segments after a period of time, or after a given size
> has accumulated.
> # A segment will be deleted whenever *either* of these criteria are met.
> Deletion always happens
> # from the end of the log.
>
> # The minimum age of a log file to be eligible for deletion
> # Failsafe is we don't lose any messages for 20+ years, topics should
> # be configured individually
> log.retention.hours=20
>
> # A size-based retention policy for logs. Segments are pruned from the log
> as long as the remaining
> # segments don't drop below log.retention.bytes.
> #log.retention.bytes=1073741824
>
> # The maximum size of a log segment file. When this size is reached a new
> log segment will be created.
> log.segment.bytes=1073741824
>
> # The interval at which log segments are checked to see if they can be
> deleted according
> # to the retention policies
> log.retention.check.interval.ms=30
>
> # By default the log cleaner is disabled and the log retention policy will
> default to just delete segments after their retention expires.
> # If log.cleaner.enable=true is set the cleaner will be enabled and
> individual logs can then be marked for log compaction.
> log.cleaner.enable=false
>
> default.replication.factor=3
>
> auto.create.topics.enable=true
>
> controlled.shutdown.enable=true
>
> delete.topic.enable=true
>
> # Zookeeper #
>
> # Zookeeper connection string (see zookeeper docs for details).
> # This is a comma separated host:port pairs, each corresponding to a zk
> # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
> # You can also append an optional chroot string to the urls to specify the
> # root directory for all kafka znodes.
> zookeeper.connect=:2181,:2181,:2181
>
> # Timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=6000
>



-- 
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co


Cut Through the Noise

This e-mail and any files transmitted with it are for the sole use of the
intended recipient(s) and may contain confidential and privileged
information. Any unauthorized use of this email is strictly prohibited.
©2015 Signal. All rights reserved.


Re: Connecting with new consumers and existing group appears to cause existing group to rebalance

2016-01-07 Thread Cliff Rhyne
Using separate groups would be only a slight inconvenience but is entirely
doable.  It sounds like that's the preferred model.
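
For reference, a rough sketch of that separate-groups arrangement (group ids,
topic names and deserializer choices are illustrative, not from this thread):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Give the always-on consumers and the on-demand consumer different group.ids,
// so the on-demand consumer joining or leaving never rebalances the other group.
Properties fooProps = new Properties();
fooProps.put("bootstrap.servers", "localhost:9092");
fooProps.put("group.id", "app-foo");
fooProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
fooProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> fooConsumer = new KafkaConsumer<>(fooProps);
fooConsumer.subscribe(Arrays.asList("foo"));

Properties barProps = new Properties();
barProps.putAll(fooProps);
barProps.put("group.id", "app-bar");
KafkaConsumer<String, String> barConsumer = new KafkaConsumer<>(barProps);
barConsumer.subscribe(Arrays.asList("bar"));
// ... poll each consumer as needed; close barConsumer when the bar work is done.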

Thanks for the explanation.

On Thu, Jan 7, 2016 at 4:08 PM, Jason Gustafson  wrote:

> Thanks for the explanation. Unfortunately, the consumer doesn't work that
> way. Rebalances affect all members of the group regardless of
> subscriptions. Partial rebalances could be an interesting idea to consider
> for the future, but we haven't had any cases where a group had differing
> subscriptions in a steady state. Usually users just use separate groups.
> Would that not make sense in this case?
>
> -Jason
>
> On Thu, Jan 7, 2016 at 1:54 PM, Cliff Rhyne  wrote:
>
> > I'll explain.
> >
> > Say there are two topics, foo and bar.  Foo has two partitions (foo-0 and
> > foo-1).  Bar has one partition (bar-0).  The application uses one
> consumer
> > group for all KafkaConsumers called "app".  The application has two
> > consumers always consuming from the two foo partitions, but
> > creates/destroys a consumer for bar as needed.  Connecting to bar to
> > consume shouldn't cause a rebalance of foo, but because all three
> consumers
> > use "app" as their group, it does.
> >
> > One of my assumptions is that topics and rebalancing should operate
> > independently from each other regardless of the group.
> >
> > Thanks,
> > Cliff
> >
> > On Thu, Jan 7, 2016 at 3:44 PM, Jason Gustafson 
> > wrote:
> >
> > > >
> > > > If a new consumer joins an existing group, it triggers a rebalance
> even
> > > > if it's
> > > > consuming from a new topic (which matches my symptoms).
> > > >
> > >
> > > Not sure I understand the issue. Rebalances are triggered when either
> 1)
> > > group membership changes, 2) a consumer's subscription changes, or 3)
> > when
> > > the number of partitions for a topic changes.
> > >
> > > -Jason
> > >
> > > On Thu, Jan 7, 2016 at 1:38 PM, Cliff Rhyne  wrote:
> > >
> > > > I'll give the 0.9.0 trunk a try.
> > > >
> > > > By the way, it looks to me that this might be a separate issue.  I
> just
> > > > setup unique group IDs for the stop/start topics from the always-on
> > topic
> > > > and my test passed.  I think the issue is that the
> > > > GroupCoordinator.doJoinGroup() only tracks the group and not the
> topic
> > > when
> > > > deciding whether to rebalance (GroupCoordinator.prepareRebalance()).
> > If
> > > a
> > > > new consumer joins an existing group, it triggers a rebalance even if
> > > it's
> > > > consuming from a new topic (which matches my symptoms).
> > > >
> > > > What do you think?
> > > >
> > > > On Thu, Jan 7, 2016 at 3:27 PM, Jason Gustafson 
> > > > wrote:
> > > >
> > > > > There have been bugs affecting both the client and the server. The
> > one
> > > I
> > > > > mentioned above only affected the client, so you could try updating
> > it
> > > > > alone if that's easier, but it would be better to do both.
> > > > >
> > > > > I'll leave it to others to comment on the release timeline. I
> haven't
> > > > seen
> > > > > any major consumer-related bugs in the past couple weeks, so my
> > feeling
> > > > is
> > > > > that it's starting to stabilize. It would be nice to get KIP-41
> into
> > > the
> > > > > next release though.
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Thu, Jan 7, 2016 at 1:18 PM, Cliff Rhyne 
> > wrote:
> > > > >
> > > > > > Hi Jason,
> > > > > >
> > > > > > I'm just on the 0.9.0.0 release.  Are the fixes in the client,
> the
> > > > kafka
> > > > > > service, or both?  I'll give it a try.
> > > > > >
> > > > > > Is there a timeline for when 0.9.0.1 would be released?
> > > > > >
> > > > > > Thanks,
> > > > > > Cliff
> > > > > >
> > > > > > On Thu, Jan 7, 2016 at 3:14 PM, Jason Gustafson <
> > ja...@confluent.io>
> > > > > > wrote:

Re: Connecting with new consumers and existing group appears to cause existing group to rebalance

2016-01-07 Thread Cliff Rhyne
I'll explain.

Say there are two topics, foo and bar.  Foo has two partitions (foo-0 and
foo-1).  Bar has one partition (bar-0).  The application uses one consumer
group for all KafkaConsumers called "app".  The application has two
consumers always consuming from the two foo partitions, but
creates/destroys a consumer for bar as needed.  Connecting to bar to
consume shouldn't cause a rebalance of foo, but because all three consumers
use "app" as their group, it does.

One of my assumptions is that topics and rebalancing should operate
independently from each other regardless of the group.

Thanks,
Cliff

On Thu, Jan 7, 2016 at 3:44 PM, Jason Gustafson  wrote:

> >
> > If a new consumer joins an existing group, it triggers a rebalance even
> > if it's
> > consuming from a new topic (which matches my symptoms).
> >
>
> Not sure I understand the issue. Rebalances are triggered when either 1)
> group membership changes, 2) a consumer's subscription changes, or 3) when
> the number of partitions for a topic changes.
>
> -Jason
>
> On Thu, Jan 7, 2016 at 1:38 PM, Cliff Rhyne  wrote:
>
> > I'll give the 0.9.0 trunk a try.
> >
> > By the way, it looks to me that this might be a separate issue.  I just
> > setup unique group IDs for the stop/start topics from the always-on topic
> > and my test passed.  I think the issue is that the
> > GroupCoordinator.doJoinGroup() only tracks the group and not the topic
> when
> > deciding whether to rebalance (GroupCoordinator.prepareRebalance()).  If
> a
> > new consumer joins an existing group, it triggers a rebalance even if
> it's
> > consuming from a new topic (which matches my symptoms).
> >
> > What do you think?
> >
> > On Thu, Jan 7, 2016 at 3:27 PM, Jason Gustafson 
> > wrote:
> >
> > > There have been bugs affecting both the client and the server. The one
> I
> > > mentioned above only affected the client, so you could try updating it
> > > alone if that's easier, but it would be better to do both.
> > >
> > > I'll leave it to others to comment on the release timeline. I haven't
> > seen
> > > any major consumer-related bugs in the past couple weeks, so my feeling
> > is
> > > that it's starting to stabilize. It would be nice to get KIP-41 into
> the
> > > next release though.
> > >
> > > -Jason
> > >
> > > On Thu, Jan 7, 2016 at 1:18 PM, Cliff Rhyne  wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > I'm just on the 0.9.0.0 release.  Are the fixes in the client, the
> > kafka
> > > > service, or both?  I'll give it a try.
> > > >
> > > > Is there a timeline for when 0.9.0.1 would be released?
> > > >
> > > > Thanks,
> > > > Cliff
> > > >
> > > > On Thu, Jan 7, 2016 at 3:14 PM, Jason Gustafson 
> > > > wrote:
> > > >
> > > > > Hey Cliff,
> > > > >
> > > > > Are you using the 0.9.0.0 release? We've fixed a few problems in
> the
> > > > 0.9.0
> > > > > branch, some of which might explain the behavior you're seeing.
> There
> > > was
> > > > > one bug in particular which resulted in the consumer not fetching
> > data
> > > > for
> > > > > a set of partitions after a rebalance.
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Thu, Jan 7, 2016 at 1:01 PM, Cliff Rhyne 
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I'm testing out some changes with the 0.9.0.0 new KafkaConsumer
> > API.
> > > > We
> > > > > > re-use the same consumer group ID across different components of
> > the
> > > > > > application (which consume from different topics).  One topic is
> > > always
> > > > > > being consumed from, the rest are turned on and off.
> > > > > >
> > > > > > If I only run the always-on consumer, I have a very low
> occurrence
> > > rate
> > > > > of
> > > > > > the log message below:
> > > > > >
> > > > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> > > > Attempt
> > > > > > to heart beat failed since the group is rebalancing, try to
> re-join
> > > > > group.
> >

Re: Connecting with new consumers and existing group appears to cause existing group to rebalance

2016-01-07 Thread Cliff Rhyne
I'll give the 0.9.0 trunk a try.

By the way, it looks to me that this might be a separate issue.  I just
setup unique group IDs for the stop/start topics from the always-on topic
and my test passed.  I think the issue is that the
GroupCoordinator.doJoinGroup() only tracks the group and not the topic when
deciding whether to rebalance (GroupCoordinator.prepareRebalance()).  If a
new consumer joins an existing group, it triggers a rebalance even if it's
consuming from a new topic (which matches my symptoms).

What do you think?

On Thu, Jan 7, 2016 at 3:27 PM, Jason Gustafson  wrote:

> There have been bugs affecting both the client and the server. The one I
> mentioned above only affected the client, so you could try updating it
> alone if that's easier, but it would be better to do both.
>
> I'll leave it to others to comment on the release timeline. I haven't seen
> any major consumer-related bugs in the past couple weeks, so my feeling is
> that it's starting to stabilize. It would be nice to get KIP-41 into the
> next release though.
>
> -Jason
>
> On Thu, Jan 7, 2016 at 1:18 PM, Cliff Rhyne  wrote:
>
> > Hi Jason,
> >
> > I'm just on the 0.9.0.0 release.  Are the fixes in the client, the kafka
> > service, or both?  I'll give it a try.
> >
> > Is there a timeline for when 0.9.0.1 would be released?
> >
> > Thanks,
> > Cliff
> >
> > On Thu, Jan 7, 2016 at 3:14 PM, Jason Gustafson 
> > wrote:
> >
> > > Hey Cliff,
> > >
> > > Are you using the 0.9.0.0 release? We've fixed a few problems in the
> > 0.9.0
> > > branch, some of which might explain the behavior you're seeing. There
> was
> > > one bug in particular which resulted in the consumer not fetching data
> > for
> > > a set of partitions after a rebalance.
> > >
> > > -Jason
> > >
> > > On Thu, Jan 7, 2016 at 1:01 PM, Cliff Rhyne  wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm testing out some changes with the 0.9.0.0 new KafkaConsumer API.
> > We
> > > > re-use the same consumer group ID across different components of the
> > > > application (which consume from different topics).  One topic is
> always
> > > > being consumed from, the rest are turned on and off.
> > > >
> > > > If I only run the always-on consumer, I have a very low occurrence
> rate
> > > of
> > > > the log message below:
> > > >
> > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> > Attempt
> > > > to heart beat failed since the group is rebalancing, try to re-join
> > > group.
> > > >
> > > >
> > > > If I run both types of consumers, the log message occurs frequently
> and
> > > the
> > > > always-on consumer eventually doesn't succeed in rejoining (I see the
> > > > attempt in the logs to rejoin but nothing happens after that).  I
> only
> > > have
> > > > logs on the client side to work with; there's nothing showing up in
> the
> > > > kafka logs to show why the group's state isn't stable.
> > > >
> > > > Thanks,
> > > > Cliff
> > > >
> > > > --
> > > > Cliff Rhyne
> > > > Software Engineering Lead
> > > > e: crh...@signal.co
> > > > signal.co
> > > > 
> > > >
> > > > Cut Through the Noise
> > > >
> > > > This e-mail and any files transmitted with it are for the sole use of
> > the
> > > > intended recipient(s) and may contain confidential and privileged
> > > > information. Any unauthorized use of this email is strictly
> prohibited.
> > > > ©2015 Signal. All rights reserved.
> > > >
> > >
> >
> >
> >
> > --
> > Cliff Rhyne
> > Software Engineering Lead
> > e: crh...@signal.co
> > signal.co
> > 
> >
> > Cut Through the Noise
> >
> > This e-mail and any files transmitted with it are for the sole use of the
> > intended recipient(s) and may contain confidential and privileged
> > information. Any unauthorized use of this email is strictly prohibited.
> > ©2015 Signal. All rights reserved.
> >
>



-- 
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co


Cut Through the Noise

This e-mail and any files transmitted with it are for the sole use of the
intended recipient(s) and may contain confidential and privileged
information. Any unauthorized use of this email is strictly prohibited.
©2015 Signal. All rights reserved.


Re: Connecting with new consumers and existing group appears to cause existing group to rebalance

2016-01-07 Thread Cliff Rhyne
Hi Jason,

I'm just on the 0.9.0.0 release.  Are the fixes in the client, the kafka
service, or both?  I'll give it a try.

Is there a timeline for when 0.9.0.1 would be released?

Thanks,
Cliff

On Thu, Jan 7, 2016 at 3:14 PM, Jason Gustafson  wrote:

> Hey Cliff,
>
> Are you using the 0.9.0.0 release? We've fixed a few problems in the 0.9.0
> branch, some of which might explain the behavior you're seeing. There was
> one bug in particular which resulted in the consumer not fetching data for
> a set of partitions after a rebalance.
>
> -Jason
>
> On Thu, Jan 7, 2016 at 1:01 PM, Cliff Rhyne  wrote:
>
> > Hi,
> >
> > I'm testing out some changes with the 0.9.0.0 new KafkaConsumer API.  We
> > re-use the same consumer group ID across different components of the
> > application (which consume from different topics).  One topic is always
> > being consumed from, the rest are turned on and off.
> >
> > If I only run the always-on consumer, I have a very low occurrence rate
> of
> > the log message below:
> >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt
> > to heart beat failed since the group is rebalancing, try to re-join
> group.
> >
> >
> > If I run both types of consumers, the log message occurs frequently and
> the
> always-on consumer eventually doesn't succeed in rejoining (I see the
> > attempt in the logs to rejoin but nothing happens after that).  I only
> have
> > logs on the client side to work with; there's nothing showing up in the
> > kafka logs to show why the group's state isn't stable.
> >
> > Thanks,
> > Cliff
> >
> > --
> > Cliff Rhyne
> > Software Engineering Lead
> > e: crh...@signal.co
> > signal.co
> > 
> >
> > Cut Through the Noise
> >
> > This e-mail and any files transmitted with it are for the sole use of the
> > intended recipient(s) and may contain confidential and privileged
> > information. Any unauthorized use of this email is strictly prohibited.
> > ©2015 Signal. All rights reserved.
> >
>



-- 
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co


Cut Through the Noise

This e-mail and any files transmitted with it are for the sole use of the
intended recipient(s) and may contain confidential and privileged
information. Any unauthorized use of this email is strictly prohibited.
©2015 Signal. All rights reserved.


Connecting with new consumers and existing group appears to cause existing group to rebalance

2016-01-07 Thread Cliff Rhyne
Hi,

I'm testing out some changes with the 0.9.0.0 new KafkaConsumer API.  We
re-use the same consumer group ID across different components of the
application (which consume from different topics).  One topic is always
being consumed from, the rest are turned on and off.

If I only run the always-on consumer, I have a very low occurrence rate of
the log message below:

org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt
to heart beat failed since the group is rebalancing, try to re-join group.


If I run both types of consumers, the log message occurs frequently and the
always-on consumer eventually doesn't succeed in rejoining (I see the
attempt in the logs to rejoin but nothing happens after that).  I only have
logs on the client side to work with; there's nothing showing up in the
kafka logs to show why the group's state isn't stable.

Thanks,
Cliff
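
For anyone reproducing this, a minimal sketch of the setup described above
against the 0.9 consumer API.  The topic names "events" and "batch-jobs", the
group name "app-group", and the class itself are invented for illustration;
this is not the actual application code:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SharedGroupSketch {
        // Hypothetical helper: every component builds its consumer with the
        // same group.id, which is the pattern described in this thread.
        static KafkaConsumer<String, String> newConsumer(String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "app-group");              // shared across components
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(topic));
            return consumer;
        }

        public static void main(String[] args) {
            KafkaConsumer<String, String> alwaysOn = newConsumer("events");      // runs all the time
            KafkaConsumer<String, String> onDemand = newConsumer("batch-jobs");  // started/stopped on demand

            // Starting the on-demand consumer changes the membership of "app-group",
            // so the always-on consumer goes through a rebalance even though its own
            // topic assignment does not need to change.
            ConsumerRecords<String, String> records = alwaysOn.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }

            onDemand.close();   // closing it triggers yet another rebalance
            alwaysOn.close();
        }
    }

Because group membership is keyed on group.id, every start or stop of the
on-demand consumer drags the always-on consumer through a rebalance, which is
what shows up as the repeated heartbeat-failure log message.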

-- 
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co




Re: KIP-41: KafkaConsumer Max Records

2016-01-04 Thread Cliff Rhyne
nce/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > >
> > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil 
> > > wrote:
> > > > >
> > > > > > Hi guys,
> > > > > >
> > > > > > I realized I never thanked yall for your input - thanks!
> > > > > > Jason: I apologize for assuming your stance on the issue! Feels
> > like
> > > we
> > > > > all
> > > > > > agreed on the solution. +1
> > > > > >
> > > > > > Follow-up: Jason made a point about defining prefetch and
> fairness
> > > > > > behaviour in the KIP. I am now working on putting that down in
> > > writing.
> > > > > To
> > > > > > be able to do this I think I need to understand the current
> > > prefetch
> > > > > > behaviour in the new consumer API (0.9) a bit better. Some
> specific
> > > > > > questions:
> > > > > >
> > > > > >- How does a specific consumer balance incoming messages from
> > > > multiple
> > > > > >partitions? Is the consumer simply issuing Multi-Fetch
> > requests[1]
> > > > for
> > > > > > the
> > > > > >consumed assigned partitions of the relevant topics? Or is the
> > > > > consumer
> > > > > >fetching from one partition at a time and balancing between
> them
> > > > > >internally? That is, is the responsibility of partition
> > balancing
> > > > (and
> > > > > >fairness) on the broker side or consumer side?
> > > > > >- Is the above documented somewhere?
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> > > > > > ,
> > > > > > see "Multi-Fetch".
> > > > > >
> > > > > > Thanks,
> > > > > > Jens
> > > > > >
> > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma 
> > > > wrote:
> > > > > >
> > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <
> g...@confluent.io
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Given the background, it sounds like you'll generally want
> each
> > > > call
> > > > > to
> > > > > > > > poll() to return the same number of events (which is the
> number
> > > you
> > > > > > > planned
> > > > > > > > on having enough memory / time for). It also sounds like
> tuning
> > > the
> > > > > > > number
> > > > > > > > of events will be closely tied to tuning the session timeout.
> > > That
> > > > > is -
> > > > > > > if
> > > > > > > > I choose to lower the session timeout for some reason, I will
> > > have
> > > > to
> > > > > > > > modify the number of records returning too.
> > > > > > > >
> > > > > > > > If those assumptions are correct, I think a configuration
> makes
> > > > more
> > > > > > > sense.
> > > > > > > > 1. We are unlikely to want this parameter to be changed during the
> > > > > lifetime
> > > > > > of
> > > > > > > > the consumer
> > > > > > > > 2. The correct value is tied to another configuration
> > parameter,
> > > so
> > > > > > they
> > > > > > > > will be controlled together.
> > > > > > > >
> > > > > > >
> > > > > > > I was thinking the same thing.
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Jens Rantil
> > > > > > Backend engineer
> > > > > > Tink AB
> > > > > >
> > > > > > Email: jens.ran...@tink.se
> > > > > > Phone: +46 708 84 18 32
> > > > > > Web: www.tink.se
> > > > > >
> > > > > > Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> > > > > > <
> > > > > >
> > > > >
> > > >
> > >
> >
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> > > > > > >
> > > > > >  Twitter <https://twitter.com/tink>
> > > > > >
> > > > >
> > > >
> > >
> >
>
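
For anyone reading this thread later: the direction discussed above is a
consumer configuration rather than a poll() argument.  A rough sketch of what
that looks like from application code, assuming a client version that supports
the max.poll.records property proposed in KIP-41; the group name, topic name,
and process() helper are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class BoundedPollSketch {
        // Placeholder for application-specific work.
        static void process(ConsumerRecord<String, String> record) {
            System.out.println(record.value());
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "kip41-demo");             // invented group name
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            // The point of the KIP: cap how many records one poll() hands back, so
            // the time spent processing a batch stays inside the session timeout.
            props.put("max.poll.records", "500");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("some-topic"));  // invented topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000); // at most 500 records
                    for (ConsumerRecord<String, String> record : records) {
                        process(record);
                    }
                    consumer.commitSync();                   // commit after the bounded batch
                }
            }
        }
    }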



-- 
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co




Re: KIP-41: KafkaConsumer Max Records

2016-01-04 Thread Cliff Rhyne
o.
> > > > > >
> > > > > > If those assumptions are correct, I think a configuration makes
> > more
> > > > > sense.
> > > > > > 1. We are unlikely to want this parameter to be changed during the
> > > lifetime
> > > > of
> > > > > > the consumer
> > > > > > 2. The correct value is tied to another configuration parameter,
> so
> > > > they
> > > > > > will be controlled together.
> > > > > >
> > > > >
> > > > > I was thinking the same thing.
> > > > >
> > > > > Ismael
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Jens Rantil
> > > > Backend engineer
> > > > Tink AB
> > > >
> > > > Email: jens.ran...@tink.se
> > > > Phone: +46 708 84 18 32
> > > > Web: www.tink.se
> > > >
> > > > Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> > > > <
> > > >
> > >
> >
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> > > > >
> > > >  Twitter <https://twitter.com/tink>
> > > >
> > >
> >
>



-- 
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co




Re: New and updated producers and consumers

2015-11-06 Thread Cliff Rhyne
Adding more context: with the 0.8.2.1 consumers, to find the number of
pending messages we perform an OffsetFetchRequest to get the last
committed offset and an OffsetRequest to get the last offset in the log.  I
see that the new KafkaConsumer does a good job at replacing the
OffsetFetchRequest (with the committed() method), but it doesn't replace
the OffsetRequest.

Is this a value we could either create another request type for or add it
to the OffsetAndMetadata response for the committed() method?

Thanks,
Cliff
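
For reference, one way to approximate the same lag calculation with the 0.9
KafkaConsumer, assuming manual partition assignment and using seekToEnd() plus
position() to stand in for the old OffsetRequest.  The topic, partition, and
group name are placeholders, not our real application code:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class LagSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "lag-checker");            // group whose lag we inspect
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("my-topic", 0);   // placeholder
                consumer.assign(Collections.singletonList(tp));

                // Committed offset: the replacement for OffsetFetchRequest.
                OffsetAndMetadata committed = consumer.committed(tp);
                long committedOffset = (committed == null) ? 0L : committed.offset();

                // Log-end offset: seek to the end and read the position
                // (0.9 signature: seekToEnd takes varargs TopicPartition).
                consumer.seekToEnd(tp);
                long logEndOffset = consumer.position(tp);

                System.out.println("pending messages: " + (logEndOffset - committedOffset));
            }
        }
    }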

On Thu, Nov 5, 2015 at 1:12 PM, Cliff Rhyne  wrote:

> Hi Jeff,
>
> The java doc is very nice, thank you and thanks to whoever wrote it.
>
> I do have one question about the API.  For what we're doing, it's
> important for us to calculate the "lag" or pending message count.  Today we
> do that using the simple consumer to ask kafka for the committed offset
> (because we're using kafka to store offsets) and another similar call to
> kafka for the last offset in the log.
>
> It appears that I can get the last committed offset from the KafkaConsumer
> by calling committed(TopicPartition) which returns the OffsetAndMetadata
> for the partition.  How would I get the offset for the last record in the
> log?  Do I still need to use the simple consumer for this?
>
> Thanks,
> Cliff
>
> On Thu, Nov 5, 2015 at 11:58 AM, Jeff Holoman 
> wrote:
>
>> The best thing that I know is the latest javadoc that's committed to
>> trunk:
>>
>>
>> https://github.com/apache/kafka/blob/ef5d168cc8f10ad4f0efe9df4cbe849a4b35496e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
>>
>> Thanks
>>
>> Jeff
>>
>>
>>
>> On Thu, Nov 5, 2015 at 12:51 PM, Cliff Rhyne  wrote:
>>
>> > Hi Jeff,
>> >
>> > Is there a writeup of how to use the new consumer API (either in
>> general or
>> > for Java)?  I've seen various proposals but I don't see a recent one on
>> the
>> > actual implementation.  My team wants to start the development work to
>> > migrate to 0.9.
>> >
>> > Thanks,
>> > Cliff
>> >
>> > On Thu, Nov 5, 2015 at 11:18 AM, Jeff Holoman 
>> > wrote:
>> >
>> > > Prabhjot,
>> > >
>> > > The answer changes slightly for the Producer and Consumer and depends
>> on
>> > > your timeline and comfort with using new APIs
>> > >
>> > > Today and in the future, for the Producer, you should be using the
>> "new"
>> > > producer, which isn't all that new anymore:
>> > > org.apache.kafka.clients.producer.KafkaProducer;
>> > >
>> > >
>> > > Today with 0.9 yet to be released you'd likely want to use the
>> High-Level
>> > > Consumer. This is covered in the official docs here:
>> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
>> > > and
>> > > in this blog post
>> > >
>> > >
>> >
>> http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
>> > > along
>> > > with most of the other examples that you'll find.
>> > >
>> > > After .9 is released, I'd encourage you to take a look at the new
>> > Consumer
>> > > API. This has a lot of advantages in terms of offset management and
>> will
>> > be
>> > > the only consumer client that fully supports security features like
>> SSL
>> > > that are slated to be released into the platform.
>> > >
>> > > Your choice of development language is entirely up to you. Note that
>> the
>> > > only version of clients that will be maintained in the project going
>> > > forward are being implemented in Java, so Scala or Java shouldn't
>> matter
>> > > too much for you.
>> > >
>> > > Hope this helps
>> > >
>> > > Jeff
>> > >
>> > >
>> > > On Thu, Nov 5, 2015 at 12:14 PM, Prabhjot Bharaj <
>> prabhbha...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hello Folks,
>> > > >
>> > > > Requesting your expertise on this.
>> > > > I see that under core/src/main/scala/kafka/producer/, there are many
>> > > > implementations - Producer.scala and SyncProducer.scala
>> > > >
>> > > > Also, going via the producerPerformance.scala, there are 2
>> > > imp

Re: New and updated producers and consumers

2015-11-05 Thread Cliff Rhyne
Hi Jeff,

The java doc is very nice, thank you and thanks to whoever wrote it.

I do have one question about the API.  For what we're doing, it's important
for us to calculate the "lag" or pending message count.  Today we do that
using the simple consumer to ask kafka for the committed offset (because
we're using kafka to store offsets) and another similar call to kafka for
the last offset in the log.

It appears that I can get the last committed offset from the KafkaConsumer
by calling committed(TopicPartition) which returns the OffsetAndMetadata
for the partition.  How would I get the offset for the last record in the
log?  Do I still need to use the simple consumer for this?

Thanks,
Cliff

On Thu, Nov 5, 2015 at 11:58 AM, Jeff Holoman  wrote:

> The best thing that I know is the latest javadoc that's committed to trunk:
>
>
> https://github.com/apache/kafka/blob/ef5d168cc8f10ad4f0efe9df4cbe849a4b35496e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
>
> Thanks
>
> Jeff
>
>
>
> On Thu, Nov 5, 2015 at 12:51 PM, Cliff Rhyne  wrote:
>
> > Hi Jeff,
> >
> > Is there a writeup of how to use the new consumer API (either in general
> or
> > for Java)?  I've seen various proposals but I don't see a recent one on
> the
> > actual implementation.  My team wants to start the development work to
> > migrate to 0.9.
> >
> > Thanks,
> > Cliff
> >
> > On Thu, Nov 5, 2015 at 11:18 AM, Jeff Holoman 
> > wrote:
> >
> > > Prabhjot,
> > >
> > > The answer changes slightly for the Producer and Consumer and depends
> on
> > > your timeline and comfort with using new APIs
> > >
> > > Today and in the future, for the Producer, you should be using the
> "new"
> > > producer, which isn't all that new anymore:
> > > org.apache.kafka.clients.producer.KafkaProducer;
> > >
> > >
> > > Today with 0.9 yet to be released you'd likely want to use the
> High-Level
> > > Consumer. This is covered in the official docs here:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> > > and
> > > in this blog post
> > >
> > >
> >
> http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
> > > along
> > > with most of the other examples that you'll find.
> > >
> > > After .9 is released, I'd encourage you to take a look at the new
> > Consumer
> > > API. This has a lot of advantages in terms of offset management and
> will
> > be
> > > the only consumer client that fully supports security features like SSL
> > > that are slated to be released into the platform.
> > >
> > > Your choice of development language is entirely up to you. Note that
> the
> > > only version of clients that will be maintained in the project going
> > > forward are being implemented in Java, so Scala or Java shouldn't
> matter
> > > too much for you.
> > >
> > > Hope this helps
> > >
> > > Jeff
> > >
> > >
> > > On Thu, Nov 5, 2015 at 12:14 PM, Prabhjot Bharaj <
> prabhbha...@gmail.com>
> > > wrote:
> > >
> > > > Hello Folks,
> > > >
> > > > Requesting your expertise on this.
> > > > I see that under core/src/main/scala/kafka/producer/, there are many
> > > > implementations - Producer.scala and SyncProducer.scala
> > > >
> > > > Also, going via the producerPerformance.scala, there are 2
> > > implementations
> > > > - NewShinyProducer (which points to KafkaProducer.java) and the
> > > OldProducer
> > > >
> > > > Similar might be the case with Consumers, but I have not seen that
> yet.
> > > >
> > > > Please let me know which producer and consumer is supposed to be used
> > and
> > > > which ones will be phased out in future releases, so I can focus on
> > only
> > > 1
> > > > type of producer and consumer (high level as well as simple)
> > > >
> > > > Thanks,
> > > > Prabhjot
> > > >
> > > > Thanks,
> > > > Prabhjot
> > > >
> > > > On Thu, Nov 5, 2015 at 3:55 PM, Prabhjot Bharaj <
> prabhbha...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Adding users as well
> > > > >
> > > > > On Thu, Nov 5, 2015 at 3:37 PM, Prabhjot Bharaj

Re: New and updated producers and consumers

2015-11-05 Thread Cliff Rhyne
Hi Jeff,

Is there a writeup of how to use the new consumer API (either in general or
for Java)?  I've seen various proposals but I don't see a recent one on the
actual implementation.  My team wants to start the development work to
migrate to 0.9.

Thanks,
Cliff
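
For anyone else starting the same migration, here is roughly what basic usage
of the new consumer looks like based on that javadoc.  This is only a sketch
against the 0.9 API, with the topic and group names invented:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NewConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");               // invented group name
            props.put("enable.auto.commit", "false");        // commit manually below
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("my-topic"));   // invented topic name
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                    }
                    consumer.commitSync();                   // commit after processing the batch
                }
            } finally {
                consumer.close();
            }
        }
    }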

On Thu, Nov 5, 2015 at 11:18 AM, Jeff Holoman  wrote:

> Prabhjot,
>
> The answer changes slightly for the Producer and Consumer and depends on
> your timeline and comfort with using new APIs
>
> Today and in the future, for the Producer, you should be using the "new"
> producer, which isn't all that new anymore:
> org.apache.kafka.clients.producer.KafkaProducer;
>
>
> Today with 0.9 yet to be released you'd likely want to use the High-Level
> Consumer. This is covered in the official docs here:
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> and
> in this blog post
>
> http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
> along
> with most of the other examples that you'll find.
>
> After .9 is released, I'd encourage you to take a look at the new Consumer
> API. This has a lot of advantages in terms of offset management and will be
> the only consumer client that fully supports security features like SSL
> that are slated to be released into the platform.
>
> Your choice of development language is entirely up to you. Note that the
> only version of clients that will be maintained in the project going
> forward are being implemented in Java, so Scala or Java shouldn't matter
> too much for you.
>
> Hope this helps
>
> Jeff
>
>
> On Thu, Nov 5, 2015 at 12:14 PM, Prabhjot Bharaj 
> wrote:
>
> > Hello Folks,
> >
> > Requesting your expertise on this.
> > I see that under core/src/main/scala/kafka/producer/, there are many
> > implementations - Producer.scala and SyncProducer.scala
> >
> > Also, going via the producerPerformance.scala, there are 2
> implementations
> > - NewShinyProducer (which points to KafkaProducer.java) and the
> OldProducer
> >
> > Similar might be the case with Consumers, but I have not seen that yet.
> >
> > Please let me know which producer and consumer is supposed to be used and
> > which ones will be phased out in future releases, so I can focus on only
> 1
> > type of producer and consumer (high level as well as simple)
> >
> > Thanks,
> > Prabhjot
> >
> > Thanks,
> > Prabhjot
> >
> > On Thu, Nov 5, 2015 at 3:55 PM, Prabhjot Bharaj 
> > wrote:
> >
> > > Adding users as well
> > >
> > > On Thu, Nov 5, 2015 at 3:37 PM, Prabhjot Bharaj  >
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> I'm using the latest update: 0.8.2.2
> > >> I would like to use the latest producer and consumer apis
> > >> over the past few weeks, I have tried to do some performance
> > benchmarking
> > >> using the producer and consumer scripts provided in the bin directory.
> > It
> > >> was a fun activity and I have learnt a lot about kafka.
> > >>
> > >> But, I have also experienced that sometimes the implementation of the
> > >> performance scripts was not up-to-date or some items were different
> than
> > >> the documentation
> > >>
> > >> Now, I would like to develop my application with kafka. I'm
> comfortable
> > >> using scala/java
> > >>
> > >> Please let me know which producer and consumer (both high level and
> > >> simple) class/object should I be using
> > >>
> > >> Thanks a lot,
> > >> Prabhjot
> > >>
> > >
> > >
> > >
> > > --
> > > -
> > > "There are only 10 types of people in the world: Those who understand
> > > binary, and those who don't"
> > >
> >
> >
> >
> > --
> > -
> > "There are only 10 types of people in the world: Those who understand
> > binary, and those who don't"
> >
>
>
>
> --
> Jeff Holoman
> Systems Engineer
>
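
Since the advice quoted above names the new producer class, a minimal usage
sketch of it for completeness.  The topic, key, and value are invented;
send() is asynchronous and close() flushes any pending sends:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));  // invented topic
            producer.close();                                   // flushes outstanding sends
        }
    }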



-- 
Cliff Rhyne
Software Engineering Lead
e: crh...@signal.co
signal.co




Re: 0.9.0 release branch

2015-11-02 Thread Cliff Rhyne
Thanks, Jun.  We will switch clients shortly after the release.

Cliff

> On Nov 2, 2015, at 7:26 PM, Jun Rao  wrote:
> 
> Cliff,
> 
> We try not to patch the old consumer too much since we are adding the new
> java consumer in 0.9. The new consumer supports callbacks during rebalances
> and can address the problem in KAFKA-2725 better.
> 
> Thanks,
> 
> Jun
> 
>> On Mon, Nov 2, 2015 at 11:16 AM, Cliff Rhyne  wrote:
>> 
>> Hi Jun,
>> 
>> I opened KAFKA-2725 based on my experience with duplicate message
>> processing with auto-commit off.  I think it's a fairly small change,
>> especially for someone familiar with the kafka code-base, but it makes a big
>> impact for clients not using auto-commit.  Can this be included in 0.9.0?
>> 
>> Thanks,
>> Cliff
>> 
>> On Mon, Nov 2, 2015 at 12:57 PM, Jason Gustafson 
>> wrote:
>> 
>>> I added KAFKA-2691 as well, which improves client handling of
>> authorization
>>> errors.
>>> 
>>> -Jason
>>> 
>>>> On Mon, Nov 2, 2015 at 10:25 AM, Becket Qin 
>>> wrote:
>>> 
>>>> Hi Jun,
>>>> 
>>>> I added KAFKA-2722 as a blocker for 0.9. It fixes the ISR propagation
>>>> scalability issue we saw.
>>>> 
>>>> Thanks,
>>>> 
>>>> Jiangjie (Becket) Qin
>>>> 
>>>>> On Mon, Nov 2, 2015 at 9:16 AM, Jun Rao  wrote:
>>>>> 
>>>>> Hi, everyone,
>>>>> 
>>>>> We are getting close to the 0.9.0 release. The current plan is to
>> have
>>>> the
>>>>> following remaining 0.9.0 blocker issues resolved this week, cut the
>>>> 0.9.0
>>>>> release branch by Nov. 6 (Friday) and start the RC on Nov. 9
>> (Monday).
>> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.9.0.0%20ORDER%20BY%20updated%20DESC
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jun


Re: 0.9.0 release branch

2015-11-02 Thread Cliff Rhyne
Hi Jun,

I opened KAFKA-2725 based on my experience with duplicate message
processing with auto-commit off.  I think it's a fairly small change,
especially for someone familiar with the kafka code-base, but it makes a big
impact for clients not using auto-commit.  Can this be included in 0.9.0?

Thanks,
Cliff

On Mon, Nov 2, 2015 at 12:57 PM, Jason Gustafson  wrote:

> I added KAFKA-2691 as well, which improves client handling of authorization
> errors.
>
> -Jason
>
> On Mon, Nov 2, 2015 at 10:25 AM, Becket Qin  wrote:
>
> > Hi Jun,
> >
> > I added KAFKA-2722 as a blocker for 0.9. It fixes the ISR propagation
> > scalability issue we saw.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Nov 2, 2015 at 9:16 AM, Jun Rao  wrote:
> >
> > > Hi, everyone,
> > >
> > > We are getting close to the 0.9.0 release. The current plan is to have
> > the
> > > following remaining 0.9.0 blocker issues resolved this week, cut the
> > 0.9.0
> > > release branch by Nov. 6 (Friday) and start the RC on Nov. 9 (Monday).
> > >
> > >
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.9.0.0%20ORDER%20BY%20updated%20DESC
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> >
>



-- 
Cliff Rhyne
Software Engineering Lead
m: 760-917-7823
e: crh...@signal.co
signal.co




[jira] [Created] (KAFKA-2725) high level consumer rebalances with auto-commit disabled should throw an exception

2015-11-02 Thread Cliff Rhyne (JIRA)
Cliff Rhyne created KAFKA-2725:
--

 Summary: high level consumer rebalances with auto-commit disabled 
should throw an exception
 Key: KAFKA-2725
 URL: https://issues.apache.org/jira/browse/KAFKA-2725
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.1
 Environment: Experienced on Java running in linux
Reporter: Cliff Rhyne


Auto-commit is a very resilient mode.  Drops in zookeeper sessions due to JVM 
garbage collection, network, rebalance or other interference are handled 
gracefully within the kafka client.

Systems still can drop due to unexpected gc or network behavior.  My proposal 
is to handle this drop better when auto-commit is turned off:

- If a rebalance or similar event occurs (which causes the offset to get reverted in 
the client), check and see if the client was assigned back to the same 
partition or a different one.  If it's the same partition, find the place last 
consumed (it doesn't do this today for us).  This is to make a graceful 
recovery.
- If the partition assignment changes (which can mean duplicate data is getting 
processed), throw an exception back to the application code.  This lets the 
application code handle this exception-case with respect to the work it's doing 
(which might be transactional).  Failing "silently" (yes it's still getting 
logged) is very dangerous in our situation.
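
For comparison, the new (0.9) consumer exposes rebalance callbacks, so
application code can at least observe a reassignment itself.  A rough sketch,
with the group and topic names invented and the application-specific handling
stubbed out as print statements:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class RebalanceAwareConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");                // invented group name
            props.put("enable.auto.commit", "false");         // the case this ticket is about
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("my-topic"),
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // Last chance to commit (or roll back application state) for
                        // these partitions before another consumer may take them over.
                        System.out.println("revoked: " + partitions);
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // Compare against the previous assignment here; if it changed,
                        // the application can raise its own exception instead of
                        // continuing silently.
                        System.out.println("assigned: " + partitions);
                    }
                });
            // ... poll loop as usual ...
            consumer.close();
        }
    }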



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Java high level consumer providing duplicate messages when auto commit is off

2015-10-25 Thread Cliff Rhyne
I was reflecting on this more and I think there is a change or two that
should be considered for kafka.

First off, auto-commit is a very resilient mode.  Drops in zookeeper
sessions due to garbage collection, network, rebalance or other
interference are handled gracefully within the kafka client.

Systems still can drop even if the garbage collector is well tuned and
we're hosted on a high-quality network (the difference between 99.9% and
99.% reliability).  My proposal is to handle this drop better when
auto-commit is turned off:

- If a rebalance or similar event occurs (which causes the offset to get reverted
in the client), check and see if the client was assigned back to the same
partition or a different one.  If it's the same partition, find the place
last consumed (it doesn't do this today for us).  This is to make a
graceful recovery.
- If the partition assignment changes (which can mean duplicate data is
getting processed), throw an exception back to the application code.  This
lets the application code handle this exception-case with respect to the
work it's doing (which might be transactional).  Failing "silently" (yes
it's still getting logged) is very dangerous in our situation.

One or both of these might need to be behind a consumer config parameter;
I'm not familiar enough with the kafka project to know when new config
parameters get created versus when the behavior is simply changed for the better.

What are your thoughts?

Thanks,
Cliff

On Fri, Oct 23, 2015 at 10:57 AM, Cliff Rhyne  wrote:

> Thanks, Jiangjie.  Understanding more about the auto-commit behavior and
> why it's resilient to these is a big help.
>
> We're going to do some deeper investigation and testing.  I'll report back
> when I have more information.
>
> Thanks,
> Cliff
>
>
> On Thu, Oct 22, 2015 at 11:48 PM, Jiangjie Qin 
> wrote:
>
>> Hi Cliff,
>>
>> If auto.offset.commit is set to true, the offset will be committed in
>> following cases in addition to periodical offset commit:
>>
>> 1. During a consumer rebalance, before releasing the partition ownership.
>> If consumer A owns partition P before rebalance, it will commit offset for
>> partition P during rebalance. If consumer B becomes the new owner of
>> partition P after rebalance, it will start from the committed offset, so
>> there will be no duplicate messages.
>> 2. When consumer closes.
>>
>> Rebalance will be triggered in the following cases:
>> 1. A consumer joins/leaves the group.
>> 2. Some topic/partition changes occurred to the interested topics.(e.g.
>> partition expansion for a topic; a new topic created and the consumer is
>> using a wildcard that matches the new topic name)
>>
>> To answer your question:
>> Simple consumer should not interfere with high level consumer because it
>> does not have any group management embedded.
>>
>> Typically a single high level consumer group will not rebalance unless
>> there is topic/partition change. However, it is possible the consumer
>> itself dropped out of the group and rejoins. This typically happens when
>> you have a ZK session timeout. In that case, you should see "ZK expired"
>> in
>> your log. You can search for that and see if that is the problem.
>>
>> Jiangjie (Becket) Qin
>>
>>
>> On Thu, Oct 22, 2015 at 1:14 PM, Cliff Rhyne  wrote:
>>
>> > We did some more testing with logging turned on (I figured out why it
>> > wasn't working).  We tried increasing the JVM memory capacity on our
>> test
>> > server (it's lower than in production) and increasing the zookeeper
>> > timeouts.  Neither changed the results.  With trace logging enabled, we
>> saw
>> > that we were getting rebalances even though there is only one high level
>> > consumer running (there previously was a simple consumer that was told
>> to
>> > disconnect, but that consumer only checked the offsets and never
>> consumed
>> > data).
>> >
>> > - Is there possibly a race condition where the simple consumer has a
>> hold
>> > on a partition and shutdown is called before starting a high level
>> consumer
>> > but shutdown is done asynchronously?
>> > - What are the various things that can cause a consumer rebalance other
>> > than adding / removing high level consumers?
>> >
>> > Thanks,
>> > Cliff
>> >
>> > On Wed, Oct 21, 2015 at 4:20 PM, Cliff Rhyne  wrote:
>> >
>> > > Hi Kris,
>> > >
>> > > Thanks for the tip.  I'm going to investigate this further.  I checked
>> > and
>> > > we hav

Re: Java high level consumer providing duplicate messages when auto commit is off

2015-10-23 Thread Cliff Rhyne
Thanks, Jiangjie.  Understanding more about the auto-commit behavior and
why it's resilient to these is a big help.

We're going to do some deeper investigation and testing.  I'll report back
when I have more information.

Thanks,
Cliff

On Thu, Oct 22, 2015 at 11:48 PM, Jiangjie Qin 
wrote:

> Hi Cliff,
>
> If auto.offset.commit is set to true, the offset will be committed in
> following cases in addition to periodical offset commit:
>
> 1. During a consumer rebalance, before releasing the partition ownership.
> If consumer A owns partition P before rebalance, it will commit offset for
> partition P during rebalance. If consumer B becomes the new owner of
> partition P after rebalance, it will start from the committed offset, so
> there will be no duplicate messages.
> 2. When consumer closes.
>
> Rebalance will be triggered in the following cases:
> 1. A consumer joins/leaves the group.
> 2. Some topic/partition changes occurred to the interested topics.(e.g.
> partition expansion for a topic; a new topic created and the consumer is
> using a wildcard that matches the new topic name)
>
> To answer your question:
> Simple consumer should not interfere with high level consumer because it
> does not have any group management embedded.
>
> Typically a single high level consumer group will not rebalance unless
> there is topic/partition change. However, it is possible the consumer
> itself dropped out of the group and rejoins. This typically happens when
> you have a ZK session timeout. In that case, you should see "ZK expired" in
> your log. You can search for that and see if that is the problem.
>
> Jiangjie (Becket) Qin
>
>
> On Thu, Oct 22, 2015 at 1:14 PM, Cliff Rhyne  wrote:
>
> > We did some more testing with logging turned on (I figured out why it
> > wasn't working).  We tried increasing the JVM memory capacity on our test
> > server (it's lower than in production) and increasing the zookeeper
> > timeouts.  Neither changed the results.  With trace logging enabled, we
> saw
> > that we were getting rebalances even though there is only one high level
> > consumer running (there previously was a simple consumer that was told to
> > disconnect, but that consumer only checked the offsets and never consumed
> > data).
> >
> > - Is there possibly a race condition where the simple consumer has a hold
> > on a partition and shutdown is called before starting a high level
> consumer
> > but shutdown is done asynchronously?
> > - What are the various things that can cause a consumer rebalance other
> > than adding / removing high level consumers?
> >
> > Thanks,
> > Cliff
> >
> > On Wed, Oct 21, 2015 at 4:20 PM, Cliff Rhyne  wrote:
> >
> > > Hi Kris,
> > >
> > > Thanks for the tip.  I'm going to investigate this further.  I checked
> > and
> > > we have fairly short zk timeouts and run with a smaller memory
> allocation
> > > on the two environments we encounter this issue.  I'll let you all know
> > > what I find.
> > >
> > > I saw this ticket https://issues.apache.org/jira/browse/KAFKA-2049
> that
> > > seems to be related to the problem (but would only inform that an issue
> > > occurred).  Are there any other open issues that could be worked on to
> > > improve Kafka's handling of this situation?
> > >
> > > Thanks,
> > > Cliff
> > >
> > > On Wed, Oct 21, 2015 at 2:53 PM, Kris K 
> wrote:
> > >
> > >> Hi Cliff,
> > >>
> > >> One other case I observed in my environment is - when there were gc
> > pauses
> > >> on one of our high level consumer in the group.
> > >>
> > >> Thanks,
> > >> Kris
> > >>
> > >> On Wed, Oct 21, 2015 at 10:12 AM, Cliff Rhyne 
> wrote:
> > >>
> > >> > Hi James,
> > >> >
> > >> > There are two scenarios we run:
> > >> >
> > >> > 1. Multiple partitions with one consumer per partition.  This rarely
> > has
> > >> > starting/stopping of consumers, so the pool is very static.  There
> is
> > a
> > >> > configured consumer timeout, which is causing the
> > >> ConsumerTimeoutException
> > >> > to get thrown prior to the test starting.  We handle this exception
> > and
> > >> > then resume consuming.
> > >> > 2. Single partition with one consumer.  This consumer is started by
> a
> > >> > triggered condition (number of messages pendin

Re: Java high level consumer providing duplicate messages when auto commit is off

2015-10-22 Thread Cliff Rhyne
We did some more testing with logging turned on (I figured out why it
wasn't working).  We tried increasing the JVM memory capacity on our test
server (it's lower than in production) and increasing the zookeeper
timeouts.  Neither changed the results.  With trace logging enabled, we saw
that we were getting rebalances even though there is only one high level
consumer running (there previously was a simple consumer that was told to
disconnect, but that consumer only checked the offsets and never consumed
data).

- Is there possibly a race condition where the simple consumer has a hold
on a partition and shutdown is called before starting a high level consumer
but shutdown is done asynchronously?
- What are the various things that can cause a consumer rebalance other
than adding / removing high level consumers?

Thanks,
Cliff

On Wed, Oct 21, 2015 at 4:20 PM, Cliff Rhyne  wrote:

> Hi Kris,
>
> Thanks for the tip.  I'm going to investigate this further.  I checked and
> we have fairly short zk timeouts and run with a smaller memory allocation
> on the two environments we encounter this issue.  I'll let you all know
> what I find.
>
> I saw this ticket https://issues.apache.org/jira/browse/KAFKA-2049 that
> seems to be related to the problem (but would only inform that an issue
> occurred).  Are there any other open issues that could be worked on to
> improve Kafka's handling of this situation?
>
> Thanks,
> Cliff
>
> On Wed, Oct 21, 2015 at 2:53 PM, Kris K  wrote:
>
>> Hi Cliff,
>>
>> One other case I observed in my environment is - when there were gc pauses
>> on one of our high level consumer in the group.
>>
>> Thanks,
>> Kris
>>
>> On Wed, Oct 21, 2015 at 10:12 AM, Cliff Rhyne  wrote:
>>
>> > Hi James,
>> >
>> > There are two scenarios we run:
>> >
>> > 1. Multiple partitions with one consumer per partition.  This rarely has
>> > starting/stopping of consumers, so the pool is very static.  There is a
>> > configured consumer timeout, which is causing the
>> ConsumerTimeoutException
>> > to get thrown prior to the test starting.  We handle this exception and
>> > then resume consuming.
>> > 2. Single partition with one consumer.  This consumer is started by a
>> > triggered condition (number of messages pending to be processed in the
>> > kafka topic or a schedule).  The consumer is stopped after processing is
>> > completed.
>> >
>> > In both cases, based on my understanding there shouldn't be a rebalance
>> as
>> > either a) all consumers are running or b) there's only one consumer /
>> > partition.  Also, the same consumer group is used by all consumers in
>> > scenario 1 and 2.  Is there a good way to investigate whether rebalances
>> > are occurring?
>> >
>> > Thanks,
>> > Cliff
>> >
>> > On Wed, Oct 21, 2015 at 11:37 AM, James Cheng  wrote:
>> >
>> > > Do you have multiple consumers in a consumer group?
>> > >
>> > > I think that when a new consumer joins the consumer group, that the
>> > > existing consumers will stop consuming during the group rebalance, and
>> > then
>> > > when they start consuming again, that they will consume from the last
>> > > committed offset.
>> > >
>> > > You should get more verification on this, tho. I might be remembering
>> > > wrong.
>> > >
>> > > -James
>> > >
>> > > > On Oct 21, 2015, at 8:40 AM, Cliff Rhyne  wrote:
>> > > >
>> > > > Hi,
>> > > >
>> > > > My team and I are looking into a problem where the Java high level
>> > > consumer
>> > > > provides duplicate messages if we turn auto commit off (using
>> version
>> > > > 0.8.2.1 of the server and Java client).  The expected sequence of
>> > events
>> > > > are:
>> > > >
>> > > > 1. Start high-level consumer and initialize a KafkaStream to get a
>> > > > ConsumerIterator
>> > > > 2. Consume n items (could be 10,000, could be 1,000,000) from the
>> > > iterator
>> > > > 3. Commit the new offsets
>> > > >
>> > > > What we are seeing is that during step 2, some number of the n
>> messages
>> > > are
>> > > > getting returned by the iterator in duplicate (in some cases, we've
>> > seen
>> > > > n*5 messages consumed

Re: Java high level consumer providing duplicate messages when auto commit is off

2015-10-21 Thread Cliff Rhyne
Hi Kris,

Thanks for the tip.  I'm going to investigate this further.  I checked and
we have fairly short zk timeouts and run with a smaller memory allocation
on the two environments we encounter this issue.  I'll let you all know
what I find.

I saw this ticket https://issues.apache.org/jira/browse/KAFKA-2049 that
seems to be related to the problem (but would only inform that an issue
occurred).  Are there any other open issues that could be worked on to
improve Kafka's handling of this situation?

Thanks,
Cliff

On Wed, Oct 21, 2015 at 2:53 PM, Kris K  wrote:

> Hi Cliff,
>
> One other case I observed in my environment is - when there were gc pauses
> on one of our high level consumer in the group.
>
> Thanks,
> Kris
>
> On Wed, Oct 21, 2015 at 10:12 AM, Cliff Rhyne  wrote:
>
> > Hi James,
> >
> > There are two scenarios we run:
> >
> > 1. Multiple partitions with one consumer per partition.  This rarely has
> > starting/stopping of consumers, so the pool is very static.  There is a
> > configured consumer timeout, which is causing the
> ConsumerTimeoutException
> > to get thrown prior to the test starting.  We handle this exception and
> > then resume consuming.
> > 2. Single partition with one consumer.  This consumer is started by a
> > triggered condition (number of messages pending to be processed in the
> > kafka topic or a schedule).  The consumer is stopped after processing is
> > completed.
> >
> > In both cases, based on my understanding there shouldn't be a rebalance
> as
> > either a) all consumers are running or b) there's only one consumer /
> > partition.  Also, the same consumer group is used by all consumers in
> > scenario 1 and 2.  Is there a good way to investigate whether rebalances
> > are occurring?
> >
> > Thanks,
> > Cliff
> >
> > On Wed, Oct 21, 2015 at 11:37 AM, James Cheng  wrote:
> >
> > > Do you have multiple consumers in a consumer group?
> > >
> > > I think that when a new consumer joins the consumer group, that the
> > > existing consumers will stop consuming during the group rebalance, and
> > then
> > > when they start consuming again, that they will consume from the last
> > > committed offset.
> > >
> > > You should get more verification on this, tho. I might be remembering
> > > wrong.
> > >
> > > -James
> > >
> > > > On Oct 21, 2015, at 8:40 AM, Cliff Rhyne  wrote:
> > > >
> > > > Hi,
> > > >
> > > > My team and I are looking into a problem where the Java high level
> > > consumer
> > > > provides duplicate messages if we turn auto commit off (using version
> > > > 0.8.2.1 of the server and Java client).  The expected sequence of
> > events
> > > > are:
> > > >
> > > > 1. Start high-level consumer and initialize a KafkaStream to get a
> > > > ConsumerIterator
> > > > 2. Consume n items (could be 10,000, could be 1,000,000) from the
> > > iterator
> > > > 3. Commit the new offsets
> > > >
> > > > What we are seeing is that during step 2, some number of the n
> messages
> > > are
> > > > getting returned by the iterator in duplicate (in some cases, we've
> > seen
> > > > n*5 messages consumed).  The problem appears to go away if we turn on
> > > auto
> > > > commit (and committing offsets to kafka helped too), but auto commit
> > > causes
> > > > conflicts with our offset rollback logic.  The issue seems to happen
> > more
> > > > when we are in our test environment on a lower-cost cloud provider.
> > > >
> > > > Diving into the Java and Scala classes including the
> ConsumerIterator,
> > > it's
> > > > not obvious what event causes a duplicate offset to be requested or
> > > > returned (there's even a loop that is supposed to exclude duplicate
> > > > messages in this class).  I tried turning on trace logging but my
> log4j
> > > > config isn't getting the Kafka client logs to write out.
> > > >
> > > > Does anyone have suggestions of where to look or how to enable
> logging?
> > > >
> > > > Thanks,
> > > > Cliff
> > >
> > >
> > > 
> > >
> > >
> >
>


Re: Java high level consumer providing duplicate messages when auto commit is off

2015-10-21 Thread Cliff Rhyne
Hi James,

There are two scenarios we run:

1. Multiple partitions with one consumer per partition.  This rarely has
starting/stopping of consumers, so the pool is very static.  There is a
configured consumer timeout, which is causing the ConsumerTimeoutException
to get thrown prior to the test starting.  We handle this exception and
then resume consuming.
2. Single partition with one consumer.  This consumer is started by a
triggered condition (number of messages pending to be processed in the
kafka topic or a schedule).  The consumer is stopped after processing is
completed.

In both cases, based on my understanding there shouldn't be a rebalance as
either a) all consumers are running or b) there's only one consumer /
partition.  Also, the same consumer group is used by all consumers in
scenario 1 and 2.  Is there a good way to investigate whether rebalances
are occurring?

Thanks,
Cliff

On Wed, Oct 21, 2015 at 11:37 AM, James Cheng  wrote:

> Do you have multiple consumers in a consumer group?
>
> I think that when a new consumer joins the consumer group, that the
> existing consumers will stop consuming during the group rebalance, and then
> when they start consuming again, that they will consume from the last
> committed offset.
>
> You should get more verification on this, tho. I might be remembering
> wrong.
>
> -James
>
> > On Oct 21, 2015, at 8:40 AM, Cliff Rhyne  wrote:
> >
> > Hi,
> >
> > My team and I are looking into a problem where the Java high level
> consumer
> > provides duplicate messages if we turn auto commit off (using version
> > 0.8.2.1 of the server and Java client).  The expected sequence of events
> > are:
> >
> > 1. Start high-level consumer and initialize a KafkaStream to get a
> > ConsumerIterator
> > 2. Consume n items (could be 10,000, could be 1,000,000) from the
> iterator
> > 3. Commit the new offsets
> >
> > What we are seeing is that during step 2, some number of the n messages
> are
> > getting returned by the iterator in duplicate (in some cases, we've seen
> > n*5 messages consumed).  The problem appears to go away if we turn on
> auto
> > commit (and committing offsets to kafka helped too), but auto commit
> causes
> > conflicts with our offset rollback logic.  The issue seems to happen more
> > when we are in our test environment on a lower-cost cloud provider.
> >
> > Diving into the Java and Scala classes including the ConsumerIterator,
> it's
> > not obvious what event causes a duplicate offset to be requested or
> > returned (there's even a loop that is supposed to exclude duplicate
> > messages in this class).  I tried turning on trace logging but my log4j
> > config isn't getting the Kafka client logs to write out.
> >
> > Does anyone have suggestions of where to look or how to enable logging?
> >
> > Thanks,
> > Cliff
>
>
> 
>
>


Java high level consumer providing duplicate messages when auto commit is off

2015-10-21 Thread Cliff Rhyne
Hi,

My team and I are looking into a problem where the Java high level consumer
provides duplicate messages if we turn auto commit off (using version
0.8.2.1 of the server and Java client).  The expected sequence of events
are:

1. Start high-level consumer and initialize a KafkaStream to get a
ConsumerIterator
2. Consume n items (could be 10,000, could be 1,000,000) from the iterator
3. Commit the new offsets
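
A minimal sketch of that sequence against the 0.8.2 high-level consumer API.
The group name, topic name, and n are placeholders, and error handling (such
as the ConsumerTimeoutException case) is omitted:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class HighLevelConsumeThenCommit {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");  // placeholder
            props.put("group.id", "my-group");                  // placeholder
            props.put("auto.commit.enable", "false");           // the setting under discussion

            // 1. Start the high-level consumer and get a ConsumerIterator.
            ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("my-topic", 1));
            ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();

            // 2. Consume n items from the iterator.
            int n = 10000;                                       // placeholder batch size
            for (int i = 0; i < n && it.hasNext(); i++) {
                byte[] message = it.next().message();
                // ... process message ...
            }

            // 3. Commit the new offsets.
            consumer.commitOffsets();
            consumer.shutdown();
        }
    }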

What we are seeing is that during step 2, some number of the n messages are
getting returned by the iterator in duplicate (in some cases, we've seen
n*5 messages consumed).  The problem appears to go away if we turn on auto
commit (and committing offsets to kafka helped too), but auto commit causes
conflicts with our offset rollback logic.  The issue seems to happen more
when we are in our test environment on a lower-cost cloud provider.

Diving into the Java and Scala classes including the ConsumerIterator, it's
not obvious what event causes a duplicate offset to be requested or
returned (there's even a loop that is supposed to exclude duplicate
messages in this class).  I tried turning on trace logging but my log4j
config isn't getting the Kafka client logs to write out.

Does anyone have suggestions of where to look or how to enable logging?

Thanks,
Cliff