stable release?

2016-01-04 Thread Jason Rosenberg
All,

I see that 0.8.2.2 is still listed as the 'stable release', while 0.9.0.0
is the 'latest release', for kafka.

At what point to we expect 0.9.X to become 'stable'?  Will it be 0.9.0.1?

Also, I assume more than a few of us have upgraded to 0.9.0.0 for
production environments, any reports so far of any issues?

Thanks,

Jason


Re: are 0.8.2.1 and 0.9.0.0 compatible?

2015-10-01 Thread Jason Rosenberg
Of course, that documentation needs to be updated to refer to '0.9.X'!

Also, I'm wondering if the last step there should be changed to remove the
property altogether and restart (rather than setting it to the new
version), since once the code is updated, it will use that by default?

On Thu, Oct 1, 2015 at 1:48 PM, Grant Henke  wrote:

> Hi Richard,
>
> You are correct that version will now be 0.9.0 and anything referencing
> 0.8.3 is being changed. You are also correct in the there have been wire
> protocol changes that break compatibility. However, backwards compatibility
> exists and you should always upgrade your brokers before upgrading your
> clients in order to avoid issues (In the future KIP-35
>  may change that).
>
> It's also worth noting that if you are performing a rolling upgrade of your
> brokers, you need to be sure brokers running the new protocol know to
> communicate with the old version to remain compatible during the bounce.
> This is done using the inter.broker.protocol.version property. More on that
> topic can be read here:
> https://kafka.apache.org/083/documentation.html#upgrade
>
> Hopefully that helps clear things up.
>
> Thank you,
> Grant
>
>
>
>
>
> On Thu, Oct 1, 2015 at 12:21 PM, Richard Lee  wrote:
>
> > Note the 0.8.3-SNAPSHOT has recently been renamed 0.9.0.0-SNAPSHOT.
> >
> > In any event, the major version number change could indicate that there
> > has, in fact, been some sort of incompatible change.  Using 0.9.0.0, I'm
> > also unable to use the kafka-console-consumer.sh to read from a 0.8.2.1
> > broker, but it works fine with a 0.9.0.0 broker.
> >
> > Some validation from a kafka expert that broker forward compatibility (or
> > client backward compatibility) is not supported would be appreciated, and
> > that this isn't just a case of some sort of local, fixable
> misconfiguration.
> >
> > Thanks!
> > Richard
> >
> > On 09/30/2015 11:17 AM, Doug Tomm wrote:
> >
> >> hello,
> >>
> >> i've got a set of broker nodes running 0.8.2.1.  on my laptop i'm also
> >> running 0.8.2.1, and i have a single broker node and mirrormaker there.
> >> i'm also using kafka-console-consumer.sh on the mac to display messages
> on
> >> a favorite topic being published from the broker nodes.  there are no
> >> messages on the topic, but everything is well-behaved.  i can inject
> >> messages with kafkacat and everything is fine.
> >>
> >> but then!
> >>
> >> on the laptop i switched everything to 0.8.3 but left the broker nodes
> >> alone.  now when i run mirrormaker i see this:
> >>
> >> [2015-09-30 10:44:55,090] WARN
> >>
> [ConsumerFetcherThread-tivo_kafka_110339-mbpr.local-1443635093396-c55cbafb-0-5],
> >> Error in fetch
> kafka.consumer.ConsumerFetcherThread$FetchRequest@61cb11c5.
> >> Possible cause: java.nio.BufferUnderflowException
> >> (kafka.consumer.ConsumerFetcherThread)
> >> [2015-09-30 10:44:55,624] WARN
> >>
> [ConsumerFetcherThread-tivo_kafka_110339-mbpr.local-1443635093396-c55cbafb-0-5],
> >> Error in fetch
> kafka.consumer.ConsumerFetcherThread$FetchRequest@3c7bb986.
> >> Possible cause: java.nio.BufferUnderflowException
> >> (kafka.consumer.ConsumerFetcherThread)
> >> [2015-09-30 10:44:56,181] WARN
> >>
> [ConsumerFetcherThread-tivo_kafka_110339-mbpr.local-1443635093396-c55cbafb-0-5],
> >> Error in fetch
> kafka.consumer.ConsumerFetcherThread$FetchRequest@1d4fbd2c.
> >> Possible cause: java.nio.BufferUnderflowException
> >> (kafka.consumer.ConsumerFetcherThread)
> >> [2015-09-30 10:44:56,726] WARN
> >>
> [ConsumerFetcherThread-tivo_kafka_110339-mbpr.local-1443635093396-c55cbafb-0-5],
> >> Error in fetch
> kafka.consumer.ConsumerFetcherThread$FetchRequest@59e67b2f.
> >> Possible cause: java.nio.BufferUnderflowException
> >> (kafka.consumer.ConsumerFetcherThread)
> >>
> >> if i use kafkacat to generate a message on the topic i see
> >> IllegalArgumentExceptions instead.
> >>
> >> this suggests that the two versions of kafka aren't compatible. is this
> >> the case?  does the whole ecosystem need to be on the same version?
> >>
> >> thank you,
> >> doug
> >>
> >>
> >
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>


Re: Log Cleaner Thread Stops

2015-09-23 Thread Jason Rosenberg
It looks like that fix will not be included in a release until 0.9.0.0.

I'm thinking maybe it makes sense not to switch to kafka storage for
offsets until then?

Jason

On Fri, Sep 18, 2015 at 1:25 PM, Todd Palino  wrote:

> I think the last major issue with log compaction (that it couldn't handle
> compressed messages) was committed as part of
> https://issues.apache.org/jira/browse/KAFKA-1374 in August, but I'm not
> certain what version this will end up in. It may be part of 0.8.2.2.
>
> Regardless, you'll probably be OK now. We've found that once we clean this
> issue up once it doesn't appear to recur. As long as you're not writing in
> compressed messages to a log compacted topic (and that won't happen with
> __consumer_offsets, as it's managed by the brokers themselves - it would
> only be if you were using other log compacted topics), you're likely in the
> clear now.
>
> -Todd
>
>
> On Fri, Sep 18, 2015 at 9:54 AM, John Holland <
> john.holl...@objectpartners.com> wrote:
>
> > Thanks!
> >
> > I did what you suggested and it worked except it was necessary for me to
> > remove the cleaner-offset-checkpoint file from the data directory and
> > restart the servers.  The log indicates all is well.
> >
> > Do you know what version the fix to this will be in? I'm not looking
> > forward to dealing with this on a reoccurring basis.
> >
> > -John
> >
> > On Fri, Sep 18, 2015 at 8:48 AM Todd Palino  wrote:
> >
> > > Yes, this is a known concern, and it should be fixed with recent
> commits.
> > > In the meantime, you'll have to do a little manual cleanup.
> > >
> > > The problem you're running into is a corrupt message in the offsets
> > topic.
> > > We've seen this a lot. What you need to do is set the topic
> configuration
> > > to remove the cleanup.policy config, and set retention.ms and
> segment.ms
> > > to
> > > something reasonably low. I suggest using a value of 3 or 4 times your
> > > commit interval for consumers. Then wait until the log segments are
> > reaped
> > > (wait twice as long as the retention.ms you chose, to be safe). Once
> > this
> > > is done, you can set the topic configuration back the way it was
> (remove
> > > segment.ms and retention.ms configs, and set cleanup.policy=compact).
> > > Lastly, you'll need to do a rolling bounce of the cluster to restart
> the
> > > brokers (which restarts the log cleaner threads). Technically, you only
> > > need to restart brokers where the threads have died, but it's easier to
> > > just restart all of them.
> > >
> > > Keep in mind that when you do this, you are deleting old offsets. If
> your
> > > consumers are all live and healthy, this shouldn't be a problem because
> > > they will just continue to commit their offsets properly. But if you
> have
> > > an offline consumer, you'll lose the committed offsets by doing this.
> > >
> > > -Todd
> > >
> > >
> > > On Fri, Sep 18, 2015 at 5:31 AM, John Holland <
> > > john.holl...@objectpartners.com> wrote:
> > >
> > > > I've been experiencing this issue across several of our environments
> > ever
> > > > since we enabled the log cleaner for the __consumer_offsets topic.
> > > >
> > > > We are on version 0.8.2.1 of kafka, using the new producer.  All of
> our
> > > > consumers are set to commit to kafka only.
> > > >
> > > > Below is the stack trace in the log I've encountered across several
> > > > different clusters.  A simple restart of kafka will allow compaction
> to
> > > > continue on all of the other partitions but the incorrect one will
> > always
> > > > fail.
> > > >
> > > > Here are the values for it from the kafka-topics --describe command:
> > > >
> > > > Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3
> > > > Configs:segment.bytes=104857600,cleanup.policy=compact
> > > >
> > > > Are there any recommendations on how to prevent this and the best way
> > to
> > > > recover from this exception?  This is causing disk space to fill up
> > > quickly
> > > > on the node.
> > > >
> > > > I did see an open issue that seems very similar to this
> > > > https://issues.apache.org/jira/browse/KAFKA-1641 but this is the
> > > > __consumer_offsets topic which I have not had any part in setting up
> > nor
> > > > producing to.
> > > >
> > > > [2015-09-18 02:57:25,520] INFO Cleaner 0: Beginning cleaning of log
> > > > __consumer_offsets-17. (kafka.log.LogCleaner)
> > > > [2015-09-18 02:57:25,520] INFO Cleaner 0: Building offset map for
> > > > __consumer_offsets-17... (kafka.log.LogCleaner)
> > > > [2015-09-18 02:57:25,609] INFO Cleaner 0: Building offset map for log
> > > > __consumer_offsets-17 for 46 segments in offset range [468079184,
> > > > 528707475). (kafka.log.LogCleaner)
> > > > [2015-09-18 02:57:25,645] ERROR [kafka-log-cleaner-thread-0], Error
> due
> > > to
> > > >  (kafka.log.LogCleaner)
> > > > java.lang.IllegalArgumentException: requirement failed: Last clean
> > offset
> > > > is 468079184 but segment base offset is 0 for log
> 

Re: 0.9.0.0 remaining jiras

2015-09-15 Thread Jason Rosenberg
I'd be interested to see:

https://issues.apache.org/jira/browse/KAFKA-2434  (has patch available, we
will be using 'old' consumer for some time)
https://issues.apache.org/jira/browse/KAFKA-2125  (seems rather serious,
unclear if no longer relevant with new code?)
https://issues.apache.org/jira/browse/KAFKA-  (seems important to fix)

Jason

On Tue, Sep 15, 2015 at 3:28 PM, Jason Rosenberg <j...@squareup.com> wrote:

> Yep,
>
> It looks like this was only communicated originally to the dev list (and
> not the users list), so it wasn't obvious to all!
>
> Thanks,
>
> Jason
>
> On Mon, Sep 14, 2015 at 12:43 AM, Stevo Slavić <ssla...@gmail.com> wrote:
>
>> Hello Jason,
>>
>> Maybe this answers your question:
>>
>> http://mail-archives.apache.org/mod_mbox/kafka-dev/201509.mbox/%3CCAFc58G-UScVKrSF1kdsowQ8Y96OAaZEdiZsk40G8fwf7iToFaw%40mail.gmail.com%3E
>>
>> Kind regards,
>> Stevo Slavic.
>>
>>
>> On Mon, Sep 14, 2015 at 8:56 AM, Jason Rosenberg <j...@squareup.com>
>> wrote:
>>
>> > Hi Jun,
>> >
>> > Can you clarify, will there not be a 0.8.3.0 (and instead we move
>> straight
>> > to 0.9.0.0)?
>> >
>> > Also, can you outline the man new features/updates for 0.9.0.0?
>> >
>> > Thanks,
>> >
>> > Jason
>> >
>> > On Sat, Sep 12, 2015 at 12:40 PM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > The following is a candidate list of jiras that we want to complete in
>> > the
>> > > upcoming release (0.9.0.0). Our goal is to finish at least all the
>> > blockers
>> > > and as many as the non-blockers possible in that list.
>> > >
>> > >
>> > >
>> >
>> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.9.0.0
>> > >
>> > > Anything should be added/removed from this list?
>> > >
>> > > We are shooting to cut an 0.9.0.0 release branch in early October.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> >
>>
>
>


Re: 0.9.0.0 remaining jiras

2015-09-15 Thread Jason Rosenberg
Yep,

It looks like this was only communicated originally to the dev list (and
not the users list), so it wasn't obvious to all!

Thanks,

Jason

On Mon, Sep 14, 2015 at 12:43 AM, Stevo Slavić <ssla...@gmail.com> wrote:

> Hello Jason,
>
> Maybe this answers your question:
>
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201509.mbox/%3CCAFc58G-UScVKrSF1kdsowQ8Y96OAaZEdiZsk40G8fwf7iToFaw%40mail.gmail.com%3E
>
> Kind regards,
> Stevo Slavic.
>
>
> On Mon, Sep 14, 2015 at 8:56 AM, Jason Rosenberg <j...@squareup.com> wrote:
>
> > Hi Jun,
> >
> > Can you clarify, will there not be a 0.8.3.0 (and instead we move
> straight
> > to 0.9.0.0)?
> >
> > Also, can you outline the man new features/updates for 0.9.0.0?
> >
> > Thanks,
> >
> > Jason
> >
> > On Sat, Sep 12, 2015 at 12:40 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > The following is a candidate list of jiras that we want to complete in
> > the
> > > upcoming release (0.9.0.0). Our goal is to finish at least all the
> > blockers
> > > and as many as the non-blockers possible in that list.
> > >
> > >
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.9.0.0
> > >
> > > Anything should be added/removed from this list?
> > >
> > > We are shooting to cut an 0.9.0.0 release branch in early October.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> >
>


Re: 0.9.0.0 remaining jiras

2015-09-14 Thread Jason Rosenberg
Hi Jun,

Can you clarify, will there not be a 0.8.3.0 (and instead we move straight
to 0.9.0.0)?

Also, can you outline the man new features/updates for 0.9.0.0?

Thanks,

Jason

On Sat, Sep 12, 2015 at 12:40 PM, Jun Rao  wrote:

> The following is a candidate list of jiras that we want to complete in the
> upcoming release (0.9.0.0). Our goal is to finish at least all the blockers
> and as many as the non-blockers possible in that list.
>
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.9.0.0
>
> Anything should be added/removed from this list?
>
> We are shooting to cut an 0.9.0.0 release branch in early October.
>
> Thanks,
>
> Jun
>


Re: [ANNOUNCE] Burrow - Consumer Lag Monitoring as a Service

2015-06-09 Thread Jason Rosenberg
Hi Todd,

Thanks for open sourcing this, I'm excited to take a look.

It looks like it's specific to offsets stored in kafka (and not zookeeper)
correct?  I assume by that that LinkedIn is using the kafka storage now in
production?

Jason

On Thu, Jun 4, 2015 at 9:43 PM, Todd Palino tpal...@gmail.com wrote:

 I am very happy to introduce Burrow, an application to provide Kafka
 consumer status as a service. Burrow is different than just a lag
 checker:

 * Multiple Kafka cluster support - Burrow supports any number of Kafka
 clusters in a single instance. You can also run multiple copies of Burrow
 in parallel and only one of them will send out notifications.

 * All consumers, all partitions - If the consumer is committing offsets to
 Kafka (not Zookeeper), it will be available in Burrow automatically. Every
 partition it consumes will be monitored simultaneously, avoiding the trap
 of just watching the worst partition (MaxLag) or spot checking individual
 topics.

 * Status can be checked via HTTP request - There's an internal HTTP server
 that provides topic and consumer lists, can give you the latest offsets for
 a topic either from the brokers or from the consumer, and lets you check
 consumer status.

 * Continuously monitor groups with output via email or a call to an
 external HTTP endpoint - Configure emails to send for bad groups, checked
 continuously. Or you can have Burrow call an HTTP endpoint into another
 system for handling alerts.

 * No thresholds - Status is determined over a sliding window and does not
 rely on a fixed limit. When a consumer is checked, it has a status
 indicator that tells whether it is OK, a warning, or an error, and the
 partitions that caused it to be bad are provided.


 Burrow was created to address specific problems that LinkedIn has with
 monitoring consumers, in particular wildcard consumers like mirror makers
 and our audit consumers. Instead of checking offsets for specific consumers
 periodically, it monitors the stream of all committed offsets
 (__consumer_offsets) and continually calculates lag over a sliding window.

 We welcome all feedback, comments, and contributors. This project is very
 much under active development for us (we're using it in some of our
 environments now, and working on getting it running everywhere to replace
 our previous monitoring system).

 Burrow is written in Go, published under the Apache License, and hosted on
 GitHub at:
 https://github.com/linkedin/Burrow

 Documentation is on the GitHub wiki at:
 https://github.com/linkedin/Burrow/wiki

 -Todd



Re: How to prevent custom Partitioner from increasing the number of producer's requests?

2015-06-04 Thread Jason Rosenberg
Sebastien, I think you may have an off by 1 error (e.g. batch should be
0-199, not 1-200).  Thus you are sending 2 batches each time (one for 0,
another for 1-199).

Jason

On Thu, Jun 4, 2015 at 1:33 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 From the code you pasted, that is old producer.
 The new producer class is org.apache.kafka.clients.producer.KafkaProducer.

 The new producer does not have sticky partition behavior. The default
 partitioner use round-robin like way to send non-keyed messages to
 partitions.

 Jiangjie (Becket) Qin

 On 6/3/15, 11:35 PM, Sebastien Falquier sebastien.falqu...@teads.tv
 wrote:

 I am using this code (from org.apache.kafka % kafka_2.10 % 0.8.2.0),
 no idea if it is the old producer or the new one
 
 import kafka.producer.Produced
 import kafka.producer.ProducerConfig
 val prodConfig : ProducerConfig = new ProducerConfig(properties)
 val producer : Producer[Integer,String] = new
 Producer[Integer,String](prodConfig)
 
 How can I know which producer I am using? And what is the behavior of the
 new producer?
 
 Thanks,
 Sébastien
 
 
 2015-06-03 20:04 GMT+02:00 Jiangjie Qin j...@linkedin.com.invalid:
 
 
  Are you using new producer or old producer?
  The old producer has 10 min sticky partition behavior while the new
  producer does not.
 
  Thanks,
 
  Jiangjie (Becket) Qin
 
  On 6/2/15, 11:58 PM, Sebastien Falquier sebastien.falqu...@teads.tv
  wrote:
 
  Hi Jason,
  
  The default partitioner does not make the job since my producers
 haven't a
  smooth traffic. What I mean is that they can deliver lots of messages
  during 10 minutes and less during the next 10 minutes, that is too say
 the
  first partition will have stacked most of the messages of the last 20
  minutes.
  
  By the way, I don't understand your point about breaking batch into 2
  separate partitions. With that code, I jump to a new partition on
 message
  201, 401, 601, ... with batch size = 200, where is my mistake?
  
  Thanks for your help,
  Sébastien
  
  2015-06-02 16:55 GMT+02:00 Jason Rosenberg j...@squareup.com:
  
   Hi Sebastien,
  
   You might just try using the default partitioner (which is random).
 It
   works by choosing a random partition each time it re-polls the
 meta-data
   for the topic.  By default, this happens every 10 minutes for each
 topic
   you produce to (so it evenly distributes load at a granularity of 10
   minutes).  This is based on 'topic.metadata.refresh.interval.ms'.
  
   I suspect your code is causing double requests for each batch, if
 your
   partitioning is actually breaking up your batches into 2 separate
   partitions.  Could be an off by 1 error, with your modulo
 calculation?
   Perhaps you need to use '% 0' instead of '% 1' there?
  
   Jason
  
  
  
   On Tue, Jun 2, 2015 at 3:35 AM, Sebastien Falquier 
   sebastien.falqu...@teads.tv wrote:
  
Hi guys,
   
I am new to Kafka and I am facing a problem I am not able to sort
 out.
   
To smooth traffic over all my brokers' partitions, I have coded a
  custom
Paritioner for my producers, using a simple round robin algorithm
 that
jumps from a partition to another on every batch of messages
   (corresponding
to batch.num.messages value). It looks like that :
https://gist.github.com/sfalquier/4c0c7f36dd96d642b416
   
With that fix, every partitions are used equally, but the amount of
requests from the producers to the brokers have been multiplied by
 2.
  I
   do
not understand since all producers are async with
  batch.num.messages=200
and the amount of messages processed is still the same as before.
 Why
  do
producers need more requests to do the job? As internal traffic is
 a
  bit
critical on our platform, I would really like to reduce producers'
   requests
volume if possible.
   
Any idea? Any suggestion?
   
Regards,
Sébastien
   
  
 
 




Re: Consumer lag lies - orphaned offsets?

2015-06-04 Thread Jason Rosenberg
I assume you are looking at a 'MaxLag' metric, which reports the worst case
lag over a set of partitions.  Are you consuming multiple partitions, and
maybe one of them is stuck?

On Tue, Jun 2, 2015 at 4:00 PM, Otis Gospodnetic otis.gospodne...@gmail.com
 wrote:

 Hi,

 I've noticed that when we restart our Kafka consumers our consumer lag
 metric sometimes looks weird.

 Here's an example: https://apps.sematext.com/spm-reports/s/0Hq5zNb4hH

 You can see lag go up around 15:00, when some consumers were restarted.
 The weird thing is that the lag remains flat!
 How could it remain flat if consumers are running? (they have enough juice
 to catch up!)

 What I think is happening is this:
 1) consumers are initially not really lagging
 2) consumers get stopped
 3) lag grows
 4) consumers get started again
 5) something shifts around...not sure what...
 6) consumers start consuming, and there is actually no lag, but the offsets
 written to ZK sometime during 3) don't get updated because after restart
 consumers are reading from somewhere else, not from partition(s) whose lag
 and offset delta jumped during 3)

 Oh, and:
 7) Kafka JMX still exposes all offsets, event those for partitions that are
 no longer being read, so the consumer lag metric remains constant/flat,
 even though consumers are actually not lagging on partitions from which
 they are now consuming.

 What bugs me is 7), because reading lag info from JMX looks like it's
 lying.

 Does this sound crazy or reasonable?

 If anyone has any comments/advice/suggestions for what one can do about
 this, I'm all ears!

 Thanks,
 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr  Elasticsearch Support * http://sematext.com/



Re: Issue with kafka-topics.sh tool for adding new partitions with replica assignment

2015-06-03 Thread Jason Rosenberg
Probably makes sense to file a Jira for this issue.

On Mon, May 11, 2015 at 8:28 AM, Stefan Schadwinkel 
stefan.schadwin...@smaato.com wrote:

 Hi,

 with Kafka 0.8 it is possible to add new partitions on newly added brokers
 and supply a partition assignment to put the new partitions mainly on the
 new brokers (4 and 5 are the new brokers):

 bin/kafka-add-partitions.sh --topic scale-test-001 \
 --partition 14 \
 --replica-assignment-list
 4:5,4:1,4:2,4:3,4:5,4:1,4:2,5:3,5:4,5:1,5:2,5:3,5:4,5:1 \
 --zookeeper 127.0.0.1:2181

 For 0.8.1+ the kafka-add-partitions.sh tool was merged into
 kafka-topics.sh, but when you try to execute something similar you receive
 the following error (in Kafka 0.8.2.1):

 kafka_2.10-0.8.2.1$ bin/kafka-topics.sh --alter --topic scale-test-002 \
  --zookeeper 127.0.0.1:2181 \
  --partitions 35 \
  --replica-assignment

  
 2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,3:2,2:3,4:5,4:2,4:2,4:3,4:5,4:3,4:2,5:3,5:4,5:4,5:2,5:3,5:4,5:3
 Option [replica-assignment] can't be used with option[partitions]


 However, upon investigation of alterTopics in TopicCommand.scala the code
 it wants to execute is:

 val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
 val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
 AdminUtils.addPartitions(zkClient, topic, nPartitions,
 replicaAssignmentStr, config = configs)

 So assigning both the --partitions and the --replica-assignment parameters
 should be totally fine.

 The issue is with the following line in checkArgs:

 CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt,
   allTopicLevelOpts -- Set(alterOpt, createOpt) + partitionsOpt +
 replicationFactorOpt)

 If it is removed, then the above command executes just fine. The created
 partitions are as well filled quite happily.

 I'm not fully sure what the correct configuration of the
 replicaAssignmentOpt should be, so I don't provide a patch, but it would be
 great if that could be fixed.

 Best regards,
 Stefan


 --

 *Dr. Stefan Schadwinkel*
 Senior Big Data Developer
 stefan.schadwin...@smaato.com




 Smaato Inc.
 San Francisco – New York - Hamburg - Singapore
 www.smaato.com





 Germany:
 Valentinskamp 70, Emporio, 19th Floor

 20355 Hamburg


 T  +49 (40) 3480 949 0
 F  +49 (40) 492 19 055



 The information contained in this communication may be CONFIDENTIAL and is
 intended only for the use of the recipient(s) named above. If you are not
 the intended recipient, you are hereby notified that any dissemination,
 distribution, or copying of this communication, or any of its contents, is
 strictly prohibited. If you have received this communication in error,
 please notify the sender and delete/destroy the original message and any
 copy of it from your computer or paper files.



Re: How to prevent custom Partitioner from increasing the number of producer's requests?

2015-06-02 Thread Jason Rosenberg
Hi Sebastien,

You might just try using the default partitioner (which is random).  It
works by choosing a random partition each time it re-polls the meta-data
for the topic.  By default, this happens every 10 minutes for each topic
you produce to (so it evenly distributes load at a granularity of 10
minutes).  This is based on 'topic.metadata.refresh.interval.ms'.

I suspect your code is causing double requests for each batch, if your
partitioning is actually breaking up your batches into 2 separate
partitions.  Could be an off by 1 error, with your modulo calculation?
Perhaps you need to use '% 0' instead of '% 1' there?

Jason



On Tue, Jun 2, 2015 at 3:35 AM, Sebastien Falquier 
sebastien.falqu...@teads.tv wrote:

 Hi guys,

 I am new to Kafka and I am facing a problem I am not able to sort out.

 To smooth traffic over all my brokers' partitions, I have coded a custom
 Paritioner for my producers, using a simple round robin algorithm that
 jumps from a partition to another on every batch of messages (corresponding
 to batch.num.messages value). It looks like that :
 https://gist.github.com/sfalquier/4c0c7f36dd96d642b416

 With that fix, every partitions are used equally, but the amount of
 requests from the producers to the brokers have been multiplied by 2. I do
 not understand since all producers are async with batch.num.messages=200
 and the amount of messages processed is still the same as before. Why do
 producers need more requests to do the job? As internal traffic is a bit
 critical on our platform, I would really like to reduce producers' requests
 volume if possible.

 Any idea? Any suggestion?

 Regards,
 Sébastien



Re: Cascading failures on running out of disk space

2015-06-01 Thread Jason Rosenberg
Hi Jananee,

Do you for sure that you ran out of disk space completely? Did you see an
IOExceptions failing to write?  Normally, when that happens, the broker is
supposed to immediately shut itself down.  Did the one broker shut itself
down?

The NotLeaderForPartitionException's are normal when partition leadership
changes, and clients don't yet know about it.  They usually discover a
leadership change by getting this failure, and then re-checking the
partition metadata.  But, this metadata request can also fail in certain
conditions, which result in repeated NotLeaderForPartitionExceptions..

I've seen consumer offsets get reset too, if/when there's an unclean leader
election.  E.g. if the leader goes down hard without the followers up to
date (perhaps this happened in this case, if the leader was on the broker
with the full disk)?  I'm not sure why the consumer offsets have to be
completely reset, but that's what I've seen too.

Probably the most important thing to know, is that you don't want to let
your disks fill up, so if you can add early warning/monitoring so you can
take action before that happens, you'd avoid these scenarios with unclean
leader election, etc.

Jason

On Wed, May 27, 2015 at 10:54 AM, Jananee S janane...@gmail.com wrote:

 Hi,

   We have the following setup -

 Number of brokers: 3
 Number of zookeepers: 3
 Default replication factor: 3
 Offets Storage: kafka

 When one of our brokers ran out of disk space, we started seeing lot of
 errors in the broker logs at an alarming rate. This caused the other
 brokers also to run out of disk space.

 ERROR [ReplicaFetcherThread-0-101813211], Error for partition [,47] to
 broker 101813211:class kafka.common.UnknownException
 (kafka.server.ReplicaFetcherThread)

 WARN [Replica Manager on Broker 101813211]: Fetch request with correlation
 id 161672 from client ReplicaFetcherThread-0-101813211 on partition
 [,11] failed due to Leader not local for partition [,11] on broker
 101813211 (kafka.server.ReplicaManager)

 We also noticed NotLeaderForPartitionException in the producer and consumer
 logs (also at alarming rate)

 ERROR [2015-05-27 09:54:48,613] kafka.consumer.ConsumerFetcherThread: [
 ConsumerFetcherThread-_prod2-1432719772385-bd7608b8-0-101813211], Error
 for partition [,1] to broker 101813211:class kafka.common.
 NotLeaderForPartitionException

 The __consumer_offsets topic somehow got corrupted and consumers started
 consuming already consumed messages on restart.

 We deleted the offending topic and tried restarting the brokers and
 zookeepers. Now we are getting lots of corrupt index errors on broker start
 up.

 Was all this due to the replication factor being the same as number of
 brokers? Why would the topic files get corrupted in such a scenario?
 Please let us know how to recover from this scenario. Also, how do we turn
 down the error logging rate?

 Thanks,
 Jananee



Re: Broker error: failed due to Leader not local for partition

2015-06-01 Thread Jason Rosenberg
failed due to leader not local for partition usually occurs in response
to client requests that make a fetch or produce request to a partition, to
the wrong broker (e.g. to a follower and not the leader for the
partition).  Clients need to make a meta-data request first to determine
the leader replica for a partition, etc.  If you clients that are not well
behaved (e.g. they should re-poll the partition meta-data on error), then
you get these exceptions repeatedly.  However, it's normal to see these
messages occasionally, as it's the primary way that clients get notified of
a partition leadership change.

Jason

On Fri, May 29, 2015 at 3:59 PM, Casey Sybrandy - US csybra...@caci.com
wrote:

 Hello,


 For some reason, I'm getting this failed due to Leader not local for
 partition on my brokers.  I can't send the logs, but I'm going to try to
 describe what I see.  First, note that this is a fresh set of brokers with
 no data.


 I do see a series of messages stating Expanding ISR for partition, so it
 looks like all of the partitions have enough replicas.  I set up two
 partitions for each topic and three replicas each.


 I see several Removed fetcher for partitions and Added fetcher for
 partitions List messages, which looks right as it removes then adds the
 partition.  However, the first one I see is a pair of those messages with
 no partitions listed.  Right after them I see a Removed fetcher...
 message with all of the topics/partitions listed, but no corresponding
 Added fetcher message.


 That's about it that I see right now.  I know there was a bug a couple
 years ago in an early 0.8 release, but I'm running 0.8.2, so I shouldn't
 have that issue.  Googleing just didn't seem to get me any useful
 information.


 Thanks.


 Casey



Re: Kafka broker - Ip-address instead of host naem

2015-06-01 Thread Jason Rosenberg
Daniel,

Are you sure about that (it's not what I would have understood).

Generally, the way to do it is use a round-robin dns entry, which returns
successive nodes for successive requests.  Kafka will retry a zookeeper
request on failure (and in the process get re-routed to a new zk node).  If
a zk node fails, and a new one comes online, then it is necessary to update
the dns config of course.

If you don't have dns, then you can list your nodes explicitly, but you
won't be able to discover new nodes in the cluster if they are added (but
kafka will retry if it tries the bad one until it finds a good one).

Jason

On Mon, May 25, 2015 at 1:51 AM, Achanta Vamsi Subhash 
achanta.va...@flipkart.com wrote:

 Oh ok. Got it. Thanks Gwen and Daniel.

 On Mon, May 25, 2015 at 5:15 AM, Daniel Compton 
 daniel.compton.li...@gmail.com wrote:

  Correct me if I'm wrong, but I thought that the zk connect config was
 only
  needed explicitly at startup, and the zk cluster would update the active
  zookeepers as they joined and exited. You only need to specify one zk to
  join the cluster, and it will bootstrap the rest.
 
  So zk changes won't require a restart, but you'll want to make sure your
  config is up to date when you do eventually come to do a restart.
  On Mon, 25 May 2015 at 1:44 am Achanta Vamsi Subhash 
  achanta.va...@flipkart.com wrote:
 
   Ok. I was talking about a scenario where there is no DNS/hostNames for
  the
   zk nodes.
  
   If the connection string is given with all ip addresess of zk hosts
 and a
   new host is brought up in the zk cluster replacing a old node with a
   different ip address, we still need to re-load the zk connection
 string.
  
   If hostnames are used instead, the DNS mapping can point to the new IP
  but
   in a scenario where there is no DNS, we need to again hard-code the IP
   address in the zk connection string and restart the broker.
  
   One way is to give the local mapping of the zk local hostname to ip in
   /etc/hosts file and change it to the new ip when the node changes. But
   would reload of the Kafka config with new zk nodes be a better option?
  But
   as you said, if we cannot reload the server.properties file, what is
 the
   best way in case of no service discovery?
  
  
   On Sun, May 24, 2015 at 6:52 PM, Gwen Shapira gshap...@cloudera.com
   wrote:
  
You can't dynamically re-load server properties.
   
However, the norm in zookeeper is to configure the connection string
  with
all the nodes in the zk cluster, so there will be no need to modify
properties when you replace zk nodes.
   
On Sun, May 24, 2015 at 4:13 PM, Achanta Vamsi Subhash 
achanta.va...@flipkart.com wrote:
   
 Thanks Gwen.

 One more question:
 Is there a way we can dynamically re-load the contents of the Kafka
 server.properties contents without restarting the broker? Example
use-case
 is when a zk-node goes down and a new one is brought up, we need to
update
 the server.properties file to reflect this. Currently there is no
 way
   to
do
 this other than broker restart. Broker restart requires a lot of
  things
to
 do before triggering it.

 This JIRA is already filed but un-resolved. We don't require all
 the
 configs to be reloaded but only variable external config changes
  should
be
 allowed.

 https://issues.apache.org/jira/browse/KAFKA-1229

 On Sun, May 24, 2015 at 1:14 PM, Gwen Shapira 
 gshap...@cloudera.com
  
 wrote:

  If you set advertised.hostname in server.properties to the ip
   address,
 the
  IP will be registered in ZooKeeper.
 
 
  On Fri, May 22, 2015 at 2:20 PM, Achanta Vamsi Subhash 
  achanta.va...@flipkart.com wrote:
 
   Hi,
  
   Currently Kakfa brokers register the hostname in zookeeper.
  
   [zk: localhost:2181(CONNECTED) 5] get
  /varadhi/kafka/brokers/ids/0
  
  
 

   
  
 
 {jmx_port:,timestamp:1427704934158,host:currHostName,version:1,port:9092}
  
   ​Is there any config to make it use ip-address instead so that
 we
don't
   make a DNS lookup for the hostname?
  
   --
   Regards
   Vamsi Subhash
  
 



 --
 Regards
 Vamsi Subhash

   
  
  
  
   --
   Regards
   Vamsi Subhash
  
 



 --
 Regards
 Vamsi Subhash



Re: leader update partitions fail with KeeperErrorCode = BadVersion,kafka version=0.8.1.1

2015-06-01 Thread Jason Rosenberg
I've seen this problem now too with 0.8.2.1.  It happened after we had a
disk failure (but the server failed to shutdown:  KAFKA-).  After that
happened, subsequently, several ISR's underwent I think 'unclean leader
election', but I'm not 100% sure. But I did see lots of those same error
messages: Cached zkVersion [X] not equal to that in zookeeper, skip
updating ISR

So, I don't know that the issue was fixed in 0.8.1.1.

Can you describe the circumstances for the errors you saw?

Jason

On Fri, May 29, 2015 at 12:17 AM, chenlax lax...@hotmail.com wrote:

 kafka version =0.8.1.1

  i get the error log as follow:

 INFO Partition [Topic_Beacon_1,10] on broker 4: Shrinking ISR for
 partition [Topic_Beacon_1,10] from 4,7 to 4 (kafka.cluster.Partition)
 ERROR Conditional update of path
 /brokers/topics/Topic_Beacon_1/partitions/10/state with data
 {controller_epoch:16,leader:4,version:1,leader_epoch:7,isr:[4]}
 and expecte
 d version 5032 failed due to
 org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
 BadVersion for /brokers/topics/Topic_Beacon_1/partitions/10/state
 (kafka.utils.ZkUtils$)
 INFO Partition [Topic_Beacon_1,10] on broker 4: Cached zkVersion [5032]
 not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)


 Only restart broker,the issue will fix, why the broker can not back to ISR
 for partition?

 and i find the same issue,
 http://mail-archives.apache.org/mod_mbox/kafka-users/201404.mbox/%3CCAHwHRrW_vKsSpoAnhEqQUZVBT5_Djx3qbixsH8=6hAe=vg4...@mail.gmail.com%3E

 it point out kafka_0.8.1.1 fix the bug,so i want to know, what causes this
 problem.


 Thanks,
 Lax



Re: aborting a repartition assignment

2015-06-01 Thread Jason Rosenberg
As Lance mentioned, the best course of action in such a case (since version
0.8.X) is to keep the failed broker down, and bring up a new node (with the
same broker id as the failed broker), and it will automatically re-sync its
replicas (which may take some time).  You don't want to try to reassign
partitions, because, as you say, the one broker is failed and so that won't
succeed.

Jason

On Thu, May 28, 2015 at 7:10 PM, Foo Lim foo@vungle.com wrote:

 Thx for the reply. Can't I just reassign the partition to the new broker in
 zookeeper manually? What zookeeper path should I change?

 TIA


 On Thursday, May 28, 2015, Lance Laursen llaur...@rubiconproject.com
 wrote:

  Hey,
 
  Try clearing out /admin/reassign_partitions on your zookeeper.
 
  Additionally, your best bet might be to bring up a new broker with the
 same
  broker ID as your failed broker. It'll join the cluster and carry on,
  though I'm not sure what effect having a now-empty partition is going to
  have.
 
  On Thu, May 28, 2015 at 12:38 PM, Foo Lim foo@vungle.com
  javascript:; wrote:
 
   Hi,
  
   I'm using kafka 0.8.2  one of my kafka servers died (no way to recover
  the
   data on the disks). There is a topic, with replication of 1, with one
 of
   the partitions on the dead server. I thought a reassignment would move
  the
   metadata for that partition to a new server without needing the data,
 but
   the reassignment is stuck to in progress.
  
   I ran:
  
   $ /opt/kafka/kafka/bin/kafka-reassign-partitions.sh --zookeeper
   myzookeeper.my.com --reassignment-json-file new_assignment.json
 --verify
   Status of partition reassignment:
   Reassignment of partition [topicX,1] is still in progress
   This will never succeed since the dead server is never coming back.
  
   In the new server's log, I saw:
  
   [2015-05-28 06:25:15,401] INFO Completed load of log topicX-1 with log
  end
   offset 0 (kafka.log.Log)
   [2015-05-28 06:25:15,402] INFO Created log for partition [topicX,1] in
   /mnt2/data/kafka with properties {segment.index.bytes - 10485760,
   file.delete.delay.ms - 6, segment.bytes - 536870912, flush.ms -
   9223372036854775807, delete.retention.ms - 8640,
  index.interval.bytes
   - 4096, retention.bytes - -1, min.insync.replicas - 1,
 cleanup.policy
  -
   delete, unclean.leader.election.enable - true, segment.ms -
 60480,
   max.message.bytes - 112, flush.messages - 9223372036854775807,
   min.cleanable.dirty.ratio - 0.5, retention.ms - 25920,
   segment.jitter.ms - 0}. (kafka.log.LogManager)
   [2015-05-28 06:25:15,403] WARN Partition [topicX,1] on broker 4151132:
 No
   checkpointed highwatermark is found for partition [topicX,1]
   (kafka.cluster.Partition)
   [2015-05-28 06:25:15,405] INFO [ReplicaFetcherManager on broker
 4151132]
   Removed fetcher for partitions  (kafka.server.ReplicaFetcherManager)
   [2015-05-28 06:25:15,408] INFO [ReplicaFetcherManager on broker
 4151132]
   Added fetcher for partitions List()
 (kafka.server.ReplicaFetcherManager)
   [2015-05-28 06:25:15,411] INFO [ReplicaFetcherManager on broker
 4151132]
   Removed fetcher for partitions  (kafka.server.ReplicaFetcherManager)
   [2015-05-28 06:25:15,413] INFO [ReplicaFetcherManager on broker
 4151132]
   Added fetcher for partitions List()
 (kafka.server.ReplicaFetcherManager)
   Is there a way to force it to complete or abort the reassignment
 action?
  
  
  
 
 http://stackoverflow.com/questions/3054/aborting-kafka-reassign-partition-action
  
   Thanks!
  
 



Re: Ordered Message Queue with Pool of Consumers

2015-06-01 Thread Jason Rosenberg
How would you apply total ordering if multiple messages are being consumed
in parallel?   If message-1 and message-2 are being consumed in parallel,
do you really mean you want to guarantee that message-1 is consumed before
the consumption of message-2 begins?

On Tue, May 26, 2015 at 1:34 PM, Kumar Jayanti kumar.jaya...@gmail.com
wrote:

 Thanks for your response.

  The requirement is to have total ordering across partitions.  I do have a
 Key and all my consumers would be in the same consumer group.  I am new to
 kafka and so  would like to reconfirm my understanding of  your last
 statement :

 So assuming you have a key you can assign to organize them, you should be
 able to use many topic partitions and consumers.

  Let me  explore a bit more on your suggestion and come back if i have
 further doubts.

 thanks.







 On Tue, May 26, 2015 at 10:42 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  Messages are consumed in order from each topic partition. You can make
 sure
  messages end up in the same topic partition by making them have the same
  key, or by explicitly mapping them to partitions yourself when producing
  them.
 
  To parallelize consumption, you use a consumer group. Each time you
 create
  a consumer, you can specify the group it should be in and work is
  automatically balanced across all the members of the group. This is done
 on
  a per topic-partition basis, so when the consumer group membership is
  stable, a single consumer will see all the messages from a given topic
  partition in order.
 
  So assuming you have a key you can assign to organize them, you should be
  able to use many topic partitions and consumers.
 
  On Tue, May 26, 2015 at 9:51 AM, Kumar Jayanti kumar.jaya...@gmail.com
  wrote:
 
Our system has a specific message type that requires total ordering.
   Message-1 should be consumed (acted upon) before Message-2 is consumed.
   However since we have a cluster of  consumer nodes we would like to
   round-robin the processing of messages if possible to different nodes
  while
   maintaining the ordering.  Is this possible ?
  
   From my reading it appears we have to pin it down to  a Topic with
 Single
   Partition with Single Consumer Process but i may be wrong ?.
  
 
 
 
  --
  Thanks,
  Ewen
 



Re: consumer poll returns no records unless called more than once, why?

2015-06-01 Thread Jason Rosenberg
Ben,

It could also be related to how you initialize auto.offset.reset.  In unit
tests, you generally want to set it to 'smallest' to avoid race conditions
between producing and consuming.

Jason

On Wed, May 20, 2015 at 2:32 PM, Padgett, Ben bpadg...@illumina.com wrote:

 Thanks for the detailed explanation. I was simply testing Kafka for the
 first time with a few throw away unit tests to learn it works and was
 curious why I was receiving that behavior.
 
 From: Jay Kreps [jay.kr...@gmail.com]
 Sent: Wednesday, May 20, 2015 10:29 AM
 To: users@kafka.apache.org
 Subject: Re: consumer poll returns no records unless called more than
 once, why?

 Hey Ben,

 The consumer actually doesn't promise to return records on any given poll()
 call and even in trunk it won't return records on the first call likely.

 Internally the reason is that it basically does one or two rounds of
 non-blocking actions and then returns. This could include things like
 communicating with the co-ordinator, establishing connections, sending
 fetch requests, etc.

 I guess the question is whether this behavior is confusing or not. In
 general there is no guarantee that you will have data ready, or that if you
 do you will be assigned a partition to consume from within your timeout. So
 assuming that poll will always return data is wrong.

 However with a little effort we could potentially wrap the poll call so
 that rather than return it would always attempt to wait the full timeout
 potentially doing multiple internal polls. This doesn't guarantee it would
 return data but would reduce the likelihood when data was ready.

 I'm not sure if that is actually a good idea vs just documenting this a
 little better in the javadoc.

 -Jay

 On Wed, May 20, 2015 at 10:12 AM, Guozhang Wang wangg...@gmail.com
 wrote:

  Hello Ben,
 
  This Java consumer client was still not mature in 0.8.2.0 and lots of bug
  fixes have been checked in since then.
 
  I just test your code with trunk's consumer and it does not illustrate
 this
  problem. Could you try the same on your side and see if this issue goes
  away?
 
  Guozhang
 
  On Wed, May 20, 2015 at 9:49 AM, Padgett, Ben bpadg...@illumina.com
  wrote:
 
   I am using Kafka v0.8.2.0
   
   From: Guozhang Wang [wangg...@gmail.com]
   Sent: Wednesday, May 20, 2015 9:41 AM
   To: users@kafka.apache.org
   Subject: Re: consumer poll returns no records unless called more than
   once, why?
  
   Hello Ben,
  
   Which version of Kafka are you using with this consumer client?
  
   Guozhang
  
   On Wed, May 20, 2015 at 9:03 AM, Padgett, Ben bpadg...@illumina.com
   wrote:
  
//this code
   
Properties consumerProps = new Properties();
consumerProps.put(bootstrap.servers, localhost:9092);
   
   
//without deserializer it fails, which makes sense. the
documentation however doesn't show this
consumerProps.put(key.deserializer,
org.apache.kafka.common.serialization.StringDeserializer);
consumerProps.put(value.deserializer,
org.apache.kafka.common.serialization.StringDeserializer);
   
   
//why is serializer required? without this it fails to return
results when calling poll
consumerProps.put(key.serializer,
org.apache.kafka.common.serializers.StringSerializer);
consumerProps.put(value.serializer,
org.apache.kafka.common.serializers.StringSerializer);
   
   
consumerProps.put(group.id, test);
consumerProps.put(enable.auto.commit, true);
consumerProps.put(auto.commit.interval.ms, 1000);
consumerProps.put(session.timeout.ms, 3);
   
org.apache.kafka.clients.consumer.KafkaConsumerString,
 String
consumer = new
 org.apache.kafka.clients.consumer.KafkaConsumerString,
String(consumerProps);
   
TopicPartition topicPartition = new
TopicPartition(project-created, 0);
consumer.subscribe(topicPartition);
   
consumer.seeekToBeginning(topicPartition);
   
//each scenerio code goes here
   
   
   
I have a scenerio where it returns records and a scenerio
 where
   no
records are returned. Could anyone provide insight on why this
 happens?
   
   
   
//without a loop consumer.poll(100) returns no records
//after poll is called a second time it returns records
boolean run = true;
while (run) {
ConsumerRecordsString, String records =
  consumer.poll(100);
}
   
   
   
//why would this return zero records?
ConsumerRecordsString, String records = consumer.poll(100);
   
   
//This is to show that there are records for topic project-created
   
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
project-created --from-beginning

Re: KafkaException: Size of FileMessageSet has been truncated during write

2015-06-01 Thread Jason Rosenberg
Might be good to have a more friendly error message though!

On Thu, May 28, 2015 at 4:32 PM, Andrey Yegorov andrey.yego...@gmail.com
wrote:

 Thank you!

 --
 Andrey Yegorov

 On Wed, May 27, 2015 at 4:42 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  This should be just a message fetch failure. The socket was disconnected
  when broker was writing to it. There should not be data loss.
 
  Jiangjie (Becket) Qin
 
  On 5/27/15, 11:00 AM, Andrey Yegorov andrey.yego...@gmail.com wrote:
 
  I've noticed a few exceptions in the logs like the one below, does it
  indicate data loss? should I worry about this?
  What is the possible reason for this to happen?
  I am using kafka 0.8.1.1
  
  ERROR Closing socket for /xx.xxx.xxx.xxx because of error
  (kafka.network.Processor)
  
  kafka.common.KafkaException: Size of FileMessageSet
  /data/kafka/topic-name-11/14340499.log has been truncated
  during write: old size 26935, new size 0
  
  at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
  
  at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
  
  at kafka.network.MultiSend.writeTo(Transmission.scala:102)
  
  at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
  
  at kafka.network.MultiSend.writeTo(Transmission.scala:102)
  
  at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
  
  at kafka.network.Processor.write(SocketServer.scala:375)
  
  at kafka.network.Processor.run(SocketServer.scala:247)
  
  at java.lang.Thread.run(Thread.java:745)
  
  --
  Andrey Yegorov
 
 



Re: Kafka partitions unbalanced

2015-06-01 Thread Jason Rosenberg
Andrew Otto,

This is a known problem (and which I have run into as well).  Generally, my
solution has been to increase the number of partitions such that the
granularity of partitions is much higher than the number of disks, such
that its more unlikely for the imbalance to be significant.

I would not recommend explicitly trying to game the system, by manually
moving partitions and recovery files.  You could do something to cause it
to recreate the replicas by having them recreated from scratch (e.g. use
the partition reassignment tool to move it to a new broker and hope for a
cleaner distribution).  Also, I've removed a log-dir from the 'log.dirs'
list and restarted a broker when dealing with a failed disk (this will
cause any data on the removed log.dir to be reassigned elsewhere, and the
data will have to re-sync from replicas to fully recover).

There is a 'KIP' about this issue, to make JBOD support in Kafka a bit more
first-class, and I think this would be one of the main issues to solve.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-18+-+JBOD+Support

Jason

On Wed, May 27, 2015 at 5:55 PM, Jonathan Creasy jonathan.cre...@turn.com
wrote:

 I have a similar issue, let me know how it goes. :)

 -Original Message-
 From: Andrew Otto [mailto:ao...@wikimedia.org]
 Sent: Wednesday, May 27, 2015 3:12 PM
 To: users@kafka.apache.org
 Subject: Kafka partitions unbalanced

 Hi all,

 I’ve recently noticed that our broker log.dirs are using up different
 amounts of storage.  We use JBOD for our brokers, with 12 log.dirs, 1 on
 each disk.  One of our topics is larger than the others, and has 12
 partitions.  Replication factor is 3, and we have 4 brokers.  Each broker
 then has to store 9 partitions for this topic (12*3/4 == 9).

 I guess I had originally assumed that Kafka would be smart enough to
 spread partitions for a given topic across each of the log.dirs as evenly
 as it could.  However, on some brokers this one topic has 2 partitions in a
 single log.dir, meaning that the storage taken up on a single disk by this
 topic on those brokers is twice what it should be.

 e.g.

 Filesystem  Size  Used Avail Use% Mounted on
 /dev/sda3   1.8T  1.2T  622G  66% /var/spool/kafka/a
 /dev/sdb3   1.8T  1.7T  134G  93% /var/spool/kafka/b
 …
 $ du -sh /var/spool/kafka/{a,b}/data/webrequest_upload-*
 501Ga/data/webrequest_upload-4
 500Gb/data/webrequest_upload-11
 501Gb/data/webrequest_upload-8


 This also means that those over populated disks have more writes to do.
 My I/O is imbalanced!

 This is sort of documented at http://kafka.apache.org/documentation.html 
 http://kafka.apache.org/documentation.html:

 If you configure multiple data directories partitions will be assigned
 round-robin to data directories. Each partition will be entirely in one of
 the data directories. If data is not well balanced among partitions this
 can lead to load imbalance between disks.”

 But my data is well balanced among partitions!  It’s just that multiple
 partitions are assigned to a single disk.

 Anyyway, on to a question:  Is it possible to move partitions between
 log.dirs?  Is there tooling to do so?  Poking around in there, it looks
 like it might be as simple as shutting down the broker, moving the
 partition directory, and then editing both replication-offset-checkpoint
 and recovery-point-offset-checkpoint files so that they say the appropriate
 things in the appropriate directories, and then restarting broker.

 Someone tell me that this is a horrible idea. :)

 -Ao





Re: Offset management: client vs broker side responsibility

2015-06-01 Thread Jason Rosenberg
Stevo,

Both of the main solutions used by the high-level consumer are standardized
and supported directly by the kafka client libraries (e.g. maintaining
offsets in zookeeper or in kafka itself).  And for the zk case, there is
the consumer offset checker (which is good for monitoring).  Consumer
offset checker still needs to be extended for offsets stored in kafka
_consumer_offset topics though.

Anyway, I'm not sure I understand your question, you want something for
better monitoring of all possible clients (some of which might choose to
manage offsets in their own way)?

It's just not part of the kafka design to directly track individual
consumers.

Jason

On Wed, May 27, 2015 at 7:42 AM, Shady Xu shad...@gmail.com wrote:

 I guess adding a new component will increase the complexity of the system
 structure. And if the new component consists of one or a few nodes, it may
 becomes the bottleneck of the whole system, if it consists of many nodes,
 it will make the system even more complex.

 Although every solution has its downsides, I think the current one is
 decent.

 2015-05-27 17:10 GMT+08:00 Stevo Slavić ssla...@gmail.com:

  It could be a separate server component, does not have to be
  monolith/coupled with broker.
  Such solution would have benefits - single API, pluggable
 implementations.
 
  On Wed, May 27, 2015 at 8:57 AM, Shady Xu shad...@gmail.com wrote:
 
   Storing and managing offsets by broker will leave high pressure on the
   brokers which will affect the performance of the cluster.
  
   You can use the advanced consumer APIs, then you can get the offsets
  either
   from zookeeper or the __consumer_offsets__ topic. On the other hand, if
  you
   use the simple consumer APIs, you mean to manage offsets yourself, then
  you
   should monitor them yourself, simple and plain, right?
  
   2015-04-22 14:36 GMT+08:00 Stevo Slavić ssla...@gmail.com:
  
Hello Apache Kafka community,
   
Please correct me if wrong, AFAIK currently (Kafka 0.8.2.x) offset
management responsibility is mainly client/consumer side
  responsibility.
   
Wouldn't it be better if it was broker side only responsibility?
   
E.g. now if one wants to use custom offset management, any of the
 Kafka
monitoring tools cannot see the offsets - they would need to use same
custom client implementation which is practically not possible.
   
Kind regards,
Stevo Slavic.
   
  
 



Re: How to achieve distributed processing and high availability simultaneously in Kafka?

2015-05-06 Thread Jason Rosenberg
A consumer thread can consume multiple partitions.  This is not unusual, in
practice.

In the example you gave, if multiple high-level consumers are using the
same group id, they will automatically rebalance the partition assignment
between them as consumers dynamically join and leave the group.  So, in
your example, if process 1 dies, then process 2 will assume ownership for
all the n partitions (and if it has n/2 threads, each thread will own 2 of
the partitions).

In my experience though, its generally fine to have fewer threads than
partitions.  It depends on the volume of data incoming to each partition of
course, and how fast the consumer takes to process each message.

Jason

On Wed, May 6, 2015 at 1:57 AM, sumit jain sumitjai...@gmail.com wrote:

 I have a topic consisting of n partitions. To have distributed processing I
 create two processes running on different machines. They subscribe to the
 topic with same groupd id and allocate n/2 threads, each of which processes
 single stream(n/2 partitions per process).

 With this I will have achieved load distribution, but now if process 1
 crashes, than process 2 cannot consume messages from partitions allocated
 to process 1, as it listened only on n/2 streams at the start.

 Or else, if I configure for HA and start n threads/streams on both
 processes, then when one node fails, all partitions will be processed by
 other node. But here, we have compromised distribution, as all partitions
 will be processed by a single node at a time.

 Is there a way to achieve both simultaneously and how?
 Note: Already asked this on stackoverflow

 http://stackoverflow.com/questions/30060261/how-to-achieve-distributed-processing-and-high-availability-simultaneously-in-ka
 .
 --
 Thanks  Regards,
 Sumit Jain



Re: circuit breaker for producer

2015-05-05 Thread Jason Rosenberg
Guozhang,

Do you have the ticket number for possibly adding in local log file
failover? Is it actively being worked on?

Thanks,

Jason

On Tue, May 5, 2015 at 6:11 PM, Guozhang Wang wangg...@gmail.com wrote:

 Does this log file acts as a temporary disk buffer when broker slows
 down, whose data will be re-send to broker later, or do you plan to use it
 as a separate persistent storage as Kafka brokers?

 For the former use case, I think there is an open ticket for integrating
 this kind of functionality into producer; for the latter use case, you may
 want to do this traffic control out of Kafka producer, i.e. upon detecting
 producer buffer full, do not call send() on it for a while but write to a
 different file, etc.

 Guozhang

 On Tue, May 5, 2015 at 11:28 AM, mete efk...@gmail.com wrote:

  Sure, i kind of count on that actually, i guess with this setting the
  sender blocks on allocate method and this bufferpool-wait-ratio
 increases.
 
  I want to fully compartmentalize the kafka producer from the rest of the
  system. Ex: writing to a log file instead of trying to send to kafka when
  some metric in the producer indicates that there is a performance
  degradation or some other problem.
  I was wondering what would be the ideal way of deciding that?
 
 
 
  On Tue, May 5, 2015 at 6:32 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
   Does block.on.buffer.full=false do what you want?
  
   -Jay
  
   On Tue, May 5, 2015 at 1:59 AM, mete efk...@gmail.com wrote:
  
Hello Folks,
   
I was looking through the kafka.producer metrics on the JMX
 interface,
  to
find a good indicator when to trip the circuit. So far it seems
 like
   the
bufferpool-wait-ratio metric is a useful decision mechanism when to
  cut
off the production to kafka.
   
As far as i experienced, when kafka server slow for some reason,
  requests
start piling up on the producer queue and if you are not willing to
  drop
any messages on the producer, send method starts blocking because of
  the
slow responsiveness.
   
So this buffer pool wait ratio starts going up from 0.x up to 1.0.
 And
  i
   am
thinking about tripping the circuit breaker using this metric, ex: if
wait-ratio  0.90 etc...
   
What do you think? Do you think there would be a better indicator to
   check
the health overall?
   
Best
Mete
   
  
 



 --
 -- Guozhang



Re: 'roundrobin' partition assignment strategy restrictions

2015-05-05 Thread Jason Rosenberg
I filed this jira, fwiw:  https://issues.apache.org/jira/browse/KAFKA-2172

Jason

On Mon, Mar 23, 2015 at 2:44 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Jason,

 Yes, I agree the restriction makes the usage of round-robin less flexible.
 I think the focus of round-robin strategy is workload balance. If
 different consumers are consuming from different topics, it is unbalanced
 by nature. In that case, is it possible that you use different consumer
 group for different sets of topics?
 The rolling update is a good point. If you do rolling bounce in a small
 window, the rebalance retry should handle it. But if you want to canary a
 new topic setting on one consumer for some time, it won’t work.
 Could you maybe share the use case with more detail? So we can see if
 there is any workaround.

 Jiangjie (Becket) Qin

 On 3/22/15, 10:04 AM, Jason Rosenberg j...@squareup.com wrote:

 Jiangjie,
 
 Yeah, I welcome the round-robin strategy, as the 'range' strategy ('til
 now
 the only one available), is not always good at balancing partitions, as
 you
 observed above.
 
 The main thing I'm bringing up in this thread though is the question of
 why
 there needs to be a restriction to having a homogenous set of consumers in
 the group being balanced.  This is not a requirement for the range
 algorithm, but is for the roundrobin algorithm.  So, I'm just wanting to
 understand why there's that limitation.  (And sadly, in our case, we do
 have heterogenous consumers using the same groupid, so we can't easily
 turn
 on roundrobin at the moment, without some effort :) ).
 
 I can see that it does simplify the implementation to have that
 limitation,
 but I'm just wondering if there's anything fundamental that would prevent
 an implementation that works over heterogenous consumers.  E.g. Lay out
 all partitions, and layout all consumer threads, and proceed round robin
 assigning each partition to the next consumer thread. *If the next
 consumer
 thread doesn't have a selection for the current partition, then move on to
 the next consumer-thread*
 
 The current implementation is also problematic if you are doing a rolling
 restart of a consumer cluster.  Let's say you are updating the topic
 selection as part of an update to the cluster.  Once the first node is
 updated, the entire cluster will no longer be homogenous until the last
 node is updated, which means you will have a temporary outage consuming
 data until all nodes have been updated.  So, it makes it difficult to do
 rolling restarts, or canary updates on a subset of nodes, etc.
 
 Jason
 
 Jason
 
 On Fri, Mar 20, 2015 at 10:15 PM, Jiangjie Qin j...@linkedin.com.invalid
 
 wrote:
 
  Hi Jason,
 
  The motivation behind round robin is to better balance the consumers¹
  load. Imagine you have two topics each with two partitions. These topics
  are consumed by two consumers each with two consumer threads.
 
  The range assignment gives:
  T1-P1 - C1-Thr1
  T1-P2 - C1-Thr2
  T2-P1 - C1-Thr1
  T2-P2 - C1-Thr2
  Consumer 2 will not be consuming from any partitions.
 
  The round robin algorithm gives:
  T1-P1 - C1-Thr1
  T1-P2 - C1-Thr2
  T2-P1 - C2-Thr1
  T2-p2 - C2-Thr2
  It is much better than range assignment.
 
  That¹s the reason why we introduced round robin strategy even though it
  has restrictions.
 
  Jiangjie (Becket) Qin
 
 
  On 3/20/15, 12:20 PM, Jason Rosenberg j...@squareup.com wrote:
 
  Jiangle,
  
  The error messages I got (and the config doc) do clearly state that the
  number of threads per consumer must match also
  
  I'm not convinced that an easy to understand algorithm would work fine
  with
  a heterogeneous set of selected topics between consumers.
  
  Jason
  
  On Thu, Mar 19, 2015 at 8:07 PM, Mayuresh Gharat
  gharatmayures...@gmail.com
   wrote:
  
   Hi Becket,
  
   Can you list down an example for this. It would be easier to
 understand
  :)
  
   Thanks,
  
   Mayuresh
  
   On Thu, Mar 19, 2015 at 4:46 PM, Jiangjie Qin
  j...@linkedin.com.invalid
   wrote:
  
Hi Jason,
   
The round-robin strategy first takes the partitions of all the
 topics
  a
consumer is consuming from, then distributed them across all the
   consumers.
If different consumers are consuming from different topics, the
  assigning
algorithm will generate different answers on different consumers.
It is OK for consumers to have different thread count, but the
  consumers
have to consume from the same set of topics.
   
   
For range strategy, the balance is for each individual topic
 instead
  of
cross topics. So the balance is only done for the consumers
 consuming
   from
the same topic.
   
Thanks.
   
Jiangjie (Becket) Qin
   
On 3/19/15, 4:14 PM, Jason Rosenberg j...@squareup.com wrote:
   
So,

I've run into an issue migrating a consumer to use the new
  'roundrobin'
partition.assignment.strategy.  It turns out that several of our
   consumers
use the same group id

Re: Round Robin Partition Assignment

2015-05-05 Thread Jason Rosenberg
I asked about this same issue in a previous thread.  Thanks for reminding
me, I've added this Jira:  https://issues.apache.org/jira/browse/KAFKA-2172

I think this is a great new feature, but it's unfortunately the all
consumers must be the same is just a bit too restrictive.

Jason

On Tue, May 5, 2015 at 5:20 PM, Bryan Baugher bjb...@gmail.com wrote:

 Hi everyone,

 We recently switched to round robin partition assignment after we noticed
 that range partition assignment (default) will only make use of the first X
 consumers were X is the number of partitions for a topic our consumers are
 interested in. We then noticed the caveat in round robin,

 Round-robin assignment is permitted only if: (a) Every topic has the same
 number of streams within a consumer instance (b) The set of subscribed
 topics is identical for every consumer instance within the group.

 We tried this out and found if all consumers don't agree on topic
 subscription they basically stop consuming until things get figured out.
 This is bad for us since our consumers change their topic subscription
 based on config they load from a REST service periodically.

 Is there something we can do on our side to avoid this? The best thing I
 can think of is to try and use something like zookeeper to coordinate
 changing the topic filters.

 Would it be possible to see the round robin assignment updated to not
 require identical topic subscriptions?

 Bryan



expected behavior if a node undergoes unclean shutdown

2015-04-08 Thread Jason Rosenberg
Hello,

I'm still trying to get to the bottom of an issue we had previously, with
an unclean shutdown during an upgrade to 0.8.2.1 (from 0.8.1.1).  In that
case, the controlled shutdown was interrupted, and the node was shutdown
abruptly.  This resulted in about 5 minutes of unavailability for most
partitions.  (I think that issue is related to the one reported by Thunder
in the thread titled: Problem with node after restart no partitions?).

Anyway, while investigating that, I've gotten side-tracked, trying
understand what the expected behavior should be, if the controller node
dies abruptly.

To test this, I have a small test cluster (2 nodes, 100 partitions, each
with replication factor 2, using 0.8.2.1).  There are also a few test
producer clients, some of them high volume

I intentionally killed the controller node hard.  I noticed that for 10
seconds, the second node spammed the logs for 10 seconds trying to fetch
data for the partitions it was following on the node that was killed.
Finally, after about 10 seconds, the second node elected itself the new
controller, and things slowly recovered.

Clients could not successfully produce to the affected partitions until the
new controller was elected (and got failed meta-data requests trying to
discover the new leader partition).

I would have expected the cluster to recover more quickly if a node fails,
if we have available replicas that can become leader and start receiving
data.  With just 100 partitions, I would have expected this recovery to
happen very quickly.  (Whereas in our previous issue, where it seemed to
take 5 minutes, the longer duration there was probably related to a much
larger number of partitions).

Anyway, before I start filing Jira's and attaching log snippets, I'd like
to understand what the expected behavior should be?

If a controller (or really any node in the cluster) undergoes unclean
shutdown, how should the cluster respond, in keeping replicas available
(assuming all replicas were in ISR before the shutdown).  How fast should
controller and partition leader election happen in this case?

Thanks,

Jason


Re: expected behavior if a node undergoes unclean shutdown

2015-04-08 Thread Jason Rosenberg
I've confirmed that the same thing happens even if it's not the controller
that's killed hard.  Also, in several trials, it took between 10-30 seconds
to recover.

Jason

On Wed, Apr 8, 2015 at 1:31 PM, Jason Rosenberg j...@squareup.com wrote:

 Hello,

 I'm still trying to get to the bottom of an issue we had previously, with
 an unclean shutdown during an upgrade to 0.8.2.1 (from 0.8.1.1).  In that
 case, the controlled shutdown was interrupted, and the node was shutdown
 abruptly.  This resulted in about 5 minutes of unavailability for most
 partitions.  (I think that issue is related to the one reported by Thunder
 in the thread titled: Problem with node after restart no partitions?).

 Anyway, while investigating that, I've gotten side-tracked, trying
 understand what the expected behavior should be, if the controller node
 dies abruptly.

 To test this, I have a small test cluster (2 nodes, 100 partitions, each
 with replication factor 2, using 0.8.2.1).  There are also a few test
 producer clients, some of them high volume

 I intentionally killed the controller node hard.  I noticed that for 10
 seconds, the second node spammed the logs for 10 seconds trying to fetch
 data for the partitions it was following on the node that was killed.
 Finally, after about 10 seconds, the second node elected itself the new
 controller, and things slowly recovered.

 Clients could not successfully produce to the affected partitions until
 the new controller was elected (and got failed meta-data requests trying to
 discover the new leader partition).

 I would have expected the cluster to recover more quickly if a node fails,
 if we have available replicas that can become leader and start receiving
 data.  With just 100 partitions, I would have expected this recovery to
 happen very quickly.  (Whereas in our previous issue, where it seemed to
 take 5 minutes, the longer duration there was probably related to a much
 larger number of partitions).

 Anyway, before I start filing Jira's and attaching log snippets, I'd like
 to understand what the expected behavior should be?

 If a controller (or really any node in the cluster) undergoes unclean
 shutdown, how should the cluster respond, in keeping replicas available
 (assuming all replicas were in ISR before the shutdown).  How fast should
 controller and partition leader election happen in this case?

 Thanks,

 Jason



Re: Problem with node after restart no partitions?

2015-04-07 Thread Jason Rosenberg
:40,701] INFO Scheduling log segment 6594 for log
 xyz.topic1-2 for deletion. (kafka.log.Log)
 [2015-03-31 10:24:40,701] INFO Scheduling log segment 6595 for log
 xyz.topic1-2 for deletion. (kafka.log.Log)
 [2015-03-31 10:24:40,702] INFO Scheduling log segment 203953 for log
 xyz.topic2-4 for deletion. (kafka.log.Log)
 [2015-03-31 10:24:40,702] INFO Scheduling log segment 210571 for log
 xyz.topic2-4 for deletion. (kafka.log.Log)
 [2015-03-31 10:24:40,702] INFO Scheduling log segment 211471 for log
 xyz.topic2-4 for deletion. (kafka.log.Log)

 then it starts actually deleting them... this goes on for a good 20
 minutes...
  [2015-03-31 10:25:40,704] INFO Deleting segment 6587 from log
 xyz.topic1-2. (kafka.log.Log)
 [2015-03-31 10:25:40,716] INFO Deleting index
 /data4/kafka-data/xyz.topic1-2/6587.index.deleted
 (kafka.log.OffsetIndex)
 [2015-03-31 10:25:40,716] INFO Deleting segment 6594 from log
 xyz.topic1-2. (kafka.log.Log)
 [2015-03-31 10:25:40,717] INFO Deleting index
 /data4/kafka-data/xyz.topic1-2/6594.index.deleted
 (kafka.log.OffsetIndex)
 [2015-03-31 10:25:40,717] INFO Deleting segment 6595 from log
 xyz.topic1-2. (kafka.log.Log)
 [2015-03-31 10:25:40,717] INFO Deleting index
 /data4/kafka-data/xyz.topic1-2/6595.index.deleted
 (kafka.log.OffsetIndex)
 [2015-03-31 10:25:40,717] INFO Deleting segment 203953 from log
 xyz.topic2-4. (kafka.log.Log)
 [2015-03-31 10:25:40,722] INFO Deleting segment 210571 from log
 xyz.topic2-4. (kafka.log.Log)
 [2015-03-31 10:25:40,729] INFO Deleting index
 /data4/kafka-data/xyz.topic2-4/00203953.index.deleted
 (kafka.log.OffsetIndex)
 [2015-03-31 10:25:40,729] INFO Deleting segment 211471 from log
 xyz.topic2-4. (kafka.log.Log)

 I don't know that we knew what was happening exactly at this time, only
 that it was not sync'ing up with the others. I think the sys-engineer
 stopped it after about 20 minutes to see what was wrong... I think by this
 point the damage was done. And actually in these logs, I don't see any of
 the No checkpointed highwatermark is found for partition messages in this
 sequence though, not sure what's up.

 Any ideas? It makes us more than a little nervous to restart nodes if they
 will just decide to delete segments. Under what conditions would this
 happen?

 Thanks!
 Thunder


 -Original Message-
 From: Thunder Stumpges [mailto:tstump...@ntent.com]
 Sent: Friday, April 03, 2015 12:10 PM
 To: users@kafka.apache.org
 Subject: RE: Problem with node after restart no partitions?

 Likewise, I was not at the wheel when this was happening, and there very
 well could have been a similar situation of not waiting for a controlled
 shutdown to complete successfully.

 Fortunately we did not end up in exactly your situation where the entire
 cluster went down, but I can say I know we never had more than 1 of the
 three nodes shut down during this situation, and twice in two days we lost
 all data on the node and it had to re-sync ALL of its data over again.

 I am in the process of trying to pull the controller and server logs for
 the first portion of our issue right now. Will follow up when they are
 available.

 Thanks,
 Thunder


 -Original Message-
 From: Jason Rosenberg [mailto:j...@squareup.com]
 Sent: Friday, April 03, 2015 10:50 AM
 To: users@kafka.apache.org
 Subject: Re: Problem with node after restart no partitions?

 I will provide what I can (we don't have separate logs for controller,
 etc., it's all integrated in a single log with log4j for us, we embed Kafka
 in a java container, etc).  Are there specific log classes you'd be
 interested in seeing?  (I can look at the default log4j configs to see
 what's set up normally for the 'controller' log)

 We have most of the logs saved away (but not all of them).

 Sorry, I didn't mean to hijack this thread (so will start a new thread
 soonly).

 Jason

 On Fri, Apr 3, 2015 at 1:37 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  This sounds a very serious issueŠ Could you provide the controller log
  and the log for the first broker on which you tried controlled
  shutdown and upgrade?
 
  On 4/3/15, 8:57 AM, Jason Rosenberg j...@squareup.com wrote:
 
  I'm preparing a longer post here, but we recently ran into a similar
  scenario.  Not sure yet if it's the same thing you saw (but it feels
  similar).  We were also doing a rolling upgrade from 0.8.1.1 to
  0.8.2.1, and during the controlled shutdown of the first node (of a 4
  node cluster), the controlled shutdown was taking longer than normal
  (it timed out several times and was retrying controlled shutdown),
  and unfortunately, our deployment system decided to kill it hard (so
  it was in the middle of it's 4th controlled shutdown retry, etc.).
  
  Anyway, when the node came back, it naturally decided to 'restore'
  most of it's partitions, which took some time (but only like 5
  minutes).  What's weird is it didn't decide to resync data from other

Re: Problem with node after restart no partitions?

2015-04-03 Thread Jason Rosenberg
I will provide what I can (we don't have separate logs for controller,
etc., it's all integrated in a single log with log4j for us, we embed Kafka
in a java container, etc).  Are there specific log classes you'd be
interested in seeing?  (I can look at the default log4j configs to see
what's set up normally for the 'controller' log)

We have most of the logs saved away (but not all of them).

Sorry, I didn't mean to hijack this thread (so will start a new thread
soonly).

Jason

On Fri, Apr 3, 2015 at 1:37 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 This sounds a very serious issueŠ Could you provide the controller log and
 the log for the first broker on which you tried controlled shutdown and
 upgrade?

 On 4/3/15, 8:57 AM, Jason Rosenberg j...@squareup.com wrote:

 I'm preparing a longer post here, but we recently ran into a similar
 scenario.  Not sure yet if it's the same thing you saw (but it feels
 similar).  We were also doing a rolling upgrade from 0.8.1.1 to 0.8.2.1,
 and during the controlled shutdown of the first node (of a 4 node
 cluster),
 the controlled shutdown was taking longer than normal (it timed out
 several
 times and was retrying controlled shutdown), and unfortunately, our
 deployment system decided to kill it hard (so it was in the middle of it's
 4th controlled shutdown retry, etc.).
 
 Anyway, when the node came back, it naturally decided to 'restore' most of
 it's partitions, which took some time (but only like 5 minutes).  What's
 weird is it didn't decide to resync data from other replicas, instead it
 just restored partitions locally.  During this time, the rest of the
 cluster failed to elect any new leaders, and so for 5 minutes, those
 partitions were unavailable (and we saw a flood of failed FetcherManager
 exceptions from the other nodes in the cluster).  Most of the partitions
 were empty (e.g. there's no way the other replicas were behind and not in
 the ISR normally).  During this 5 minutes, producers were unable to send
 messages due to NotLeaderForPartition exceptions.  Apparently the
 controller was still sending them to the unavailable broker.
 
 Finally, when the first node finally came up, the other nodes were
 somewhat
 happy again (but a few partitions remained under-replicated indefinitely).
 Because of this, we decided to pause the rolling restart, and try to wait
 for the under-replicated partitions to get insync.  Unfortunately, about
 an
 hour later, the whole cluster went foobar (e.g. partitions became
 unavailable, brokers logged a flood of Fetcher errors, producers couldn't
 find a valid leader, metadata requests timed out, etc.).  In a panic, we
 reverted that first node back to 0.8.1.1. This did not help,
 unfortunately,
 so, deciding we'd already probably lost data at this point (and producers
 could not send data due to (NotLeaderForPartition exceptions)), we decided
 to just forcibly do the upgrade to 0.8.2.1.  This was all a bad situation,
 of course.
 
 So, now we have the cluster stable at 0.8.2.1, but like you, we are very,
 very nervous about doing any kind of restart to any of our nodes.  We lost
 data, primarily in the form of producers failing to send during the
 periods
 of unavailability.
 
 It looks like the root cause, in our case, was a flood of topics created
 (long-since unused and empty).  This appears to have caused the longer
 than
 normal controlled shutdown, which in turn, led to the followon problems.
 However, in the past, we've seen a controlled shutdown failure result in
 an
 unclean shutdown, but usually the cluster recovers (e.g. it elects new
 leaders, and when the new node comes back, it recovers it's partitions
 that
 were uncleanly shutdown).  That did not happen this time (the rest of the
 cluster got in an apparent infinite loop where it tried repeatedly (e.g.
 500K times a minute) to fetch partitions that were unavailable).
 
 I'm preparing a longer post with more detail (will take a bit of time).
 
 Jason
 
 On Thu, Apr 2, 2015 at 10:19 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  wow, thats scary for sure.
 
  Just to be clear - all you did is restart *one* broker in the cluster?
  everything else was ok before the restart? and that was controlled
  shutdown?
 
  Gwen
 
  On Wed, Apr 1, 2015 at 11:54 AM, Thunder Stumpges tstump...@ntent.com
  wrote:
 
   Well it appears we lost all the data on the one node again. It
 appears to
   be all or part of KAFKA-1647
   https://issues.apache.org/jira/browse/KAFKA-1647 as we saw this in
 our
   logs (for all topics):
  
   [2015-04-01 10:46:58,901] WARN Partition [logactivity-redirect,3] on
   broker 6: No checkpointed highwatermark is found for partition
   [logactivity-redirect,3] (kafka.cluster.Partition)
   [2015-04-01 10:46:58,902] WARN Partition [pageimpression,1] on broker
 6:
   No checkpointed highwatermark is found for partition
 [pageimpression,1]
   (kafka.cluster.Partition)
   [2015-04-01 10:46:58,904] WARN Partition

Re: How to consume from a specific topic, as well as a wildcard of topics?

2015-04-03 Thread Jason Rosenberg
Yeah, I think you need to have 2 consumer connectors (I routinely have
multiple consumer connectors co-existing in the same app).

That error message about the ephemeral node is really annoying, by the
way.  It happens under lots of scenarios (at least it did under 0.8.1.1),
where it simply never recovers, until you bounce the app.  In our case it
seemed to happen after a rebalance failure, etc.  The text of the message
is also really annoying (overly flippant), especially when you get it
spewed to the logs continuouslyEssentially, the message gets generated
in cases that have nothing to do with it's original intent, and the cute
log message is useless...

Jason

On Fri, Apr 3, 2015 at 8:03 PM, James Cheng jch...@tivo.com wrote:

 Hi,

 I want to consume from both a specific topic a_topic as well as all
 topics that match a certain prefix prefix.*.

 When I do that using a single instance of a ConsumerConnector, I get a
 hang when creating the 2nd set of message streams.

 Code:
 ConsumerConnector consumer =
 Consumer.createJavaConsumerConnector(consumerConfig);
 MapString, Integer topicCountMap = new HashMapString,
 Integer();
 topicCountMap.put(a_topic, new Integer(1));

 MapString, ListKafkaStreambyte[], byte[] consumerMap =
 consumer.createMessageStreams(topicCountMap);

 // do stuff with resulting streams

 TopicFilter whitelist = new Whitelist(prefix\\.*);
 ListKafkaStreambyte[], byte[] wildcardStreams =
 consumer.createMessageStreamsByFilter(whitelist, 1);

 It hangs when inside createMessageStreamsByFilter(), within
 createEphemeralPathExpectConflictHandleZKBug(): There is an info() message
 saying:
 info(I wrote this conflicted ephemeral node [%s] at %s a while
 back in a different session, .format(data, path)
   + hence I will backoff for this node to be deleted by
 Zookeeper and retry)

 Is this expected to work? Can a ConsumerConnector be used like this, or
 should I have 2 ConsumerConnectors; one for the specific topic, and another
 for the wildcarded topics?

 It works when I use 2 ConsumerConnectors, but I just wanted to check if
 this is expected or not.

 Thanks,
 -James




Re: Which version works for kafka 0.8.2 as consumer?

2015-04-03 Thread Jason Rosenberg
Is there a reason the incomplete version was included in the 0.8.2.1
release?

On Wed, Apr 1, 2015 at 1:02 PM, Mayuresh Gharat gharatmayures...@gmail.com
wrote:

 What do you mean by offset syncing?

 Thanks,

 Mayuresh

 On Wed, Apr 1, 2015 at 9:59 AM, pushkar priyadarshi 
 priyadarshi.push...@gmail.com wrote:

  So in 0.8.2.0/0.8.2.1 high level consumer can not make use of offset
  syncing in kafka?
 
  On Wed, Apr 1, 2015 at 12:51 PM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
 
   Yes, KafkaConsumer in 0.8.2 is still in development. You probably still
   want to use ZookeeperConsumerConnector for now.
  
   On 4/1/15, 9:28 AM, Mark Zang deepnight...@gmail.com wrote:
  
   I found the 0.8.2.0 and 0.8.2.1 has a KafkaConsumer. But this class
  seems
   not completed and not functional. Lots of method returns null or
 throws
   NSM. Which version of consumer for kafka 0.8.2 broker?
   
   Thanks!
   
   --
   Best regards!
   Mike Zang
  
  
 



 --
 -Regards,
 Mayuresh R. Gharat
 (862) 250-7125



Re: New kafka client for Go (golang)

2015-04-03 Thread Jason Rosenberg
How does it compare to Sarama?

On Mon, Mar 30, 2015 at 3:09 PM, Piotr Husiatyński p...@optiopay.com wrote:

 Hi,
 I wanted to share new client library for Go language that I'm developing at
 Optiopay. Library is MIT licensed and provides API close to latest kafka
 request/response wire format with few higher level helpers to make common
 tasks easier.
 The only thing that is not yet implemented is message set compression.

 Code can be found on github: https://github.com/optiopay/kafka



Re: Side-by-side migration for 0.7 to 0.8

2015-04-03 Thread Jason Rosenberg
Hi Patrick,

When we went through this, we 'shaded' the old kafka jar, so the 2 could
co-exist in the same app.  We use maven, and there's a maven 'shade
plugin', etc.

In our case, it was intractable to try to update all producers and
consumers in one go as you suggest, so we had to have a way in our code
base for the old and new to co-exist.

Jason

On Fri, Apr 3, 2015 at 8:48 PM, Patrick McBryde patr...@curalate.com
wrote:

 Hello,
 Does anyone have tips or advice for side-by-side migration from 0.7 to 0.8
 clusters? We're in the process of migrating, and are bringing up our 0.8
 cluster and first producer  consumer now. We're running into issues with
 getting the 0.7 and 0.8 jar's to play nice, as it seems like there's no way
 to do a piecemeal migration since the two jar's cannot coexist in a
 project. So we can't connect to our existing ops infrastructure, can't
 build tests side by side, etc.

 Our current plan is to bring up a small service which enqueues onto our 0.8
 cluster to abstract 0.8 away from our existing infrastructure, and at some
 point migrate all producers and consumers in one go. This feels like a very
 risky method for a migration of production resources, though, so we're
 hoping somebody has a better method for getting the two API versions to
 work together.

 Thanks,
 Patrick



Re: Problem with node after restart no partitions?

2015-04-03 Thread Jason Rosenberg
I'm preparing a longer post here, but we recently ran into a similar
scenario.  Not sure yet if it's the same thing you saw (but it feels
similar).  We were also doing a rolling upgrade from 0.8.1.1 to 0.8.2.1,
and during the controlled shutdown of the first node (of a 4 node cluster),
the controlled shutdown was taking longer than normal (it timed out several
times and was retrying controlled shutdown), and unfortunately, our
deployment system decided to kill it hard (so it was in the middle of it's
4th controlled shutdown retry, etc.).

Anyway, when the node came back, it naturally decided to 'restore' most of
it's partitions, which took some time (but only like 5 minutes).  What's
weird is it didn't decide to resync data from other replicas, instead it
just restored partitions locally.  During this time, the rest of the
cluster failed to elect any new leaders, and so for 5 minutes, those
partitions were unavailable (and we saw a flood of failed FetcherManager
exceptions from the other nodes in the cluster).  Most of the partitions
were empty (e.g. there's no way the other replicas were behind and not in
the ISR normally).  During this 5 minutes, producers were unable to send
messages due to NotLeaderForPartition exceptions.  Apparently the
controller was still sending them to the unavailable broker.

Finally, when the first node finally came up, the other nodes were somewhat
happy again (but a few partitions remained under-replicated indefinitely).
Because of this, we decided to pause the rolling restart, and try to wait
for the under-replicated partitions to get insync.  Unfortunately, about an
hour later, the whole cluster went foobar (e.g. partitions became
unavailable, brokers logged a flood of Fetcher errors, producers couldn't
find a valid leader, metadata requests timed out, etc.).  In a panic, we
reverted that first node back to 0.8.1.1. This did not help, unfortunately,
so, deciding we'd already probably lost data at this point (and producers
could not send data due to (NotLeaderForPartition exceptions)), we decided
to just forcibly do the upgrade to 0.8.2.1.  This was all a bad situation,
of course.

So, now we have the cluster stable at 0.8.2.1, but like you, we are very,
very nervous about doing any kind of restart to any of our nodes.  We lost
data, primarily in the form of producers failing to send during the periods
of unavailability.

It looks like the root cause, in our case, was a flood of topics created
(long-since unused and empty).  This appears to have caused the longer than
normal controlled shutdown, which in turn, led to the followon problems.
However, in the past, we've seen a controlled shutdown failure result in an
unclean shutdown, but usually the cluster recovers (e.g. it elects new
leaders, and when the new node comes back, it recovers it's partitions that
were uncleanly shutdown).  That did not happen this time (the rest of the
cluster got in an apparent infinite loop where it tried repeatedly (e.g.
500K times a minute) to fetch partitions that were unavailable).

I'm preparing a longer post with more detail (will take a bit of time).

Jason

On Thu, Apr 2, 2015 at 10:19 PM, Gwen Shapira gshap...@cloudera.com wrote:

 wow, thats scary for sure.

 Just to be clear - all you did is restart *one* broker in the cluster?
 everything else was ok before the restart? and that was controlled
 shutdown?

 Gwen

 On Wed, Apr 1, 2015 at 11:54 AM, Thunder Stumpges tstump...@ntent.com
 wrote:

  Well it appears we lost all the data on the one node again. It appears to
  be all or part of KAFKA-1647
  https://issues.apache.org/jira/browse/KAFKA-1647 as we saw this in our
  logs (for all topics):
 
  [2015-04-01 10:46:58,901] WARN Partition [logactivity-redirect,3] on
  broker 6: No checkpointed highwatermark is found for partition
  [logactivity-redirect,3] (kafka.cluster.Partition)
  [2015-04-01 10:46:58,902] WARN Partition [pageimpression,1] on broker 6:
  No checkpointed highwatermark is found for partition [pageimpression,1]
  (kafka.cluster.Partition)
  [2015-04-01 10:46:58,904] WARN Partition [campaignplatformtarget,6] on
  broker 6: No checkpointed highwatermark is found for partition
  [campaignplatformtarget,6] (kafka.cluster.Partition)
  [2015-04-01 10:46:58,905] WARN Partition [trackingtags-c2,1] on broker 6:
  No checkpointed highwatermark is found for partition [trackingtags-c2,1]
  (kafka.cluster.Partition)
 
  Followed by:
 
  [2015-04-01 10:46:58,911] INFO Truncating log trafficshaperlog-3 to
 offset
  0. (kafka.log.Log)
  [2015-04-01 10:46:58,928] INFO Truncating log videorecrequest-0 to offset
  0. (kafka.log.Log)
  [2015-04-01 10:46:58,928] INFO Truncating log filteredredirect-2 to
 offset
  0. (kafka.log.Log)
  [2015-04-01 10:46:58,985] INFO Truncating log precheckrequest-3 to offset
  0. (kafka.log.Log)
  [2015-04-01 10:46:58,990] INFO Truncating log filteredclicklog-8 to
 offset
  0. (kafka.log.Log)
 
  Followed by:
  [2015-04-01 10:46:59,107] INFO Scheduling log 

Re: Anyone interested in speaking at Bay Area Kafka meetup @ LinkedIn on March 24?

2015-03-23 Thread Jason Rosenberg
Hi Jon,

It the link for the 1/27 meetup you posted works for me, but I haven't
found how to find that same link on the meetup site (there are links that
point to the live stream, which of course is no longer happening!).

Thoughts?

Thanks,

Jason

On Mon, Mar 2, 2015 at 11:31 AM, Jon Bringhurst 
jbringhu...@linkedin.com.invalid wrote:

 The meetups are recorded. For example, here's a link to the January meetup:

 http://www.ustream.tv/recorded/58109076

 The links to the recordings are usually posted to the comments for each
 meetup on http://www.meetup.com/http-kafka-apache-org/

 -Jon

 On Feb 23, 2015, at 3:24 PM, Ruslan Khafizov ruslan.khafi...@gmail.com
 wrote:

 +1 For recording sessions.
 On 24 Feb 2015 07:22, Jiangjie Qin j...@linkedin.com.invalid wrote:

 +1, I¹m very interested.

 On 2/23/15, 3:05 PM, Jay Kreps jay.kr...@gmail.com wrote:

 +1

 I think something like Kafka on AWS at Netflix would be hugely
 interesting to a lot of people.

 -Jay

 On Mon, Feb 23, 2015 at 3:02 PM, Allen Wang aw...@netflix.com.invalid
 wrote:

 We (Steven Wu and Allen Wang) can talk about Kafka use cases and
 operations
 in Netflix. Specifically, we can talk about how we scale and operate
 Kafka
 clusters in AWS and how we migrate our data pipeline to Kafka.

 Thanks,
 Allen


 On Mon, Feb 23, 2015 at 12:15 PM, Ed Yakabosky 
 eyakabo...@linkedin.com.invalid wrote:

 Hi Kafka Open Source -

 LinkedIn will host another Bay Area Kafka meetup in Mountain View on

 March

 24.  We are planning to present on Offset Management but are looking

 for

 additional speakers.  If you¹re interested in presenting a use case,
 operational plan, or your experience with a particular feature (REST
 interface, WebConsole), please reply-all to let us know.

 [BCC: Open Source lists]

 Thanks,
 Ed





how to remove consumer group.id with storage=kafka

2015-02-26 Thread Jason Rosenberg
All,

There exists code in the sample console consumer that ships with kafka,
that will remove consumer group id's from zookeeper, for the case where
it's just a short-lived session using an auto-generated groupid.  It's a
bit of a hack, but it works (keeps the number of groupids from
proliferating).

I'm wondering if there is a clean way to do this while using the new kafka
offset storage.  I didn't see an example of this in the latest
console-consumer sample.

Thanks,

Jason


Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Jason Rosenberg
On Thu, Feb 5, 2015 at 9:52 PM, Joel Koshy jjkosh...@gmail.com wrote:

  Ok, so it looks like the default settings are:
  offset.storage = zookeeper
  dual.commit.enabled = true
  The doc for 'dual.commit.enabled' seems to imply (but doesn't clearly
  state) that it will only apply if offset.storage = kafka.  Is that right?
  (I'm guessing not)

 dual.commit.enabled defaults to true only if offset.storage is kafka.
 As you noted, it only applies if offset.storage = kafka is primarily
 intended for migration.


I'm not sure what you mean by 'default' behavior 'only if' offset.storage
is kafka.  Does that mean the 'default' behavior is 'false' if
offset.storage is 'zookeeper'?  Can that be clarified in the config
documentation section?

In section 5.6 where the offset managements is described, there is this:
A roll-back (i.e., migrating from Kafka back to ZooKeeper) can also be
performed using the above steps if you set offsets.storage=zookeeper.

This implies that dual commit will work also if offsets.storage=zookeeper,
no?  Just not by default?  Perhaps there needs to be clarification there
(and in the config section for offsets.storage  dual.commit.enabled).

The doc in section 5.6 is probably in need of editing, it looks like it in
places assumes zookeeper offset storage, and has some repeated sentences,
etc.

Finally, why is section 5.6 titled Distribution?  Seems to be a grab-bag
of mostly consumer related topics?


  It seems to me less than ideal to have the default behavior to have
  dual.commit.enabled = true, since this seems like a performance hit, no?

 To some degree yes, but it is relatively cheap.


And as you pointed out (I think) it's not even an issue because it won't be
dual committing initially, by default (while offsets.storage = zookeeper),
right?


  Also, I assume the __consumer_offsets topic will be set to have an
 infinite
  retention policy internally, is that right?  So that currently committed
  offsets for a given consumer group won't be lost?

 It uses the compaction retention policy - so the topic won't grow
 unbounded.


Nice



 Thanks,

 Joel

  On Thu, Feb 5, 2015 at 2:21 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
   This is documented in the official docs:
   http://kafka.apache.org/documentation.html#distributionimpl
  
   On Thu, Feb 05, 2015 at 01:23:01PM -0500, Jason Rosenberg wrote:
What are the defaults for those settings (I assume it will be to
 continue
using only zookeeper by default)?
   
Also, if I have a cluster of consumers sharing the same groupId, and
 I
update them via a rolling release, will it be a problem during the
   rolling
restart if there is inconsistency in the settings for a short time?
 Or
   is
it required that the entire cluster be stopped, then update configs,
 then
restart all nodes?
   
Jason
   
On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira gshap...@cloudera.com
 
   wrote:
   
 Thanks Jon. I updated the FAQ with your procedure:



  
 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
 ?

 On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst 
 jbringhu...@linkedin.com.invalid wrote:

  There should probably be a wiki page started for this so we have
 the
  details in one place. The same question was asked on Freenode
 IRC a
   few
  minutes ago. :)
 
  A summary of the migration procedure is:
 
  1) Upgrade your brokers and set dual.commit.enabled=false and
  offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
  2) Set dual.commit.enabled=true and offsets.storage=kafka and
 restart
  (Commit offsets to Zookeeper and Kafka).
  3) Set dual.commit.enabled=false and offsets.storage=kafka and
   restart
  (Commit offsets to Kafka only).
 
  -Jon
 
  On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com
   wrote:
 
   Hi,
  
   For 0.8.2, one of the features listed is:
- Kafka-based offset storage.
  
   Is there documentation on this (I've heard discussion of it of
   course)?
  
   Also, is it something that will be used by existing consumers
 when
   they
   migrate up to 0.8.2?  What is the migration process?
  
   Thanks,
  
   Jason
 
 

  
  

 --
 Joel



Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Jason Rosenberg
Ok, so it looks like the default settings are:

offset.storage = zookeeper
dual.commit.enabled = true

The doc for 'dual.commit.enabled' seems to imply (but doesn't clearly
state) that it will only apply if offset.storage = kafka.  Is that right?
(I'm guessing not)

*If you are using kafka* as offsets.storage, you can dual commit offsets
to ZooKeeper (in addition to Kafka).

It seems to me less than ideal to have the default behavior to have
dual.commit.enabled = true, since this seems like a performance hit, no?
I'd think you'd only want this during a planned migration.

Also, I assume it's desirable to switch to using 'kafka' for offset
storage, for performance reasons?  Will it better handle a larger number of
topics?
Also, I assume the __consumer_offsets topic will be set to have an infinite
retention policy internally, is that right?  So that currently committed
offsets for a given consumer group won't be lost?

Jason

On Thu, Feb 5, 2015 at 2:21 PM, Joel Koshy jjkosh...@gmail.com wrote:

 This is documented in the official docs:
 http://kafka.apache.org/documentation.html#distributionimpl

 On Thu, Feb 05, 2015 at 01:23:01PM -0500, Jason Rosenberg wrote:
  What are the defaults for those settings (I assume it will be to continue
  using only zookeeper by default)?
 
  Also, if I have a cluster of consumers sharing the same groupId, and I
  update them via a rolling release, will it be a problem during the
 rolling
  restart if there is inconsistency in the settings for a short time?  Or
 is
  it required that the entire cluster be stopped, then update configs, then
  restart all nodes?
 
  Jason
 
  On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
   Thanks Jon. I updated the FAQ with your procedure:
  
  
  
 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
   ?
  
   On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst 
   jbringhu...@linkedin.com.invalid wrote:
  
There should probably be a wiki page started for this so we have the
details in one place. The same question was asked on Freenode IRC a
 few
minutes ago. :)
   
A summary of the migration procedure is:
   
1) Upgrade your brokers and set dual.commit.enabled=false and
offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
2) Set dual.commit.enabled=true and offsets.storage=kafka and restart
(Commit offsets to Zookeeper and Kafka).
3) Set dual.commit.enabled=false and offsets.storage=kafka and
 restart
(Commit offsets to Kafka only).
   
-Jon
   
On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com
 wrote:
   
 Hi,

 For 0.8.2, one of the features listed is:
  - Kafka-based offset storage.

 Is there documentation on this (I've heard discussion of it of
 course)?

 Also, is it something that will be used by existing consumers when
 they
 migrate up to 0.8.2?  What is the migration process?

 Thanks,

 Jason
   
   
  




question about new consumer offset management in 0.8.2

2015-02-05 Thread Jason Rosenberg
Hi,

For 0.8.2, one of the features listed is:
  - Kafka-based offset storage.

Is there documentation on this (I've heard discussion of it of course)?

Also, is it something that will be used by existing consumers when they
migrate up to 0.8.2?  What is the migration process?

Thanks,

Jason


Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Jason Rosenberg
What are the defaults for those settings (I assume it will be to continue
using only zookeeper by default)?

Also, if I have a cluster of consumers sharing the same groupId, and I
update them via a rolling release, will it be a problem during the rolling
restart if there is inconsistency in the settings for a short time?  Or is
it required that the entire cluster be stopped, then update configs, then
restart all nodes?

Jason

On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Thanks Jon. I updated the FAQ with your procedure:


 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
 ?

 On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst 
 jbringhu...@linkedin.com.invalid wrote:

  There should probably be a wiki page started for this so we have the
  details in one place. The same question was asked on Freenode IRC a few
  minutes ago. :)
 
  A summary of the migration procedure is:
 
  1) Upgrade your brokers and set dual.commit.enabled=false and
  offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
  2) Set dual.commit.enabled=true and offsets.storage=kafka and restart
  (Commit offsets to Zookeeper and Kafka).
  3) Set dual.commit.enabled=false and offsets.storage=kafka and restart
  (Commit offsets to Kafka only).
 
  -Jon
 
  On Feb 5, 2015, at 9:03 AM, Jason Rosenberg j...@squareup.com wrote:
 
   Hi,
  
   For 0.8.2, one of the features listed is:
- Kafka-based offset storage.
  
   Is there documentation on this (I've heard discussion of it of course)?
  
   Also, is it something that will be used by existing consumers when they
   migrate up to 0.8.2?  What is the migration process?
  
   Thanks,
  
   Jason
 
 



Re: Poll: Producer/Consumer impl/language you use?

2015-01-28 Thread Jason Rosenberg
I think the results could be a bit skewed, in cases where an organization
uses multiple languages, but not equally.  In our case, we overwhelmingly
use java clients (90%).  But we also have ruby and Go clients too.  But in
the poll, these come out as equally used client languages.

Jason

On Wed, Jan 28, 2015 at 12:05 PM, David McNelis 
dmcne...@emergingthreats.net wrote:

 I agree with Stephen, it would be really unfortunate to see the Scala api
 go away.

 On Wed, Jan 28, 2015 at 11:57 AM, Stephen Boesch java...@gmail.com
 wrote:

  The scala API going away would be a minus. As Koert mentioned we could
 use
  the java api but it is less ..  well .. functional.
 
  Kafka is included in the Spark examples and external modules and is
 popular
  as a component of ecosystems on Spark (for which scala is the primary
  language).
 
  2015-01-28 8:51 GMT-08:00 Otis Gospodnetic otis.gospodne...@gmail.com:
 
   Hi,
  
   I don't have a good excuse here. :(
   I thought about including Scala, but for some reason didn't do it.  I
 see
   12-13% of people chose Other.  Do you think that is because I didn't
   include Scala?
  
   Also, is the Scala API reeally going away?
  
   Otis
   --
   Monitoring * Alerting * Anomaly Detection * Centralized Log Management
   Solr  Elasticsearch Support * http://sematext.com/
  
  
   On Tue, Jan 20, 2015 at 4:43 PM, Koert Kuipers ko...@tresata.com
  wrote:
  
no scala? although scala can indeed use the java api, its ugly we
prefer to use the scala api (which i believe will go away
  unfortunately)
   
On Tue, Jan 20, 2015 at 2:52 PM, Otis Gospodnetic 
otis.gospodne...@gmail.com wrote:
   
 Hi,

 I was wondering which implementations/languages people use for
 their
Kafka
 Producer/Consumers not everyone is using the Java APIs.  So
  here's
   a
 1-question poll:


  
 http://blog.sematext.com/2015/01/20/kafka-poll-producer-consumer-client/

 Will share the results in about a week when we have enough votes.

 Thanks!
 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log
  Management
 Solr  Elasticsearch Support * http://sematext.com/

   
  
 



Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Jason Rosenberg
Ok,

It looks like the yammer MetricName is not being created correctly for the
sub metrics that include a topic. E.g. a metric with an mbeanName like:

kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic

appears to be malformed. A yammer MetricName has 4 fields that are used in
creating a graphite metric, that are included in the constructor:
group, type, name, scope.

In this case, the metric with the above mbeanName has these fields set in
the MetricName:

group: kafka.server
type: BrokerTopicMetrics
name: BytesInPerSec
scope: null

Thus, the topic metrics all look the same, and get lumped into the
top-level BrokerTopicMetrics (and thus that will now be double counted). It
looks like the fix for kafka-1481 was where things got broken. It seems to
have introduced ‘tags’ in the building of metric names, and then those tags
only get applied to the mbeanName, but get excluded from the metric name:
https://github.com/apache/kafka/commit/457744a820d806e546edebbd8ffd33f6772e519f

This is a pretty severe issue, since the yammer metrics for these stats
will be double counted in aggregate, and the per-topic stats will be
removed.

I should note too, in my previous email, I thought that only the per-topic
BrokerTopicMetrics were missing, but also several other per-topic metrics
are missing too, e.g. under kafka.log, etc.

Jason
​

On Tue, Jan 27, 2015 at 2:20 AM, Jason Rosenberg j...@squareup.com wrote:

 I can confirm that the per topic metrics are not coming through to the
 yammer metrics registry.  I do see them in jmx (via jconsole), but the
 MetricsRegistry does not have them.
 All the other metrics are coming through that appear in jmx.

 This is with single node instance running locally.

 Jason



 On Mon, Jan 26, 2015 at 8:30 PM, Manikumar Reddy ku...@nmsworks.co.in
 wrote:

 If you are using multi-node cluster, then metrics may be reported from
 other servers.
 pl check all the servers in the cluster.

 On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker kyleban...@gmail.com
 wrote:

  I've been using a custom KafkaMetricsReporter to report Kafka broker
  metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and
 messages in
  and out for all topics together and for each individual topic.
 
  After upgrading to v0.8.2.0, these metrics are no longer being reported.
 
  I'm only seeing the following:
  BrokerTopicMetrics
  - BytesInPerSec
  - BytesOutPerSec
  - BytesRejectedPerSec
  - MessagesInPerSec
 
  What's more, despite lots of successful writes to the cluster, the
 values
  for these remaining metrics are all zero.
 
  I saw that there was some refactoring of metric naming code. Was the
  behavior supposed to have changed?
 
  Many thanks in advance.
 





Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Jason Rosenberg
Remember multiple people have reported this issue. Per topic metrics no
longer appear in graphite (or in any system modeled after the yammer
GraphiteReporter). They are not being seen as unique.

While these metrics are registered in the registry as separate ‘MetricName’
instances (varying only by mbeanName), the GraphiteReporter sends the
metrics to graphite using only the 4 fields I describe above. Consequently,
multiple metrics in the registry get sent to graphite under the same name.
Thus these metrics all end up in the same bucket in graphite, trampling
over each other making them useless. They aren’t ‘double counted’ so much
as flapping between multiple independent values.

We actually have our own Reporter class (based off the yammer
GraphiteReporter). Our version sends metrics through kafka which is then
consumed downstream by multiple metric consumers.

The ConsoleReporter isn’t useful for actually persisting metrics anywhere.
It’s just iterating through all the (identically named metrics in the
registry (save for the different mbeanNames))….

The mbeanName, as constructed, is not useful as a human readable metric
name, to be presented in a browsable tree of metrics, etc. The
‘group’:’type’:’name’:’scope’ are the pieces that matter.

The fix here is to produce MetricName instances similar to 0.8.1.1, etc. In
this case, it should probably be something like:

group: kafka.server
type: BrokerTopicMetrics
name: mytopic-BytesInPerSec
group: unused

Jason
​

On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

 I have enabled yammer's ConsoleReporter and I am getting all the metrics
 (including per-topic metrics).

 Yammer's MetricName object implements equals/hashcode methods using
 mBeanName . We are constructing a unique mBeanName for each metric, So we
 are not missing/overwriting any metrics.

 Current confusion is due to  MetricName.name(). This will be same
 (BytesInPerSec) for both broker level and topic level metrics. We need to
 use MetricName.getMBeanName() to differentiate between broker level and
 topic level metrics.

 0.8.1  MBeanName:
 kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec
 kafka.server:type=BrokerTopicMetrics,name=MYTOPIC-BytesInPerSec

 0.8.2  MBeanName:
 kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
 kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC


 ConsoleReporter's O/P:

   BytesInPerSec:  - This is broker level
  count = 1521
  mean rate = 3.63 bytes/s
  1-minute rate = 0.35 bytes/s
  5-minute rate = 2.07 bytes/s
 15-minute rate = 1.25 bytes/s

   BytesInPerSec:  - This is for topic1
  count = 626
  mean rate = 1.89 bytes/s
  1-minute rate = 0.42 bytes/s
  5-minute rate = 31.53 bytes/s
 15-minute rate = 64.66 bytes/s

   BytesInPerSec:  - This is for topic2
  count = 895
  mean rate = 3.62 bytes/s
  1-minute rate = 1.39 bytes/s
  5-minute rate = 30.08 bytes/s
 15-minute rate = 50.27 bytes/s

 Manikumar

 On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg j...@squareup.com wrote:

  Ok,
 
  It looks like the yammer MetricName is not being created correctly for
 the
  sub metrics that include a topic. E.g. a metric with an mbeanName like:
 
 
 kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic
 
  appears to be malformed. A yammer MetricName has 4 fields that are used
 in
  creating a graphite metric, that are included in the constructor:
  group, type, name, scope.
 
  In this case, the metric with the above mbeanName has these fields set in
  the MetricName:
 
  group: kafka.server
  type: BrokerTopicMetrics
  name: BytesInPerSec
  scope: null
 
  Thus, the topic metrics all look the same, and get lumped into the
  top-level BrokerTopicMetrics (and thus that will now be double counted).
 It
  looks like the fix for kafka-1481 was where things got broken. It seems
 to
  have introduced ‘tags’ in the building of metric names, and then those
 tags
  only get applied to the mbeanName, but get excluded from the metric name:
 
 
 https://github.com/apache/kafka/commit/457744a820d806e546edebbd8ffd33f6772e519f
 
  This is a pretty severe issue, since the yammer metrics for these stats
  will be double counted in aggregate, and the per-topic stats will be
  removed.
 
  I should note too, in my previous email, I thought that only the
 per-topic
  BrokerTopicMetrics were missing, but also several other per-topic metrics
  are missing too, e.g. under kafka.log, etc.
 
  Jason
  ​
 
  On Tue, Jan 27, 2015 at 2:20 AM, Jason Rosenberg j...@squareup.com
 wrote:
 
   I can confirm that the per topic metrics are not coming through to the
   yammer metrics registry.  I do see them in jmx (via jconsole), but the
   MetricsRegistry does not have them.
   All the other metrics are coming through that appear in jmx.
  
   This is with single node instance running locally.
  
   Jason
  
  
  
   On Mon

Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Jason Rosenberg
I added a comment to the ticket.  I think it will work getting data
disambiguated (didn't actually test end to end to graphite).
However, the naming scheme is not ideal for how metric ui's typically would
present the metric tree (e.g. jmx tag syntax doesn't really translate).

Jason

On Tue, Jan 27, 2015 at 11:19 AM, Jun Rao j...@confluent.io wrote:

 Jason, Kyle,

 I created an 0.8.2 blocker
 https://issues.apache.org/jira/browse/KAFKA-1902
 and attached a patch there. Could you test it out and see if it fixes the
 issue with the reporter? The patch adds tags as scope in MetricName.

 Thanks,

 Jun

 On Tue, Jan 27, 2015 at 7:39 AM, Jun Rao j...@confluent.io wrote:

  Jason,
 
  So, this sounds like a real issue. Perhaps we can fix it just by setting
  the tag name as the scope. For example, for mbean kafka.server:type=
  BrokerTopicMetrics,name=BytesInPerSec,topic=test, we can have
 
  group: kafka.server
  type: BrokerTopicMetrics
  name: BytesInPerSec
  scope: topic=test
 
  Do you know if scope can have characters like = and , (e.g., for
 scope
  like topic=test,partition=1)?
 
  The issue with using mytopic-BytesInPerSec as the name is what we are
  trying to fix in kafka-1481. Topic name (and clientId, etc) can have dash
  in it and it's hard to parse.
 
  Thanks,
 
  Jun
 
 
 
  On Tue, Jan 27, 2015 at 6:30 AM, Jason Rosenberg j...@squareup.com
 wrote:
 
  Remember multiple people have reported this issue. Per topic metrics no
  longer appear in graphite (or in any system modeled after the yammer
  GraphiteReporter). They are not being seen as unique.
 
  While these metrics are registered in the registry as separate
  ‘MetricName’
  instances (varying only by mbeanName), the GraphiteReporter sends the
  metrics to graphite using only the 4 fields I describe above.
  Consequently,
  multiple metrics in the registry get sent to graphite under the same
 name.
  Thus these metrics all end up in the same bucket in graphite, trampling
  over each other making them useless. They aren’t ‘double counted’ so
 much
  as flapping between multiple independent values.
 
  We actually have our own Reporter class (based off the yammer
  GraphiteReporter). Our version sends metrics through kafka which is then
  consumed downstream by multiple metric consumers.
 
  The ConsoleReporter isn’t useful for actually persisting metrics
 anywhere.
  It’s just iterating through all the (identically named metrics in the
  registry (save for the different mbeanNames))….
 
  The mbeanName, as constructed, is not useful as a human readable metric
  name, to be presented in a browsable tree of metrics, etc. The
  ‘group’:’type’:’name’:’scope’ are the pieces that matter.
 
  The fix here is to produce MetricName instances similar to 0.8.1.1, etc.
  In
  this case, it should probably be something like:
 
  group: kafka.server
  type: BrokerTopicMetrics
  name: mytopic-BytesInPerSec
  group: unused
 
  Jason
  ​
 
  On Tue, Jan 27, 2015 at 7:26 AM, Manikumar Reddy ku...@nmsworks.co.in
  wrote:
 
   I have enabled yammer's ConsoleReporter and I am getting all the
 metrics
   (including per-topic metrics).
  
   Yammer's MetricName object implements equals/hashcode methods using
   mBeanName . We are constructing a unique mBeanName for each metric, So
  we
   are not missing/overwriting any metrics.
  
   Current confusion is due to  MetricName.name(). This will be same
   (BytesInPerSec) for both broker level and topic level metrics. We need
  to
   use MetricName.getMBeanName() to differentiate between broker level
 and
   topic level metrics.
  
   0.8.1  MBeanName:
   kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec
   kafka.server:type=BrokerTopicMetrics,name=MYTOPIC-BytesInPerSec
  
   0.8.2  MBeanName:
   kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
   kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC
  
  
   ConsoleReporter's O/P:
  
 BytesInPerSec:  - This is broker level
count = 1521
mean rate = 3.63 bytes/s
1-minute rate = 0.35 bytes/s
5-minute rate = 2.07 bytes/s
   15-minute rate = 1.25 bytes/s
  
 BytesInPerSec:  - This is for topic1
count = 626
mean rate = 1.89 bytes/s
1-minute rate = 0.42 bytes/s
5-minute rate = 31.53 bytes/s
   15-minute rate = 64.66 bytes/s
  
 BytesInPerSec:  - This is for topic2
count = 895
mean rate = 3.62 bytes/s
1-minute rate = 1.39 bytes/s
5-minute rate = 30.08 bytes/s
   15-minute rate = 50.27 bytes/s
  
   Manikumar
  
   On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg j...@squareup.com
  wrote:
  
Ok,
   
It looks like the yammer MetricName is not being created correctly
 for
   the
sub metrics that include a topic. E.g. a metric with an mbeanName
  like:
   
   
  
 
 kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic
   
appears to be malformed

Re: Regarding Kafka release 0.8.2-beta

2015-01-26 Thread Jason Rosenberg
shouldn't the new consumer api be removed from the 0.8.2 code base then?

On Fri, Jan 23, 2015 at 10:30 AM, Joe Stein joe.st...@stealth.ly wrote:

 The new consumer is scheduled for 0.9.0.

 Currently Kafka release candidate 2 for 0.8.2.0 is being voted on.

 There is an in progress patch to the new consumer that you can try out
 https://issues.apache.org/jira/browse/KAFKA-1760

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Fri, Jan 23, 2015 at 1:55 AM, Reeni Mathew reenimathew...@gmail.com
 wrote:

  Hi Team,
 
  I was playing around with your recent release 0.8.2-beta.
  Producer worked fine whereas new consumer did not.
 
  org.apache.kafka.clients.consumer.KafkaConsumer
 
  After digging the code I realized that the implementation for the same is
  not available. Only API is present.
  Could you please let me know by when we can expect the implementation of
  the same.
 
  Thanks  Regards
 
  Reeni
 



Re: unable to delete topic with 0.8.2 rc2

2015-01-26 Thread Jason Rosenberg
So, this is rather disappointing, especially since topic deletion is really
the primary feature in 0.8.2 I'm interested in.  The topic I was trying to
delete above had no data for many months.  The consumer which is triggering
recreation of that topic has been restarted several times since that topic
stopped receiving data.

In our case, we have a large number of topics, that no longer receive
messages (many have not received messages in over 12 months).  However, we
have consumers that use a regex which matches multiple topics, some of
which are these empty zombie topics.  Thus, we have a chicken-and-egg
problem:

1.  Consumer uses a regex to discover matching topics.
2.  Consumer starts consuming 'topic.foo.which.is.empty'.  This results in
regular meta-data requests for data for that topic.
3.  'topic.foo.which.is.empty' is deleted.
4.  Consumer encounters an error trying to fetch this topic, so issues a
meta-data request to find the leader for this topic.
5.  Broker recreates the topic in response to this.

We can stop consumers for maintenance and do a batch delete, but it is
problematic, because there are realtime dependencies on those consumers
being up and running.

I expect this will necessarily result in my voting no for 0.8.2 RC2 (if I
have a vote :)).

Just read up on KAFKA-1507, and left my 2 cents:

I think relegating topic creation to an admin client would be very
limitiing. It's extremely useful to have a self-service system where new
applications can just create a new topic on demand (with reasonable
defaults), without the need for an admin to come in and prepare topics
ahead of a code release (leave that to dba's managing transactional
databases!).

I do like the idea of an automatic create topic request from a producer, in
response to a topic not found exception, rather than auto-creating topics
from meta-data requests (which happens asynchronously and causes the
initial meta data request to fail usually!). Consumers should never create
a topic, I should think.

On Mon, Jan 26, 2015 at 11:14 AM, Harsha ka...@harsha.io wrote:

 Jun,
   I made an attempt at fixing that issue as part of this JIRA
   https://issues.apache.org/jira/browse/KAFKA-1507 .
 As Jay pointed out there should be admin api if there is more info on
 this api I am interested in adding/fixing this issue.
 Thanks,
 Harsha

 On Mon, Jan 26, 2015, at 07:28 AM, Jun Rao wrote:
  Yes, that's the issue. Currently, topics can be auto-created on
  TopicMetadataRequest, which can be issued from both the producer and the
  consumer. To prevent that, you would need to stop the producer and the
  consumer before deleting a topic. We plan to address this issue once we
  have a separate request for creating topics.
 
  Thanks,
 
  Jun
 
  On Mon, Jan 26, 2015 at 7:21 AM, Harsha ka...@harsha.io wrote:
 
   There could be another case where if you have auto.create.topics.enable
   to set to true ( its true by default) . Any TopicMetadataRequest can
   recreate topics. So if you issued a delete topic command and you have
   producers running or consumers? too which is issuing a
   TopicMetadataRequest than the topic will be recreated.
   -Harsha
  
   On Sun, Jan 25, 2015, at 11:26 PM, Jason Rosenberg wrote:
cversion did change (incremented by 2) when I issue the delete
 command.
   
From the logs on the conroller broker (also the leader for the
 topic), it
looks like the delete proceeds, and then the topic gets recreated
immediately (highlighted in yellow). It appears maybe it’s due to a
consumer client app trying to consume the topic. Also, the consumer
 is
not
yet updated to 0.8.2 (it’s using 0.8.1.1), perhaps that’s part of the
problem?
   
   
2015-01-26 07:02:14,281  INFO
[ZkClient-EventThread-21-myzkserver:12345/mynamespace]
controller.PartitionStateMachine$DeleteTopicsListener -
[DeleteTopicsListener on 6]: Starting topic deletion for topics
mytopic
2015-01-26 07:02:14,282  INFO [delete-topics-thread-6]
controller.TopicDeletionManager$DeleteTopicsThread -
[delete-topics-thread-6], Handling deletion for topics mytopic
2015-01-26 07:02:14,286  INFO [delete-topics-thread-6]
controller.TopicDeletionManager$DeleteTopicsThread -
[delete-topics-thread-6], Deletion of topic mytopic (re)started
2015-01-26 07:02:14,286  INFO [delete-topics-thread-6]
controller.TopicDeletionManager - [Topic Deletion Manager 6], Topic
deletion callback for mytopic
2015-01-26 07:02:14,289  INFO [delete-topics-thread-6]
controller.TopicDeletionManager - [Topic Deletion Manager 6],
Partition deletion callback for [mytopic,0]
2015-01-26 07:02:14,295  INFO [delete-topics-thread-6]
controller.ReplicaStateMachine - [Replica state machine on controller
6]: Invoking state change to OfflineReplica for replicas
   
  
 [Topic=mytopic,Partition=0,Replica=7],[Topic=mytopic,Partition=0,Replica=6]
2015-01-26 07:02:14,303  INFO [delete-topics-thread-6

Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-26 Thread Jason Rosenberg
I can confirm that the per topic metrics are not coming through to the
yammer metrics registry.  I do see them in jmx (via jconsole), but the
MetricsRegistry does not have them.
All the other metrics are coming through that appear in jmx.

This is with single node instance running locally.

Jason



On Mon, Jan 26, 2015 at 8:30 PM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

 If you are using multi-node cluster, then metrics may be reported from
 other servers.
 pl check all the servers in the cluster.

 On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker kyleban...@gmail.com wrote:

  I've been using a custom KafkaMetricsReporter to report Kafka broker
  metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and messages
 in
  and out for all topics together and for each individual topic.
 
  After upgrading to v0.8.2.0, these metrics are no longer being reported.
 
  I'm only seeing the following:
  BrokerTopicMetrics
  - BytesInPerSec
  - BytesOutPerSec
  - BytesRejectedPerSec
  - MessagesInPerSec
 
  What's more, despite lots of successful writes to the cluster, the values
  for these remaining metrics are all zero.
 
  I saw that there was some refactoring of metric naming code. Was the
  behavior supposed to have changed?
 
  Many thanks in advance.
 



Re: unable to delete topic with 0.8.2 rc2

2015-01-25 Thread Jason Rosenberg
yes

On Mon, Jan 26, 2015 at 12:18 AM, Jun Rao j...@confluent.io wrote:

 Do you have delete.topic.enable turned on in all brokers?

 Thanks,

 Jun

 On Sun, Jan 25, 2015 at 7:56 PM, Jason Rosenberg j...@squareup.com wrote:

  So far, I have been unable to get delete topic to work, with release
  candidate 2 for 0.8.2.
 
  It worked ok when I ran it in the debugger locally, on a single node
  instance. But when I run it in our staging environment, it is not
  successfully even marking the topic for delete, for some reason.
 
  I am setting delete.topic.enable to true (and see the confirmation of
 this
  in the startup logs, e.g.):
 
  INFO [main] utils.VerifiableProperties - Property delete.topic.enable
  is overridden to true
 
  I run this command:
 
  java -cp app.jar kafka.admin.TopicCommand --zookeeper
  myzkconnect:12345/mynamespace --delete --topic mytopic
 
  log4j:WARN No appenders could be found for logger
  (org.I0Itec.zkclient.ZkConnection).
  log4j:WARN Please initialize the log4j system properly.
  log4j:WARN No appenders could be found for logger
  (org.I0Itec.zkclient.ZkEventThread).
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
  for more info.
  log4j:WARN Please initialize the log4j system properly.
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
  for more info.
  Topic mytopic is marked for deletion.
  Note: This will have no impact if delete.topic.enable is not set to true.
 
  I then do a —list which should at least show the topic marked for
 deletion:
 
  java -cp app.jar kafka.admin.TopicCommand --zookeeper
  myzkconnect:12345/mynamespace --list --topic mytopic
 
  log4j:WARN No appenders could be found for logger
  (org.I0Itec.zkclient.ZkConnection).
  log4j:WARN No appenders could be found for logger
  (org.I0Itec.zkclient.ZkEventThread).
  log4j:WARN Please initialize the log4j system properly.
  log4j:WARN Please initialize the log4j system properly.
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
  for more info.
  log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig
  for more info.
  mytopic
 
  Note, it doesn’t list it as ‘marked for deletion’. Furthermore, waiting
  multiple hours still doesn’t result in the topic being deleted.
 
  The topic has 1 partition, 2 replicas, and no data stored.
 
  In zookeeper, the /admin/deleted_topics/ path is empty.
 
  The zookeeper code looks pretty straightforward, but for some reason is
 not
  writing the deleted_topics path.  We are running zookeeper 3.4.6.
 
  Thoughts?
 
  Jason
  ​
 



Re: warning on startup of consumer app with 0.8.2 rc2

2015-01-25 Thread Jason Rosenberg
Or you could have your build generate a static constant class from the
build version.

On Mon, Jan 26, 2015 at 12:06 AM, Jun Rao j...@confluent.io wrote:

 The issue with embedding the version in the code is that you have to update
 the version in two places, the build script and the code.

 Thanks,

 Jun

 On Sun, Jan 25, 2015 at 7:05 PM, Jason Rosenberg j...@squareup.com wrote:

  I don't think it's really unusual for deployment environments to produce
  single shaded jars for an app.  Thus, I'm wondering if we can't rethink
  this here?  E.g. just have a constant in code which states the version?
  Rather than emit the confusing warning, especially for client apps?
 
  Jason
 
  On Fri, Jan 23, 2015 at 5:24 PM, Jun Rao j...@confluent.io wrote:
 
   The only impact is that you don't get the mbean that tells you the
  version
   of the jar. That's why it's just a warning.
  
   Thanks,
  
   Jun
  
   On Fri, Jan 23, 2015 at 1:04 PM, Jason Rosenberg j...@squareup.com
  wrote:
  
What are the ramifications if it can't find the version?  It looks
 like
   it
uses it set a yammer Gauge metric.  Anything more than that?
   
Jason
   
On Fri, Jan 23, 2015 at 3:24 PM, Jun Rao j...@confluent.io wrote:
   
 Yes, that's probably the issue. If you repackage the Kafka jar, you
   need
to
 include the following in the repacked jar that was included in the
original
 Kafka jar. Our code looks for version info from there.

 META-INF/

 META-INF/MANIFEST.MF

 Thanks,

 Jun

 On Fri, Jan 23, 2015 at 11:59 AM, Jason Rosenberg 
 j...@squareup.com
 wrote:

  In this case, we have a single shaded jar for our app for
  deployment
(so
  just 1 jar on the classpath).  Could that be the issue? E.g. all
 dependent
  jars are unpacked into a single jar within our deployment
  system
 
  On Thu, Jan 22, 2015 at 6:11 PM, Jun Rao j...@confluent.io
 wrote:
 
   Hmm, kafka-console-consumer in 0.8.2 rc2 is running fine. Do
 you
   have
   multiple kafka jars in your classpath?
  
   Thanks,
  
   Jun
  
   On Thu, Jan 22, 2015 at 4:58 PM, Jason Rosenberg 
  j...@squareup.com
   
  wrote:
  
2015-01-23 00:55:25,273  WARN [async-message-sender-0]
 common.AppInfo$
  -
Can't read Kafka version from MANIFEST.MF. Possible cause:
java.lang.NullPointerException
   
  
 

   
  
 



Re: warning on startup of consumer app with 0.8.2 rc2

2015-01-25 Thread Jason Rosenberg
I don't think it's really unusual for deployment environments to produce
single shaded jars for an app.  Thus, I'm wondering if we can't rethink
this here?  E.g. just have a constant in code which states the version?
Rather than emit the confusing warning, especially for client apps?

Jason

On Fri, Jan 23, 2015 at 5:24 PM, Jun Rao j...@confluent.io wrote:

 The only impact is that you don't get the mbean that tells you the version
 of the jar. That's why it's just a warning.

 Thanks,

 Jun

 On Fri, Jan 23, 2015 at 1:04 PM, Jason Rosenberg j...@squareup.com wrote:

  What are the ramifications if it can't find the version?  It looks like
 it
  uses it set a yammer Gauge metric.  Anything more than that?
 
  Jason
 
  On Fri, Jan 23, 2015 at 3:24 PM, Jun Rao j...@confluent.io wrote:
 
   Yes, that's probably the issue. If you repackage the Kafka jar, you
 need
  to
   include the following in the repacked jar that was included in the
  original
   Kafka jar. Our code looks for version info from there.
  
   META-INF/
  
   META-INF/MANIFEST.MF
  
   Thanks,
  
   Jun
  
   On Fri, Jan 23, 2015 at 11:59 AM, Jason Rosenberg j...@squareup.com
   wrote:
  
In this case, we have a single shaded jar for our app for deployment
  (so
just 1 jar on the classpath).  Could that be the issue? E.g. all
   dependent
jars are unpacked into a single jar within our deployment system
   
On Thu, Jan 22, 2015 at 6:11 PM, Jun Rao j...@confluent.io wrote:
   
 Hmm, kafka-console-consumer in 0.8.2 rc2 is running fine. Do you
 have
 multiple kafka jars in your classpath?

 Thanks,

 Jun

 On Thu, Jan 22, 2015 at 4:58 PM, Jason Rosenberg j...@squareup.com
 
wrote:

  2015-01-23 00:55:25,273  WARN [async-message-sender-0]
   common.AppInfo$
-
  Can't read Kafka version from MANIFEST.MF. Possible cause:
  java.lang.NullPointerException
 

   
  
 



Re: warning on startup of consumer app with 0.8.2 rc2

2015-01-23 Thread Jason Rosenberg
In this case, we have a single shaded jar for our app for deployment (so
just 1 jar on the classpath).  Could that be the issue? E.g. all dependent
jars are unpacked into a single jar within our deployment system

On Thu, Jan 22, 2015 at 6:11 PM, Jun Rao j...@confluent.io wrote:

 Hmm, kafka-console-consumer in 0.8.2 rc2 is running fine. Do you have
 multiple kafka jars in your classpath?

 Thanks,

 Jun

 On Thu, Jan 22, 2015 at 4:58 PM, Jason Rosenberg j...@squareup.com wrote:

  2015-01-23 00:55:25,273  WARN [async-message-sender-0] common.AppInfo$ -
  Can't read Kafka version from MANIFEST.MF. Possible cause:
  java.lang.NullPointerException
 



Re: warning on startup of consumer app with 0.8.2 rc2

2015-01-23 Thread Jason Rosenberg
What are the ramifications if it can't find the version?  It looks like it
uses it set a yammer Gauge metric.  Anything more than that?

Jason

On Fri, Jan 23, 2015 at 3:24 PM, Jun Rao j...@confluent.io wrote:

 Yes, that's probably the issue. If you repackage the Kafka jar, you need to
 include the following in the repacked jar that was included in the original
 Kafka jar. Our code looks for version info from there.

 META-INF/

 META-INF/MANIFEST.MF

 Thanks,

 Jun

 On Fri, Jan 23, 2015 at 11:59 AM, Jason Rosenberg j...@squareup.com
 wrote:

  In this case, we have a single shaded jar for our app for deployment (so
  just 1 jar on the classpath).  Could that be the issue? E.g. all
 dependent
  jars are unpacked into a single jar within our deployment system
 
  On Thu, Jan 22, 2015 at 6:11 PM, Jun Rao j...@confluent.io wrote:
 
   Hmm, kafka-console-consumer in 0.8.2 rc2 is running fine. Do you have
   multiple kafka jars in your classpath?
  
   Thanks,
  
   Jun
  
   On Thu, Jan 22, 2015 at 4:58 PM, Jason Rosenberg j...@squareup.com
  wrote:
  
2015-01-23 00:55:25,273  WARN [async-message-sender-0]
 common.AppInfo$
  -
Can't read Kafka version from MANIFEST.MF. Possible cause:
java.lang.NullPointerException
   
  
 



warning on startup of consumer app with 0.8.2 rc2

2015-01-22 Thread Jason Rosenberg
2015-01-23 00:55:25,273  WARN [async-message-sender-0] common.AppInfo$ -
Can't read Kafka version from MANIFEST.MF. Possible cause:
java.lang.NullPointerException


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-02 Thread Jason Rosenberg
In our case, we use protocol buffers for all messages, and these have
simple serialization/deserialization builtin to the protobuf libraries
(e.g. MyProtobufMessage.toByteArray()).  Also, we often produce/consume
messages without conversion to/from protobuf Objects (e.g. in cases where
we are just forwarding messages on to other topics, or if we are consuming
directly to a binary blob store like hdfs).  There's a huge efficiency in
not over synthesizing new Objects.

Thus, it's nice to only deal with bytes directly in all messages, and keep
things simple.  Having the overhead of having to dummy in a default,
generically parameterized, no-op serializer (and the overhead of having
that extra no-op method call, seems unnecessary).

I'd suggest that maybe it could work seamlessly either way (which it
probably does now, for the case where no serializer is provided, but not
sure if it efficiently will elide the call to the no-op serializer after
JIT?)Alternatively, I do think it's important to preserve the
efficiency of sending raw bytes directly, so if necessary, maybe expose
both apis (one which explicitly bypasses any serialization).

Finally, I've wondered in the past about enabling some sort of streaming
serialization, whereby you hook up a producer to a long living stream
class, which could integrate compression in line, and allow more control of
the pipeline.  The stream would implement an iterator to get the next
serialized message, etc.  For me, something like this might be a reason to
have a serialization/deserialization abstraction built into the
producer/consumer api's.

But if I have a vote, I'd be in favor of keeping the api simple and have it
take bytes directly.

Jason

On Tue, Dec 2, 2014 at 9:50 PM, Jan Filipiak jan.filip...@trivago.com
wrote:

 Hello Everyone,

 I would very much appreciate if someone could provide me a real world
 examplewhere it is more convenient to implement the serializers instead of
 just making sure to provide bytearrays.

 The code we came up with explicitly avoids the serializer api. I think it
 is common understanding that if you want to transport data you need to have
 it as a bytearray.

 If at all I personally would like to have a serializer interface that
 takes the same types as the producer

 public interface SerializerK,V extends Configurable {
 public byte[] serializeKey(K data);
 public byte[] serializeValue(V data);
 public void close();
 }

 this would avoid long serialize implementations with branches like
 switch(topic) or if(isKey). Further serializer per topic makes more
 sense in my opinion. It feels natural to have a one to one relationship
 from types to topics or at least only a few partition per type. But as we
 inherit the type from the producer we would have to create many producers.
 This would create additional unnecessary connections to the brokers. With
 the serializers we create a one type to all topics relationship and the
 only type that satisfies that is the bytearray or Object. Am I missing
 something here? As said in the beginning I would like to that usecase that
 really benefits from using the serializers. I think in theory they sound
 great but they cause real practical issues that may lead users to wrong
 decisions.

 -1 for putting the serializers back in.

 Looking forward to replies that can show me the benefit of serializes and
 especially how the
 Type = topic relationship can be handled nicely.

 Best
 Jan




 On 25.11.2014 02:58, Jun Rao wrote:

 Hi, Everyone,

 I'd like to start a discussion on whether it makes sense to add the
 serializer api back to the new java producer. Currently, the new java
 producer takes a byte array for both the key and the value. While this api
 is simple, it pushes the serialization logic into the application. This
 makes it hard to reason about what type of data is being sent to Kafka and
 also makes it hard to share an implementation of the serializer. For
 example, to support Avro, the serialization logic could be quite involved
 since it might need to register the Avro schema in some remote registry
 and
 maintain a schema cache locally, etc. Without a serialization api, it's
 impossible to share such an implementation so that people can easily
 reuse.
 We sort of overlooked this implication during the initial discussion of
 the
 producer api.

 So, I'd like to propose an api change to the new producer by adding back
 the serializer api similar to what we had in the old producer. Specially,
 the proposed api changes are the following.

 First, we change KafkaProducer to take generic types K and V for the key
 and the value, respectively.

 public class KafkaProducerK,V implements ProducerK,V {

  public FutureRecordMetadata send(ProducerRecordK,V record,
 Callback
 callback);

  public FutureRecordMetadata send(ProducerRecordK,V record);
 }

 Second, we add two new configs, one for the key serializer and another for
 the value serializer. Both serializers will 

Re: Re: How to push metrics to graphite - jmxtrans does not work

2014-12-02 Thread Jason Rosenberg
fwiw, we wrap the kafka server in our java service container framework.
This allows us to use the default GraphiteReporter class that is part of
the yammer metrics library (which is used by kafka directly).  So it works
seemlessly.  (We've since changed our use of GraphiteReporter to instead
send all our metrics via kafka :))

Jason

On Tue, Dec 2, 2014 at 11:00 PM, Otis Gospodnetic 
otis.gospodne...@gmail.com wrote:

 Hi,

 You can make use of this documentation aimed at JMX and monitoring:
 https://sematext.atlassian.net/wiki/display/PUBSPM/SPM+Monitor+-+Standalone

 There is a section about Kafka and the information is not SPM-specific.

 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr  Elasticsearch Support * http://sematext.com/


 On Tue, Dec 2, 2014 at 9:34 PM, YuanJia Li yuanjia8...@163.com wrote:

  Hi David,
  Just edit kafka-server-start.sh, and add export JMX_PORT=,it
  will work.
 
 
 
 
  Yuanjia
 
  From: David Montgomery
  Date: 2014-12-03 04:47
  To:users
  Subject: Re: How to push metrics to graphite - jmxtrans does not work
  Hi,
 
  I am seeing this in the logs and wondering what jmx_port:-1 means?
 
  INFO conflict in /brokers/ids/29136 data: { host:104.111.111.111.,
  jmx_port:-1, port:9092, timestamp:1417552817875, version:1 }
  stored data: { host:104.111.111, jmx_port:-1, port:9092,
  timestamp:1417552738253, version:1
 
  despite having these added
 
  echo 'KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote.port=
  -Dcom.sun.management.jmxremote=true
  -Dcom.sun.management.jmxremote.authenticate=false
  -Dcom.sun.management.jmxremote.ssl=false' | tee -a
  /var/kafka/bin/kafka-run-class.sh
  echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
  /var/kafka/bin/kafka-server-start.sh
 
  Thanks
 
  On Tue, Dec 2, 2014 at 9:58 PM, Andrew Otto ao...@wikimedia.org wrote:
 
   Maybe also set:
  
-Dcom.sun.management.jmxremote.port=
  
   ?
  
  
On Dec 2, 2014, at 02:59, David Montgomery 
 davidmontgom...@gmail.com
   wrote:
   
Hi,
   
I am having a very difficult time trying to report kafka 8 metrics to
Graphite.  Nothing is listening on  and and no data in graphite.
  If
this method of graphite reporting is know to not work is there an
alternative to jmxtrans to get data to graphite?
   
I am using the deb file to install jmxtrans on ubuntu 12.04
   
And I use the below to modify kafka scripts
   
echo 'KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false' | tee -a
/var/kafka/bin/kafka-run-class.sh
echo 'export JMX_PORT=${JMX_PORT:-}' | tee -a
/var/kafka/bin/kafka-server-start.sh
   
{
 servers : [ {
   host : 127.0.0.1,
   port : ,
   alias : %=node.name%,
   queries : [
{
obj : kafka:type=kafka.SocketServerStats,
 resultAlias: kafka.socketServerStats,
 attr : [ AvgFetchRequestMs, AvgProduceRequestMs,
BytesReadPerSecond, BytesWrittenPerSecond,
  FetchRequestsPerSecond,
MaxFetchRequestMs, MaxProduceRequestMs , NumFetchRequests ,
NumProduceRequests , ProduceRequestsPerSecond, TotalBytesRead,
TotalBytesWritten, TotalFetchRequestMs, TotalProduceRequestMs
 ],
outputWriters : [ {
 @class :
   com.googlecode.jmxtrans.model.output.GraphiteWriter,
 settings : {
   host : %=@monitor_host%,
   port : 2003
 }
   } ]
 }
   ],
   numQueryThreads: 2
 } ]
}
  
  
 



Re: Kafka restart takes a long time

2014-11-23 Thread Jason Rosenberg
Rajiv,

So, any time a broker's disk fills up, it will shut itself down immediately
(it will do this in response to any IO error on writing to disk).
Unfortunately, this means that the node will not be able to do any
housecleaning before shutdown, which is an 'unclean' shutdown.  This means
that when it restarts, it needs to reset the data to the last known
checkpoint.  If the partition is replicated, and it can restore it from
another broker, it will try to do that (but it doesn't sound like it can do
that in your case, since all the other nodes are down too).

There is a fix coming in 0.8.2 that will allow a broker to restore multiple
partitions in parallel (but the current behavior in 0.8.1.1 and prior is to
restore partitions 1 by 1). See:
https://issues.apache.org/jira/browse/KAFKA-1414.  This fix should speed
things up greatly when you have a large number of partitions.

If a disk is full, the broker will refuse to even start up (or will fail
immediately on the first write attempt and shut itself down).  So,
generally, in this event, you need to clear some disk space before trying
to restart the server.

The bottom line is that you don't want any of your brokers to run out of
disk space (thus you need to have good monitoring/alerting for advance
warning on this).  Kafka doesn't attempt to detect if it's about to run out
of space and die, so you have to manage that and guard against it outside
of kafka.

Jason

On Sat, Nov 22, 2014 at 5:27 PM, Harsha ka...@harsha.io wrote:

 It might logs check your kafka logs dir (server logs) . Kafka can
 produce lot of logs in a quick time make sure thats whats in play here.
 -Harsha

 On Sat, Nov 22, 2014, at 01:37 PM, Rajiv Kurian wrote:
  Actually see a bunch of errors. One of the brokers is out of space and
  this
  might be causing everything to spin out of control.
 
  Some logs:
 
  On *broker 1* (the one that has run out of space):
 
  2014-11-22T21:20:42.790Z FATAL [ReplicaFetcherThread-1-13  ]
  [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-1-13], Disk
  error while replicating data.
 
  kafka.common.KafkaStorageException: I/O exception in append to log
  'mytopic-633'
 
  at kafka.log.Log.append(Log.scala:283)
 
  at
 
 kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52)
 
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 
  at
  scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
 
  at
  scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
 
  at
  scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
 
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 
  at
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 
  at kafka.utils.Utils$.inLock(Utils.scala:538)
 
  at
 
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 
  at
  kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
 
  at
  kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
 
  Caused by: java.io.IOException: No space left on device
 
  at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
 
  at
  sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
 
  at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
 
  at sun.nio.ch.IOUtil.write(IOUtil.java:65)
 
  at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
 
  at
 
 kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:132)
 
  at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)
 
  at kafka.log.LogSegment.append(LogSegment.scala:80)
 
  at kafka.log.Log.append(Log.scala:269)
 
  ... 13 more
 
  2014-11-22T21:20:42.791Z ERROR [ReplicaFetcherThread-2-13  ]
  [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-2-13],
  Error
  getting offset for partition [myTopic,0] to broker 13
 
  java.io.IOException: No space left on device
 
  at java.io.FileOutputStream.writeBytes(Native Method)
 
  at java.io.FileOutputStream.write(FileOutputStream.java:345)
 
  at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
 
  at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
 
  at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
 
  at 

new producer api and batched Futures....

2014-11-20 Thread Jason Rosenberg
I've been looking at the new producer api with anticipation, but have not
fired it up yet.

One question I have, is it looks like there's no longer a 'batch' send mode
(and I get that this is all now handled internally, e.g. you send
individual messages, that then get collated and batched up and sent out).

What I'm wondering, is whether there's added overhead in the producer (and
the client code) having to manage all the Future return Objects from all
the individual messages sent?  If I'm sending 100K messages/second, etc.,
that seems like a lot of async Future Objects that have to be tickled, and
waited for, etc.  Does not this cause some overhead?

If I send a bunch of messages and then store all the Future's in a list,
and then wait for all of them, it seems like a lot of thread contention.
On the other hand, if I send a batch of messages, that are likely all to
get sent as a single batch over the wire (cuz they are all going to the
same partition), wouldn't there be some benefit in only having to wait for
a single Future Object for the batch?

Jason


Re: new producer api and batched Futures....

2014-11-20 Thread Jason Rosenberg
I guess it would make the api less clean, but I can imagine a sendBatch
method, which returns a single Future that gets triggered only when all
messages in the batch were finished.  The callback info could then contain
info about the success/exceptions encountered by each sub-group of
messages.  And the callback could even be called multiple times, once for
each sub-batch sent.   It gets complicated to think about it, but it would
be fewer Future objects created and less async contention/waiting, etc.

I'll try it out and see

Jason

On Thu, Nov 20, 2014 at 7:56 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Internally it works as you describe, there is only one CountDownLatch per
 batch sent, each of the futures is just a wrapper around that.

 It is true that if you accumulate thousands of futures in a list that may
 be a fair number of objects you are retaining, and there will be some work
 involved in checking them all. If you are sure they are all going to the
 same partition you can actually wait on the last future since sends are
 ordered within a partition. So when the final send completes the prior
 sends should also have completed.

 Either way if you see a case where the new producer isn't as fast as the
 old producer let us know.

 -Jay



 On Thu, Nov 20, 2014 at 4:24 PM, Jason Rosenberg j...@squareup.com wrote:

  I've been looking at the new producer api with anticipation, but have not
  fired it up yet.
 
  One question I have, is it looks like there's no longer a 'batch' send
 mode
  (and I get that this is all now handled internally, e.g. you send
  individual messages, that then get collated and batched up and sent out).
 
  What I'm wondering, is whether there's added overhead in the producer
 (and
  the client code) having to manage all the Future return Objects from all
  the individual messages sent?  If I'm sending 100K messages/second, etc.,
  that seems like a lot of async Future Objects that have to be tickled,
 and
  waited for, etc.  Does not this cause some overhead?
 
  If I send a bunch of messages and then store all the Future's in a list,
  and then wait for all of them, it seems like a lot of thread contention.
  On the other hand, if I send a batch of messages, that are likely all to
  get sent as a single batch over the wire (cuz they are all going to the
  same partition), wouldn't there be some benefit in only having to wait
 for
  a single Future Object for the batch?
 
  Jason
 



Re: ISR shrink to 0?

2014-11-19 Thread Jason Rosenberg
What if it never comes back with unclean leader election disabled (but
another broker does come back)?

On Wed, Nov 19, 2014 at 9:32 PM, Jun Rao jun...@gmail.com wrote:

 In that case, we just wait until the broker in ISR is back and make it the
 leader and take whatever data is has.

 Thanks,

 Jun

 On Tue, Nov 18, 2014 at 10:36 PM, Jason Rosenberg j...@squareup.com
 wrote:

  Ok,
 
  Makes sense.  But if the node is not actually healthy (and underwent a
 hard
  crash) it would likely not be able to avoid an 'unclean' restart.what
  happens if unclean leader election is disabled, but there are no 'clean'
  partitions available?
 
  Jason
 
  On Wed, Nov 19, 2014 at 12:40 AM, Jun Rao jun...@gmail.com wrote:
 
   Yes, we will preserve the last replica in ISR. This way, we know which
   replica has all committed messages and can wait for it to come back as
  the
   leader, if unclean leader election is disabled.
  
   Thanks,
  
   Jun
  
   On Mon, Nov 17, 2014 at 11:06 AM, Jason Rosenberg j...@squareup.com
   wrote:
  
We have had 2 nodes in a 4 node cluster die this weekend, sadly.
Fortunately there was no critical data on these machines yet.
   
The cluster is running 0.8.1.1, and using replication factor of 2
 for 2
topics, each with 20 partitions.
   
For sake of discussion, assume that nodes A and B are still up, and C
   and D
are now down.
   
As expected, partitions that had one replica on a good host (A or B)
  and
one on a bad node (C or D), had their ISR shrink to just 1 node (A or
  B).
   
Roughly 1/6 of the partitions had their 2 replicas on the 2 bad
 nodes,
  C
and D.  For these, I was expecting the ISR to show up as empty, and
 the
partition unavailable.
   
However, that's not what I'm seeing.  When running TopicCommand
   --describe,
I see that the ISR still shows 1 replica, on node D (D was the second
   node
to go down).
   
And, producers are still periodically trying to produce to node D
 (but
failing and retrying to one of the good nodes).
   
So, it seems the cluster's meta data is still thinking that node D is
  up
and serving the partitions that were only replicated on C and D.
However,
for partitions that were on A and D, or B and D, D is not shown as
  being
   in
the ISR.
   
Is this correct?  Should the cluster continue showing the last node
 to
   have
been alive for a partition as still in the ISR?
   
Jason
   
  
 



Re: ISR shrink to 0?

2014-11-18 Thread Jason Rosenberg
Not sure what happened, but the issue went away once revived the broker id
on a new host

But it does seem host D's ISR leadership could not be cleared until another
member of the ISR came back.somehow D was stale and remained stuck (and
clients therefore kept trying to connect to it)...

Jason

On Mon, Nov 17, 2014 at 2:06 PM, Jason Rosenberg j...@squareup.com wrote:

 We have had 2 nodes in a 4 node cluster die this weekend, sadly.
 Fortunately there was no critical data on these machines yet.

 The cluster is running 0.8.1.1, and using replication factor of 2 for 2
 topics, each with 20 partitions.

 For sake of discussion, assume that nodes A and B are still up, and C and
 D are now down.

 As expected, partitions that had one replica on a good host (A or B) and
 one on a bad node (C or D), had their ISR shrink to just 1 node (A or B).

 Roughly 1/6 of the partitions had their 2 replicas on the 2 bad nodes, C
 and D.  For these, I was expecting the ISR to show up as empty, and the
 partition unavailable.

 However, that's not what I'm seeing.  When running TopicCommand
 --describe, I see that the ISR still shows 1 replica, on node D (D was the
 second node to go down).

 And, producers are still periodically trying to produce to node D (but
 failing and retrying to one of the good nodes).

 So, it seems the cluster's meta data is still thinking that node D is up
 and serving the partitions that were only replicated on C and D.   However,
 for partitions that were on A and D, or B and D, D is not shown as being in
 the ISR.

 Is this correct?  Should the cluster continue showing the last node to
 have been alive for a partition as still in the ISR?

 Jason





Re: selecting java producer (0.8.2 or 0.8.1.1?)

2014-11-18 Thread Jason Rosenberg
Hi Jun,

Is this the official java doc for the new producer (www.trieuvan.com)?  I'm
not seeing any links to it (or any documentation) on the apache kafka site
(am I overlooking it)?

Should there be a link to it in the 0.8.2-beta documentation page?

Jason

On Tue, Nov 18, 2014 at 7:23 PM, Jun Rao jun...@gmail.com wrote:

 The new producer in 0.8.2 is considered stable, although it's relatively
 new. Compared with the old producer, it has the following features.

 1. Use non-blocking socket to send requests to the broker. So uses fewer
 threads and have better throughput.
 2. Bound the memory consumption.
 3. Support a callback when sending a request asynchronously.
 4. Returns the offset for each produced message.

 You can look at the example in the java doc.

 http://www.trieuvan.com/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/producer/KafkaProducer.html

 Thanks,

 Jun

 On Mon, Nov 17, 2014 at 4:46 AM, Shlomi Hazan shl...@viber.com wrote:

  Hi,
  I need to make a choice and I can't get a full picture on the differences
  between the two.
  E.g.:
  Are both producers async capable to the same extent?
  Is the new producer stable for production?
  Is there some usage example for the new producer?
  What are the tradeoffs using one or another?
  10x,
  Shlomi
 



Re: selecting java producer (0.8.2 or 0.8.1.1?)

2014-11-18 Thread Jason Rosenberg
So that java-doc link includes a new KafkaConsumer (but it seems in other
threads that's not being referred to as ready for use until 0.9, is that
right?).Is there a way to know which parts of that javadoc are
considered beta-ready in 0.8.2 and which are not?

Jason

On Tue, Nov 18, 2014 at 11:03 PM, Joe Stein joe.st...@stealth.ly wrote:

 0.8.2-beta java doc
 https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/

 /***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
 /
 On Nov 18, 2014 10:33 PM, Jason Rosenberg j...@squareup.com wrote:

  Hi Jun,
 
  Is this the official java doc for the new producer (www.trieuvan.com)?
  I'm
  not seeing any links to it (or any documentation) on the apache kafka
 site
  (am I overlooking it)?
 
  Should there be a link to it in the 0.8.2-beta documentation page?
 
  Jason
 
  On Tue, Nov 18, 2014 at 7:23 PM, Jun Rao jun...@gmail.com wrote:
 
   The new producer in 0.8.2 is considered stable, although it's
 relatively
   new. Compared with the old producer, it has the following features.
  
   1. Use non-blocking socket to send requests to the broker. So uses
 fewer
   threads and have better throughput.
   2. Bound the memory consumption.
   3. Support a callback when sending a request asynchronously.
   4. Returns the offset for each produced message.
  
   You can look at the example in the java doc.
  
  
 
 http://www.trieuvan.com/apache/kafka/0.8.2-beta/java-doc/org/apache/kafka/clients/producer/KafkaProducer.html
  
   Thanks,
  
   Jun
  
   On Mon, Nov 17, 2014 at 4:46 AM, Shlomi Hazan shl...@viber.com
 wrote:
  
Hi,
I need to make a choice and I can't get a full picture on the
  differences
between the two.
E.g.:
Are both producers async capable to the same extent?
Is the new producer stable for production?
Is there some usage example for the new producer?
What are the tradeoffs using one or another?
10x,
Shlomi
   
  
 



Re: ISR shrink to 0?

2014-11-18 Thread Jason Rosenberg
Ok,

Makes sense.  But if the node is not actually healthy (and underwent a hard
crash) it would likely not be able to avoid an 'unclean' restart.what
happens if unclean leader election is disabled, but there are no 'clean'
partitions available?

Jason

On Wed, Nov 19, 2014 at 12:40 AM, Jun Rao jun...@gmail.com wrote:

 Yes, we will preserve the last replica in ISR. This way, we know which
 replica has all committed messages and can wait for it to come back as the
 leader, if unclean leader election is disabled.

 Thanks,

 Jun

 On Mon, Nov 17, 2014 at 11:06 AM, Jason Rosenberg j...@squareup.com
 wrote:

  We have had 2 nodes in a 4 node cluster die this weekend, sadly.
  Fortunately there was no critical data on these machines yet.
 
  The cluster is running 0.8.1.1, and using replication factor of 2 for 2
  topics, each with 20 partitions.
 
  For sake of discussion, assume that nodes A and B are still up, and C
 and D
  are now down.
 
  As expected, partitions that had one replica on a good host (A or B) and
  one on a bad node (C or D), had their ISR shrink to just 1 node (A or B).
 
  Roughly 1/6 of the partitions had their 2 replicas on the 2 bad nodes, C
  and D.  For these, I was expecting the ISR to show up as empty, and the
  partition unavailable.
 
  However, that's not what I'm seeing.  When running TopicCommand
 --describe,
  I see that the ISR still shows 1 replica, on node D (D was the second
 node
  to go down).
 
  And, producers are still periodically trying to produce to node D (but
  failing and retrying to one of the good nodes).
 
  So, it seems the cluster's meta data is still thinking that node D is up
  and serving the partitions that were only replicated on C and D.
  However,
  for partitions that were on A and D, or B and D, D is not shown as being
 in
  the ISR.
 
  Is this correct?  Should the cluster continue showing the last node to
 have
  been alive for a partition as still in the ISR?
 
  Jason
 



Re: HL publishing, retries and potential race condition

2014-11-16 Thread Jason Rosenberg
This has apparently been fixed in 0.8.2:
https://issues.apache.org/jira/browse/KAFKA-899

On Mon, Oct 6, 2014 at 3:02 PM, Jun Rao jun...@gmail.com wrote:

 Yes, transient error like LeaderNotAvailableException can happen. If you
 configure enough retries, then you shouldn't see the exception in the
 normal case.

 Thanks,

 Jun

 On Mon, Oct 6, 2014 at 5:35 AM, Stevo Slavić ssla...@gmail.com wrote:

  Hello Apache Kafka community,
 
  When trying to publish (using high level sync producer) a message on a
  non-existing topic (with implicit topic creation enabled), with
  message.send.max.retries set to 1, sending will fail with
  FailedToSendMessageException (and LeaderNotAvailableException swallowed).
 
  Am I doing something wrong, are my expectations wrong that this should
 work
  without exception thrown, or is this a known Kafka (with ZooKeeper as
  storage) issue?
 
  I'm using Kafka 0.8.1.1.
 
  Kind regards,
  Stevo Slavic.
 



Re: OffsetOutOfRange errors

2014-11-07 Thread Jason Rosenberg
The bottom line, is you are likely not consuming messages fast enough, so
you are falling behind.  So, you are steadily consuming older and older
messages, and eventually you are consuming messages older than the
retention time window set for your kafka broker.  That's the typical
scenario for this error.

Also, if you are consuming messages 300/sec, but you are committing once
every 100 messages, that means you are committing 3 times a second, which
could be a bottleneck. The default auto-offset commit is to do it every 60
seconds, etc.

Jason

On Fri, Nov 7, 2014 at 1:30 PM, Jimmy John jj...@livefyre.com wrote:

 The current setting is to commit to ZK every 100 messages read.

 The read buffer size is 262144 bytes. So we will read in a bunch of
 messages in a batch. And while iterating through those messages, we commit
 the offset to ZK every 100.

 jim

 On Fri, Nov 7, 2014 at 10:13 AM, Guozhang Wang wangg...@gmail.com wrote:

  Hi Jim,
 
  When messages gets cleaned based on data retention policy (by time or by
  size), the brokers will not inform ZK for the deletion event. The
  underlying assumption is that when consumers are fetching data at around
  the tail of the log (i.e. they are not much lagging, which is normal
 cases)
  they should be continuously update the consumed offsets in ZK and hence
  that offsets will be valid most of the time. When consumers are lagging
  behind and the old messages are cleaned they will get this exception, and
  consumers need to handle it by resetting their offset to, e.g. the head
 of
  the log.
 
  How frequent do your clients read / write the offsets in ZK?
 
  Guozhang
 
  On Thu, Nov 6, 2014 at 6:23 PM, Jimmy John jj...@livefyre.com wrote:
 
   Hello,
  
 I understand what this error means, just not sure why I keep running
  into
   it after 24-48 hrs of running fine consuming  300 messages / second.
  
 What happens when a kafka log rolls over and some old records are
 aged
   out? I mean what happens to the offsets? We are using a python client
  which
   stores the offsets in ZK. But in the middle of the run, say after 2
 days
  or
   so, suddenly it gets this error.
  
   The only possibility is that the older records have aged off and ZK
 still
   has the offset which is no longer applicable...How does the java client
   deal with this? Does kafka inform ZK that records have been aged off
 and
   update the offset or something?
  
   Here is the error i see in the broker logs
  
   [2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing
 fetch
   request for partition [activity.stream,3] offset 8013827 from consumer
   with
   correlation id 73 (kafka.server.KafkaApis)
  
kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but
  we
   only have log segments in the range 8603331 to 11279773.
  
at kafka.log.Log.read(Log.scala:380)
  
at
  
  
 
 kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
  
at
  
  
 
 kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
  
at
  
  
 
 kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
  
at
  
  
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
  
at
  
  
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
  
at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
  
   at
   scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
  
at scala.collection.immutable.Map$Map3.map(Map.scala:144)
  
at
  
  
 
 kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
  
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
  
at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
  
at
  kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
  
at java.lang.Thread.run(Thread.java:745)
  
  
   thx
  
   Jim
  
 
 
 
  --
  -- Guozhang
 



corrupt recovery checkpoint file issue....

2014-11-06 Thread Jason Rosenberg
Hi,

We recently had a kafka node go down suddenly. When it came back up, it
apparently had a corrupt recovery file, and refused to startup:

2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error
starting up KafkaServer
java.lang.NumberFormatException: For input string:
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
at 
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:481)
at java.lang.Integer.parseInt(Integer.java:527)
at 
scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at kafka.log.LogManager.loadLogs(LogManager.scala:105)
at kafka.log.LogManager.init(LogManager.scala:57)
at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
at kafka.server.KafkaServer.startup(KafkaServer.scala:72)

And since the app is under a monitor (so it was repeatedly restarting and
failing with this error for several minutes before we got to it)…

We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and it
then restarted cleanly (but of course re-synced all it’s data from
replicas, so we had no data loss).

Anyway, I’m wondering if that’s the expected behavior? Or should it not
declare it corrupt and then proceed automatically to an unclean restart?

Should this NumberFormatException be handled a bit more gracefully?

We saved the corrupt file if it’s worth inspecting (although I doubt it
will be useful!)….

Jason
​


Re: corrupt recovery checkpoint file issue....

2014-11-06 Thread Jason Rosenberg
forgot to mention, we are using 0.8.1.1

Jason

On Thu, Nov 6, 2014 at 9:31 AM, Jason Rosenberg j...@squareup.com wrote:

 Hi,

 We recently had a kafka node go down suddenly. When it came back up, it
 apparently had a corrupt recovery file, and refused to startup:

 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error starting up 
 KafkaServer
 java.lang.NumberFormatException: For input string: 
 ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
 ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
 at 
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
 at java.lang.Integer.parseInt(Integer.java:481)
 at java.lang.Integer.parseInt(Integer.java:527)
 at 
 scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
 at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
 at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at 
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at kafka.log.LogManager.loadLogs(LogManager.scala:105)
 at kafka.log.LogManager.init(LogManager.scala:57)
 at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:72)

 And since the app is under a monitor (so it was repeatedly restarting and
 failing with this error for several minutes before we got to it)…

 We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and
 it then restarted cleanly (but of course re-synced all it’s data from
 replicas, so we had no data loss).

 Anyway, I’m wondering if that’s the expected behavior? Or should it not
 declare it corrupt and then proceed automatically to an unclean restart?

 Should this NumberFormatException be handled a bit more gracefully?

 We saved the corrupt file if it’s worth inspecting (although I doubt it
 will be useful!)….

 Jason
 ​



Re: zookeeper upgrade or remove zookeeper dependency

2014-11-06 Thread Jason Rosenberg
We have been using zk 3.4.6 (and we use curator), without any problems with
kafka, for quite a while now

Jason

On Thu, Sep 18, 2014 at 2:18 PM, Mingtao Zhang mail2ming...@gmail.com
wrote:

 Great :)

 Best Regards,
 Mingtao

 On Thu, Sep 18, 2014 at 2:04 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hi Mingtao,
 
  We are shooting to cut the 0.8.2 branch this month.
 
  Guozhang
 
  On Thu, Sep 18, 2014 at 10:36 AM, Mingtao Zhang mail2ming...@gmail.com
  wrote:
 
   Good to know. Does it mean release will go out after those bug is fixed
  or
   moved to newer release? :)
  
   Best Regards,
   Mingtao
  
   On Wed, Sep 17, 2014 at 9:34 PM, Neha Narkhede 
 neha.narkh...@gmail.com
   wrote:
  
You can track the list of open bugs here

   
  
 
 https://issues.apache.org/jira/browse/KAFKA-1558?jql=project%20%3D%20Kafka%20and%20fixVersion%20%3D%200.8.2%20and%20status%20!%3D%20Resolved%20and%20status%20!%3D%20Closed

.
   
   
On Wed, Sep 17, 2014 at 10:00 AM, Mingtao Zhang 
  mail2ming...@gmail.com
wrote:
   
 Could you also share a rough time point of 0.8.2 release?

 Best Regards,
 Mingtao

 On Wed, Sep 17, 2014 at 12:10 PM, Neha Narkhede 
   neha.narkh...@gmail.com

 wrote:

  Kafka trunk is on a later zookeeper version (3.4.6). So the next
release
  (0.8.2) will depend on zookeeper 3.4.6
 
  On Wed, Sep 17, 2014 at 8:55 AM, Mingtao Zhang 
   mail2ming...@gmail.com

  wrote:
 
   Hi,
  
   I could see kafka is using zookeeper 3.3.4. For my integration
 purpose, I
   want to use curator, which requires a higher version than 3.3.4
   even
in
  its
   lowest version.
  
   I there any plan to bump up zookeeper dependency? Or is there
 any
plan
 to
   remove zookeeper dependency?
  
   Best Regards,
   Mingtao
  
 

   
  
 
 
 
  --
  -- Guozhang
 



Re: Disactivating Yammer Metrics Monitoring

2014-11-06 Thread Jason Rosenberg
Hi Francois,

We had the exact same problem.  We embed Kafka in our service container,
and we use yammer metrics to see data about the whole app (e.g. kafka, the
jvm, the service container wrapping it).  However, as you observed, by
default, kafka produces an insane amount of metrics.  So what we did, is
using the yammer library, you can disable specific metrics by removing
metrics from the yammer MetricsRegistry, which you can access from guice
(if you are using guice).  I implemented the MetricsRegistryListener, and
added the ability to remove metric names by regex, so I can still have some
metrics show up (like the simple 'AllTopic' counts for messages/bytes sent
from the producer), but block everything else that's per topic, etc

Jason

On Fri, Sep 19, 2014 at 11:34 PM, Otis Gospodnetic 
otis.gospodne...@gmail.com wrote:

 Hi,

 I don't have any source or configs handy to check things, but you are
 saying you've configured Kafka to use GraphiteReporter, right?  So why not
 remove that config, so metrics stop being sent to Graphite if your Graphite
 setup is suffering?  If you do that and you still want to see your Kafka
 metrics, you can always use SPM http://sematext.com/spm/ for Kafka
 (though some of the graphs will be empty until we get KAFKA-1481, or
 something else that improves metrics, in).  If you just want to use it
 temporarily, just use the free 30-day trial version until you beef up your
 Graphite setup.

 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr  Elasticsearch Support * http://sematext.com/


 On Fri, Sep 19, 2014 at 10:08 AM, François Langelier 
 f.langel...@gmail.com
 wrote:

  Hi Daniel,
 
  Thank you for your answer.
 
  It's possible that I didn't understood something, if so correct me
 please.
 
  From what I understood, from the kafka doc #monitoring
  http://kafka.apache.org/documentation.html#monitoring, kafka use
 Yammer
  Metrics for monitoring the servers (the brokers) and the clients
 (producers
  and consumers).
 
  Our web site is also using Yammer Metrics and push that to our Graphite
  server and our web site also produce message in kafka.
  From what I read, the Yammer Metrics GraphiteReporter is a kind of
  Singleton, once I Enable it, it is working for all the process. (But I
  might be wrong here...)
 
  We recently upgrade kafka from 0.7.2 to 0.8.1.1 and since the upgrade,
  kafka is monitoring in our Graphite Server and is hammering it, so we
  aren't able to use it because we always get timeout...
 
  SO, I was wondering if there is a way to disable the kafka monitoring to
  our Graphite server.
 
  We are using the code in the tag 0.8.1.1 on github, so if the
 kafka-ganglia
  isn't in the tag, we aren't using it :)
 
 
  François Langelier
  Étudiant en génie Logiciel - École de Technologie Supérieure
  http://www.etsmtl.ca/
  Capitaine Club Capra http://capra.etsmtl.ca/
  VP-Communication - CS Games http://csgames.org 2014
  Jeux de Génie http://www.jdgets.com/ 2011 à 2014
  Magistrat Fraternité du Piranha http://fraternitedupiranha.com/
  Comité Organisateur Olympiades ÉTS 2012
  Compétition Québécoise d'Ingénierie 2012 - Compétition Senior
 
  On Wed, Sep 17, 2014 at 11:05 PM, Daniel Compton d...@danielcompton.net
 
  wrote:
 
   Hi Francois
  
   I didn't quite understand how you've set up your metrics reporting. Are
  you
   using the https://github.com/criteo/kafka-ganglia metrics reporter? If
  so
   then you should be able to adjust the config to exclude the metrics you
   don't want, with kafka.ganglia.metrics.exclude.regex.
  
  
   On 18 September 2014 07:55, François Langelier f.langel...@gmail.com
   wrote:
  
Hi all!
   
We are using yammer metrics to monitor some parts of our system.
   
Since we upgrade from kafka 0.7.2 to 0.8.1.1, we saw a lot more data
getting in our graphite server and from what I saw, it looks like it
  all
come from our producers.
   
From what i understand, since we already use graphite, our
   graphiteReporter
is enable in our main web site and our kafka producers are having fun
   using
it too to monitor in graphite.
   
The problem is that right now kafka is hammering of graphite server
 and
   we
have difficulty to saw our monitored data...
   
Is there a way to deactivate the monitoring of our kafka producers?
   
   
François Langelier
Étudiant en génie Logiciel - École de Technologie Supérieure
http://www.etsmtl.ca/
Capitaine Club Capra http://capra.etsmtl.ca/
VP-Communication - CS Games http://csgames.org 2014
Jeux de Génie http://www.jdgets.com/ 2011 à 2014
Magistrat Fraternité du Piranha http://fraternitedupiranha.com/
Comité Organisateur Olympiades ÉTS 2012
Compétition Québécoise d'Ingénierie 2012 - Compétition Senior
   
  
 



Re: Interaction of retention settings for broker and topic plus partitions

2014-11-06 Thread Jason Rosenberg
Jun,

To clarify though, is it correct that a per topic limit will always
override the default limit of the same type?  (e.g. a large per-topic
retention hours vs. a small default retention hours)?

Jason

On Sat, Sep 20, 2014 at 12:28 AM, Jun Rao jun...@gmail.com wrote:

 That's right. The rule is that a log segment is deleted if either the size
 or the time limit is reached. Log sizes are per partition.

 Thanks,

 Jun

 On Thu, Sep 18, 2014 at 2:55 PM, Cory Watson gp...@keen.io wrote:

  Hello all!
 
  I'm curious about the interaction of server and topic level retention
  settings. It's not clear to me the precedence of the follow:
 
 - broker's default log.retention.bytes
 - topic's retention.bytes (which defaults to broker's
 log.retention.bytes)
 - broker's log.retention.hours and log.retention.minutes (if both are
 specified then it seems to be the lower of the two, since it's when
 either is exceeded)
 
  It seems that the rule is that when any of these are violated then the
 log
  segment is deleted. Is this right?
 
  Also, just to be clear: The log sizes in questions are for a single
  partitions logs?
 
  I have a situation where my per-topic retention.bytes is very high, but
 my
  default log.retention.hours is lower (the default @ 168 hours). It seems
  that it's truncating at the log.retention.hours instead of the topic's
  retention.bytes.
 
  Am I understanding this correctly? :)
 
  --
  Cory Watson
  Principal Infrastructure Engineer // Keen IO
 



Re: corrupt recovery checkpoint file issue....

2014-11-06 Thread Jason Rosenberg
I'm still not sure what caused the reboot of the system (but yes it appears
to have crashed hard).  The file system is xfs, on CentOs linux.  I'm not
yet sure, but I think also before the crash, the system might have become
wedged.

It appears the corrupt recovery files actually contained all zero bytes,
after looking at it with odb.

I'll file a Jira.

On Thu, Nov 6, 2014 at 7:09 PM, Jun Rao jun...@gmail.com wrote:

 I am also wondering how the corruption happened. The way that we update the
 OffsetCheckpoint file is to first write to a tmp file and flush the data.
 We then rename the tmp file to the final file. This is done to prevent
 corruption caused by a crash in the middle of the writes. In your case, was
 the host crashed? What kind of storage system are you using? Is there any
 non-volatile cache on the storage system?

 Thanks,

 Jun

 On Thu, Nov 6, 2014 at 6:31 AM, Jason Rosenberg j...@squareup.com wrote:

  Hi,
 
  We recently had a kafka node go down suddenly. When it came back up, it
  apparently had a corrupt recovery file, and refused to startup:
 
  2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error
  starting up KafkaServer
  java.lang.NumberFormatException: For input string:
 
 
 ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
 
 
 ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
  at
 
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Integer.parseInt(Integer.java:481)
  at java.lang.Integer.parseInt(Integer.java:527)
  at
  scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
  at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
  at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
  at
  kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
  at
  kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
  at
 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at
  scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at kafka.log.LogManager.loadLogs(LogManager.scala:105)
  at kafka.log.LogManager.init(LogManager.scala:57)
  at
 kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
  at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
 
  And since the app is under a monitor (so it was repeatedly restarting and
  failing with this error for several minutes before we got to it)…
 
  We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and
 it
  then restarted cleanly (but of course re-synced all it’s data from
  replicas, so we had no data loss).
 
  Anyway, I’m wondering if that’s the expected behavior? Or should it not
  declare it corrupt and then proceed automatically to an unclean restart?
 
  Should this NumberFormatException be handled a bit more gracefully?
 
  We saved the corrupt file if it’s worth inspecting (although I doubt it
  will be useful!)….
 
  Jason
  ​
 



Re: corrupt recovery checkpoint file issue....

2014-11-06 Thread Jason Rosenberg
filed: https://issues.apache.org/jira/browse/KAFKA-1758

On Thu, Nov 6, 2014 at 11:50 PM, Jason Rosenberg j...@squareup.com wrote:

 I'm still not sure what caused the reboot of the system (but yes it
 appears to have crashed hard).  The file system is xfs, on CentOs linux.
 I'm not yet sure, but I think also before the crash, the system might have
 become wedged.

 It appears the corrupt recovery files actually contained all zero bytes,
 after looking at it with odb.

 I'll file a Jira.

 On Thu, Nov 6, 2014 at 7:09 PM, Jun Rao jun...@gmail.com wrote:

 I am also wondering how the corruption happened. The way that we update
 the
 OffsetCheckpoint file is to first write to a tmp file and flush the data.
 We then rename the tmp file to the final file. This is done to prevent
 corruption caused by a crash in the middle of the writes. In your case,
 was
 the host crashed? What kind of storage system are you using? Is there any
 non-volatile cache on the storage system?

 Thanks,

 Jun

 On Thu, Nov 6, 2014 at 6:31 AM, Jason Rosenberg j...@squareup.com wrote:

  Hi,
 
  We recently had a kafka node go down suddenly. When it came back up, it
  apparently had a corrupt recovery file, and refused to startup:
 
  2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error
  starting up KafkaServer
  java.lang.NumberFormatException: For input string:
 
 
 ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
 
 
 ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
  at
 
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Integer.parseInt(Integer.java:481)
  at java.lang.Integer.parseInt(Integer.java:527)
  at
  scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
  at
 scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
  at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
  at
  kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
  at
  kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
  at
 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at
  scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at kafka.log.LogManager.loadLogs(LogManager.scala:105)
  at kafka.log.LogManager.init(LogManager.scala:57)
  at
 kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
  at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
 
  And since the app is under a monitor (so it was repeatedly restarting
 and
  failing with this error for several minutes before we got to it)…
 
  We moved the ‘recovery-point-offset-checkpoint’ file out of the way,
 and it
  then restarted cleanly (but of course re-synced all it’s data from
  replicas, so we had no data loss).
 
  Anyway, I’m wondering if that’s the expected behavior? Or should it not
  declare it corrupt and then proceed automatically to an unclean restart?
 
  Should this NumberFormatException be handled a bit more gracefully?
 
  We saved the corrupt file if it’s worth inspecting (although I doubt it
  will be useful!)….
 
  Jason
  ​
 





Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-11-03 Thread Jason Rosenberg
Are there any config parameter updates/changes?  I see the doc here:
http://kafka.apache.org/documentation.html#configuration
now defaults to 0.8.2-beta.  But it would be useful to know if anything has
changed from 0.8.1.1, just so we can be sure to update things, etc.



On Sat, Nov 1, 2014 at 11:18 AM, Koert Kuipers ko...@tresata.com wrote:

 joe,
 looking at those 0.8.2 beta javadoc I also see a Consumer api and
 KafkaConsumer class. they look different from what I currently use in
 8.1.1. Is this new? And this is not the 0.9 consumer?
 thanks, koert
 On Oct 30, 2014 8:01 AM, Joe Stein joe.st...@stealth.ly wrote:

  Hey, yeah!
 
  For the new producer
  https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/
 
  The java consumer is slated in 0.9 more on that here
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 
  On Thu, Oct 30, 2014 at 6:08 AM, Pierre-Yves Ritschard p...@spootnik.org
 
  wrote:
 
   Hi Joe et al.
  
   Congrats on the beta release!
   Do I read correctly that libraries can now rely on
   org.apache.kafka/kafka-clients which does not pull in scala anymore ?
  
   If so, awesome!
  
 - pyr
  
   On Tue, Oct 28, 2014 at 2:01 AM, Libo Yu yu_l...@hotmail.com wrote:
  
Congrats! When do you think the final 0.82 will be released?
   
 To: annou...@apache.org; users@kafka.apache.org;
  d...@kafka.apache.org
 Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
 Date: Tue, 28 Oct 2014 00:50:35 +
 From: joest...@apache.org

 The Apache Kafka community is pleased to announce the beta release
  for
Apache Kafka 0.8.2.

 The 0.8.2-beta release introduces many new features, improvements
 and
fixes including:
  - A new Java producer for ease of implementation and enhanced
performance.
  - Delete topic support.
  - Per topic configuration of preference for consistency over
availability.
  - Scala 2.11 support and dropping support for Scala 2.8.
  - LZ4 Compression.

 All of the changes in this release can be found:
https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html

 Apache Kafka is high-throughput, publish-subscribe messaging system
rethought of as a distributed commit log.

 ** Fast = A single Kafka broker can handle hundreds of megabytes
 of
reads and
 writes per second from thousands of clients.

 ** Scalable = Kafka is designed to allow a single cluster to serve
  as
the central data backbone
 for a large organization. It can be elastically and transparently
expanded without downtime.
 Data streams are partitioned and spread over a cluster of machines
 to
allow data streams
 larger than the capability of any single machine and to allow
  clusters
of co-ordinated consumers.

 ** Durable = Messages are persisted on disk and replicated within
  the
cluster to prevent
 data loss. Each broker can handle terabytes of messages without
performance impact.

 ** Distributed by Design = Kafka has a modern cluster-centric
 design
that offers
 strong durability and fault-tolerance guarantees.

 You can download the release from:
http://kafka.apache.org/downloads.html

 We welcome your help and feedback. For more information on how to
 report problems, and to get involved, visit the project website at
http://kafka.apache.org/

   
   
  
 



Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-11-03 Thread Jason Rosenberg
Also, that doc refers to the 'new producer' as available in trunk and of
beta quality.

But from the announcement, it seems it's now more properly integrated in
the release?  Also, where can I read about the 'kafka-client' referred to
above?

Thanks,

Jason



On Mon, Nov 3, 2014 at 4:46 PM, Jason Rosenberg j...@squareup.com wrote:

 Are there any config parameter updates/changes?  I see the doc here:
 http://kafka.apache.org/documentation.html#configuration
 now defaults to 0.8.2-beta.  But it would be useful to know if anything
 has changed from 0.8.1.1, just so we can be sure to update things, etc.



 On Sat, Nov 1, 2014 at 11:18 AM, Koert Kuipers ko...@tresata.com wrote:

 joe,
 looking at those 0.8.2 beta javadoc I also see a Consumer api and
 KafkaConsumer class. they look different from what I currently use in
 8.1.1. Is this new? And this is not the 0.9 consumer?
 thanks, koert
 On Oct 30, 2014 8:01 AM, Joe Stein joe.st...@stealth.ly wrote:

  Hey, yeah!
 
  For the new producer
  https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/
 
  The java consumer is slated in 0.9 more on that here
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 
  On Thu, Oct 30, 2014 at 6:08 AM, Pierre-Yves Ritschard 
 p...@spootnik.org
  wrote:
 
   Hi Joe et al.
  
   Congrats on the beta release!
   Do I read correctly that libraries can now rely on
   org.apache.kafka/kafka-clients which does not pull in scala anymore ?
  
   If so, awesome!
  
 - pyr
  
   On Tue, Oct 28, 2014 at 2:01 AM, Libo Yu yu_l...@hotmail.com wrote:
  
Congrats! When do you think the final 0.82 will be released?
   
 To: annou...@apache.org; users@kafka.apache.org;
  d...@kafka.apache.org
 Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
 Date: Tue, 28 Oct 2014 00:50:35 +
 From: joest...@apache.org

 The Apache Kafka community is pleased to announce the beta release
  for
Apache Kafka 0.8.2.

 The 0.8.2-beta release introduces many new features, improvements
 and
fixes including:
  - A new Java producer for ease of implementation and enhanced
performance.
  - Delete topic support.
  - Per topic configuration of preference for consistency over
availability.
  - Scala 2.11 support and dropping support for Scala 2.8.
  - LZ4 Compression.

 All of the changes in this release can be found:
https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html

 Apache Kafka is high-throughput, publish-subscribe messaging
 system
rethought of as a distributed commit log.

 ** Fast = A single Kafka broker can handle hundreds of megabytes
 of
reads and
 writes per second from thousands of clients.

 ** Scalable = Kafka is designed to allow a single cluster to
 serve
  as
the central data backbone
 for a large organization. It can be elastically and transparently
expanded without downtime.
 Data streams are partitioned and spread over a cluster of
 machines to
allow data streams
 larger than the capability of any single machine and to allow
  clusters
of co-ordinated consumers.

 ** Durable = Messages are persisted on disk and replicated within
  the
cluster to prevent
 data loss. Each broker can handle terabytes of messages without
performance impact.

 ** Distributed by Design = Kafka has a modern cluster-centric
 design
that offers
 strong durability and fault-tolerance guarantees.

 You can download the release from:
http://kafka.apache.org/downloads.html

 We welcome your help and feedback. For more information on how to
 report problems, and to get involved, visit the project website at
http://kafka.apache.org/

   
   
  
 





consumer rebalance weirdness

2014-08-07 Thread Jason Rosenberg
We've noticed that some of our consumers are more likely to repeatedly
trigger rebalancing when the app is consuming messages more slowly (e.g.
persisting data to back-end systems, etc.).

If on the other hand we 'fast-forward' the consumer (which essentially
means we tell it to consume but do nothing with the messages until all
caught up), it will never decide to do a rebalance during this time.  So it
can go hours without rebalancing while fast forwarding and consuming super
fast, while during normal processing, it might decide to rebalance every
minute or so.

Is there any simple explanation for this?

Usually the trigger for rebalance logged is that a topic info for path X
has changed to Y, triggering rebalance.

Thanks for any ideas.

We'd like to reduce the rebalancing, as it essentially slows down
consumption each time it happens.

Thanks

Jason


Re: consumer rebalance weirdness

2014-08-07 Thread Jason Rosenberg
Well, it's possible that when processing, it might take longer than the
zookeeper timeout to process a message, intermittently.  Would that cause a
zookeeper timeout?

(btw I'm usind 0.8.1.1).


On Thu, Aug 7, 2014 at 2:30 AM, Clark Haskins chask...@linkedin.com.invalid
 wrote:

 Is your application possibly timing out its zookeeper connection during
 consumption while doing its processing, thus triggering the rebalance?

 -Clark

 On 8/6/14, 11:18 PM, Jason Rosenberg j...@squareup.com wrote:

 We've noticed that some of our consumers are more likely to repeatedly
 trigger rebalancing when the app is consuming messages more slowly (e.g.
 persisting data to back-end systems, etc.).
 
 If on the other hand we 'fast-forward' the consumer (which essentially
 means we tell it to consume but do nothing with the messages until all
 caught up), it will never decide to do a rebalance during this time.  So
 it
 can go hours without rebalancing while fast forwarding and consuming super
 fast, while during normal processing, it might decide to rebalance every
 minute or so.
 
 Is there any simple explanation for this?
 
 Usually the trigger for rebalance logged is that a topic info for path X
 has changed to Y, triggering rebalance.
 
 Thanks for any ideas.
 
 We'd like to reduce the rebalancing, as it essentially slows down
 consumption each time it happens.
 
 Thanks
 
 Jason




Re: delete topic ?

2014-08-07 Thread Jason Rosenberg
Since the deletion stuff is now in trunk, would be compatible to issue the
command from a jar built from trunk, against a running 0.8.1.1 cluster?  Or
does the cluster also have to be running trunk?  (I'm guessing it does :)).

I have some topics I'd like to delete, but don't want to wait for 0.8.2
(but will probably have to, I'm guessing).

Jason


On Thu, Aug 7, 2014 at 2:53 AM, Timothy Chen tnac...@gmail.com wrote:

 Hi Gwen,

 That is a very confusing error message for sure, feel free to file a
 jira for both the experience cases.

 But in general how delete topic works is that it creates a entry in
 the delete_topic zk path, and the leader has a delete topic thread
 that watches that path and starts the topic deletion once it receives
 the message. It then requires rounds of coordination among all the
 brokers that has partitions for the topic to delete all the
 partitions, then finally delete the topic from zk.

 Therefore once the deletion finishes it will also deleted from zk. The
 topic command can definitely however join the topic list with the
 delete topic list and mark the ones being deleted with a special
 status.

 Tim

 On Wed, Aug 6, 2014 at 11:20 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
  Hi Timothy,
 
  While we are on the subject, few questions/comments (based on the
  trunk implementation of delete topic command):
 
  * After deleting a topic, I still see it when listing topics. Is the
  expected behavior? Should it disappear after some time?
  * When does the actual deletion gets triggered?
  * If I try to delete a topic twice I get a pretty confusing exception
  (Node exists from zkclient). It will be nice to catch this and say
  Topic is being deleted or something to this effect.
  * Even nicer if list topics command will mark topics as being deleted.
 
  I'll probably open a separate Jira for the nice behavior, but
  interested in hearing your thoughts.
 
  Gwen
 
  On Wed, Aug 6, 2014 at 11:01 PM, Timothy Chen tnac...@gmail.com wrote:
  Is this the latest master? I've added the delete option in trunk, but
  it's not in any release yet.
 
  We used to have the delete option flag but I believe we removed it
  that's why the documentation difference.
 
  Tim
 
  On Wed, Aug 6, 2014 at 10:53 PM, Shlomi Hazan shl...@viber.com wrote:
  if the answer is pointing out the 'chroot', as a word, it makes no
  difference. the result is the same:
 
  kafka/bin/kafka-topics.sh --zookeeper localhost:2181/chroot --delete
  --topic topic-3
 
  gives the same:
 
  Command must include exactly one action: --list, --describe, --create
 or
  --alter...
 
  or should I write something instead of chroot?
 
 
 
  On Wed, Jun 18, 2014 at 2:06 PM, Shlomi Hazan shl...@viber.com
 wrote:
 
  Hi,
 
  Doing some evaluation testing, and accidently create a queue with
 wrong
  replication factor.
 
  Trying to delete as in:
 
  kafka_2.10-0.8.1.1/bin/kafka-topics.sh --zookeeper localhost:2181
 --delete
  --topic replicated-topic
 
  Yeilded:
 
  Command must include exactly one action: --list, --describe, --create
 or
  –alter
 
  Event though this page (https://kafka.apache.org/documentation.html)
 says:
 
 
 
  And finally deleting a topic:
 
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete
 --topic my_topic_name
 
  WARNING: Delete topic functionality is beta in 0.8.1. Please report
 any
  bugs that you encounter on themailing list %20us...@kafka.apache.org
 or
  JIRA https://issues.apache.org/jira/browse/KAFKA.
 
  Kafka does not currently support reducing the number of partitions
 for a
  topic or changing the replication factor.
 
  What should I do?
 
  Shlomi
 



Re: consumer rebalance weirdness

2014-08-07 Thread Jason Rosenberg
Yeah, it's possible that's happening (but no smoking gun).  The main thing
I'm seeing is that when it actually takes the time to process messages, it
takes longer to get back to the ConsumerIterator for the next message.
 That alone seems to be the problem (does that make any sense)?  I would
have thought the zk listeners are in separate async threads (and that's
what it looks like looking at the kafka consumer code).

Maybe I should increase the zk session timeout and see if that helps.


On Thu, Aug 7, 2014 at 2:56 PM, Philip O'Toole 
philip.oto...@yahoo.com.invalid wrote:

 A big GC pause in your application, for example, could do it.

 Philip


 -
 http://www.philipotoole.com


 On Thursday, August 7, 2014 11:56 AM, Philip O'Toole 
 philip.oto...@yahoo.com wrote:



 I think the question is what in your consuming application could cause it
 not to check in with ZK for longer than the timeout.


 -
 http://www.philipotoole.com


 On Thursday, August 7, 2014 8:16 AM, Jason Rosenberg j...@squareup.com
 wrote:



 Well, it's possible that when processing, it might take longer than the
 zookeeper timeout to process a message, intermittently.  Would that cause a
 zookeeper timeout?

 (btw I'm usind 0.8.1.1).



 On Thu, Aug 7, 2014 at 2:30 AM, Clark Haskins
 chask...@linkedin.com.invalid
  wrote:

  Is your application possibly timing out its zookeeper connection during
  consumption while doing its processing, thus triggering the rebalance?
 
  -Clark
 
  On 8/6/14, 11:18 PM, Jason Rosenberg j...@squareup.com wrote:
 
  We've noticed that some of our consumers are more likely to repeatedly
  trigger rebalancing when the app is consuming messages more slowly (e.g.
  persisting data to back-end systems, etc.).
  
  If on the other hand we 'fast-forward' the consumer (which essentially
  means we tell it to consume but do nothing with the messages until all
  caught up), it will never decide to do a rebalance during this time.  So
  it
  can go hours without rebalancing while fast forwarding and consuming
 super
  fast, while during normal processing, it might decide to rebalance every
  minute or
  so.
  
  Is there any simple explanation for this?
  
  Usually the trigger for rebalance logged is that a topic info for path
 X
  has changed to Y, triggering rebalance.
  
  Thanks for any ideas.
  
  We'd like to reduce the rebalancing, as it essentially slows down
  consumption each time it happens.
  
  Thanks
  
  Jason
 
 



Re: much reduced io utilization after upgrade to 0.8.0 - 0.8.1.1

2014-07-23 Thread Jason Rosenberg
Thanks for the improvement!
(I'm not explicitly configuring fsync policy)

Jason


On Wed, Jul 23, 2014 at 12:33 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Yes, it could definitely be related to KAFKA-615. The default in 0.8.1
 is to let the OS handle disk writes. This is much more efficient as it
 will schedule them in an order friendly to the layout on disk and do a
 good job of merging adjacent writes. However if you are explicitly
 configuring an fsync policy (either by time or number of messages)
 then this is likely not the cause.

 -Jay

 On Tue, Jul 22, 2014 at 9:37 PM, Jason Rosenberg j...@squareup.com wrote:
  I recently upgraded some of our kafka clusters to use 0.8.1.1 (from
 0.8.0).
   It's all looking good so far.  One thing I notice though (seems like a
  good thing) is that the iostat utilization has gone way down after the
  upgrade.
 
  I'm not sure if I know exactly what could could be responsible for this,
 is
  this an expected result.
 
  Is it possibly related to:
 https://issues.apache.org/jira/browse/KAFKA-615
 
  Thanks,
 
  Jason



much reduced io utilization after upgrade to 0.8.0 - 0.8.1.1

2014-07-22 Thread Jason Rosenberg
I recently upgraded some of our kafka clusters to use 0.8.1.1 (from 0.8.0).
 It's all looking good so far.  One thing I notice though (seems like a
good thing) is that the iostat utilization has gone way down after the
upgrade.

I'm not sure if I know exactly what could could be responsible for this, is
this an expected result.

Is it possibly related to:  https://issues.apache.org/jira/browse/KAFKA-615

Thanks,

Jason


Re: status of 0.8.2

2014-07-08 Thread Jason Rosenberg
Is there a blocker to getting the patch for kafka-1180 applied?  Is the
patch for 0.8.0 no longer compatible for trunk?  I'm actually going to see
if I can get it to work for 0.8.1.1 today.

Thanks,

Jason


On Mon, Jul 7, 2014 at 9:41 PM, Jun Rao jun...@gmail.com wrote:

 Two biggest features in 0.8.2 are Kafka-based offset management and the new
 producer. We are in the final stage of testing them. We also haven't fully
 tested the delete topic feature. So, we are probably 4-6 weeks away from
 releasing 0.8.2.

 For kafka-1180, the patch hasn't been applied yet and we will need a patch
 for trunk.

 Thanks,

 Jun


 On Mon, Jul 7, 2014 at 7:31 AM, Jason Rosenberg j...@squareup.com wrote:

  What's the status for an 0.8.2 release?  We are currently using 0.8.0,
 and
  would like to upgrade to take advantage of some of the per-topic
 retention
  options available now in 0.8.1.
 
  However, we'd also like to take advantage of some fixes coming in 0.8.2
  (e.g. deleting topics).
 
  Also, we have been using a patch for (
  https://issues.apache.org/jira/browse/KAFKA-1180) applied to 0.8.0.
  This
  is marked as scheduled for 0.8.2, with a patch available, but I'm not
 sure
  if this has been committed and applied to the 0.8.2 branch yet.
 
  Thanks,
 
  Jason
 



Re: status of 0.8.2

2014-07-08 Thread Jason Rosenberg
Thanks Joe, I'll let you know what I find.  Do you anticipate any issues
with it working in 0.8.1.1?

Jason


On Tue, Jul 8, 2014 at 10:55 AM, Joe Stein joe.st...@stealth.ly wrote:

 I wrote it, so I can't commit it without other committers agreeing.

 Last I recall I updated the patch from the feedback in the reviewboard but
 haven't looked at it in months.

 I am glad though it resolved the issue you were having and we can figure
 how to get the patch to work with 0.8.1.1 if you run into problems.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /


 On Tue, Jul 8, 2014 at 10:45 AM, Jason Rosenberg j...@squareup.com wrote:

  Is there a blocker to getting the patch for kafka-1180 applied?  Is the
  patch for 0.8.0 no longer compatible for trunk?  I'm actually going to
 see
  if I can get it to work for 0.8.1.1 today.
 
  Thanks,
 
  Jason
 
 
  On Mon, Jul 7, 2014 at 9:41 PM, Jun Rao jun...@gmail.com wrote:
 
   Two biggest features in 0.8.2 are Kafka-based offset management and the
  new
   producer. We are in the final stage of testing them. We also haven't
  fully
   tested the delete topic feature. So, we are probably 4-6 weeks away
 from
   releasing 0.8.2.
  
   For kafka-1180, the patch hasn't been applied yet and we will need a
  patch
   for trunk.
  
   Thanks,
  
   Jun
  
  
   On Mon, Jul 7, 2014 at 7:31 AM, Jason Rosenberg j...@squareup.com
  wrote:
  
What's the status for an 0.8.2 release?  We are currently using
 0.8.0,
   and
would like to upgrade to take advantage of some of the per-topic
   retention
options available now in 0.8.1.
   
However, we'd also like to take advantage of some fixes coming in
 0.8.2
(e.g. deleting topics).
   
Also, we have been using a patch for (
https://issues.apache.org/jira/browse/KAFKA-1180) applied to 0.8.0.
This
is marked as scheduled for 0.8.2, with a patch available, but I'm not
   sure
if this has been committed and applied to the 0.8.2 branch yet.
   
Thanks,
   
Jason
   
  
 



Re: kafka 0.8.1.1 log.retention.minutes NOT being honored

2014-07-08 Thread Jason Rosenberg
Ah, ok.it's just no longer documented as such in the config docs?


On Tue, Jul 8, 2014 at 4:46 PM, Guozhang Wang wangg...@gmail.com wrote:

 Jason,

 getLogRetentionTimeMillis() take either log.retention.minutes or
 log.retention.hours and transform the value into milis. So you can
 specify using either granularity.

 Guozhang


 On Tue, Jul 8, 2014 at 1:11 PM, Jason Rosenberg j...@squareup.com wrote:

  On a related note, in doing the upgrade from 0.8.0, I noticed that the
  config property changed from 'log.retention.hours' to
  'log.retention.minutes'.  Would it have made more sense to deprecate
 rather
  than replace there?
 
  Also, I notice that internally, in the KafkaConfig class, it's
 represented
  as logRetentionTimeMillis() (e.g. not hours or minutes).  And the
 per-topic
  version is in ms and not minutes.  So, it all seems a bit confusing there
  (is there a reason for this)?
 
  Jason
 
 
  On Tue, Jul 8, 2014 at 3:54 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Server properties should affect on only the local instance separately.
  Are
   you saying the property is not honored even on the 0.8.1 machines?
  
   Guozhang
  
   On Mon, Jul 7, 2014 at 3:55 PM, Virendra Pratap Singh 
   vpsi...@yahoo-inc.com.invalid wrote:
  
By setting this property
log.retention.mins=10
in the server.properties file, which is passed as argument when
  starting
the broker.
   
Virendra
   
On 7/7/14, 3:31 PM, Guozhang Wang wangg...@gmail.com wrote:
   
How do you set the retention.minutes property? Is it through
 zk-based
topics tool?

Guozhang


On Mon, Jul 7, 2014 at 3:07 PM, Virendra Pratap Singh 
vpsi...@yahoo-inc.com.invalid wrote:

 I am running a mixed cluster as I mentioned earlier. 1 broker
 0.8.0
   and
 the other 0.8.1.1. Should the retention of topics for partitions
 owned/replicated by the broker running 0.8.1.1 not enforce the
  server
 properties settings as defined for that server.

 So this brings an interesting question, in case of heterogeneous
 environment (as is in my case, which system parameters will take
 preference/precedence).

 Virendra

 On 6/30/14, 9:19 AM, Guozhang Wang wangg...@gmail.com wrote:

 The retention.minute property is only introduced in 0.8.1:
 
 https://issues.apache.org/jira/browse/KAFKA-918
 
 if you are running 0.8.0 then it will not be recognized.
 
 Guozhang
 
 
 
 On Fri, Jun 27, 2014 at 2:13 PM, Virendra Pratap Singh 
 vpsi...@yahoo-inc.com.invalid wrote:
 
  Running a mixed 2 broker cluster. Mixed as in one of the
 broker1
  is
  running 0.8.0 and broker2 one 0.8.1.1 (from the apache release
   link.
  Directly using the tar ball, no local build used).
 
  I have set the log.retention.minutes=10. However the broker is
  not
  honoring the setting. I see its not cleaning the log.dir at
 all.
 
  However when I set the log.retention.hours=1, then it starts
   cleaning
 the
  log.
 
  When I have the log.retention.minutes set in the
  server.properties
then
 I
  see this logged in server.log:
 
  Š..
  [2014-06-27 19:21:06,633] WARN Property log.retention.minutes
 is
   not
 valid
  (kafka.utils.VerifiableProperties)
  [2014-06-27 19:21:06,633] WARN Property log.retention.minutes
 is
   not
 valid
  (kafka.utils.VerifiableProperties)
  ŠŠ
 
 
  I have set these properties too:
 
  log.cleaner.enable=true
  log.cleanup.policy=delete
 
 
  But I see similar warning logged for these properties too.
 
  Regards,
  Virendra
 
 
 
 
 --
 -- Guozhang




--
-- Guozhang
   
   
  
  
   --
   -- Guozhang
  
 



 --
 -- Guozhang



Re: quick question about new consumer api

2014-07-07 Thread Jason Rosenberg
Guozhang,

I'm not suggesting we parallelize within a partition

The problem with the current high-level consumer is, if you use a regex to
select multiple topics, and then have multiple consumers in the same group,
usually the first consumer will 'own' all the topics, and no amount of
sub-sequent rebalancing will allow other consumers in the group to own some
of the topics.  Re-balancing does allow other consumers to own multiple
partitions, but if a topic has only 1 partition, only the first consumer to
initialize will get all the work.

So, I'm wondering if the new api will be better about re-balancing the work
at the partition level, and not the topic level, as such.

Jason


On Mon, Jul 7, 2014 at 11:17 AM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Jason,

 In the new design the consumption is still at the per-partition
 granularity. The main rationale of doing this is ordering: Within a
 partition we want to preserve the ordering such that message B produced
 after message A will also be consumed and processed after message A. And
 producers can use keys to make sure messages with the same ordering group
 will be in the same partition. To do this we have to make one partition
 only being consumed by a single client at a time. On the other hand, when
 one wants to add the number of consumers beyond the number of partitions,
 he can always use the topic tool to dynamically add more partitions to the
 topic.

 Do you have a specific scenario in mind that would require single-partition
 topics?

 Guozhang



 On Mon, Jul 7, 2014 at 7:43 AM, Jason Rosenberg j...@squareup.com wrote:

  I've been looking at the new consumer api outlined here:
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
 
  One issue in the current high-level consumer, is that it does not do a
 good
  job of distributing a set of topics between multiple consumers, unless
 each
  topic has multiple partitions.  This has always seemed strange to me,
 since
  at the end of the day, even for single partition topics, the basic unit
 of
  consumption is still at the partition level (so you'd expect rebalancing
 to
  try to evenly distribute partitions (regardless of the topic)).
 
  It's not clearly spelled out in the new consumer api wiki, so I'll just
  ask, will this issue be addressed in the new api?  I think I've asked
 this
  before, but I wanted to go check again, and am not seeing this explicitly
  addressed in the design.
 
  Thanks
 
  Jason
 



 --
 -- Guozhang



Re: quick question about new consumer api

2014-07-07 Thread Jason Rosenberg
Great, that's reassuring!

What's the time frame for having a more or less stable version to try out?

Jason


On Mon, Jul 7, 2014 at 12:59 PM, Guozhang Wang wangg...@gmail.com wrote:

 I see your point now. The old consumer does have a hard-coded
 round-robin-per-topic logic which have this issue. In the new consumer,
 we will make the assignment logic customizable so that people can specify
 different rebalance algorithms they like.

 Also I will soon send out a new consumer design summary email for more
 comments. Feel free to give us more thoughts you have about the new
 consumer design.

 Guozhang


 On Mon, Jul 7, 2014 at 8:44 AM, Jason Rosenberg j...@squareup.com wrote:

  Guozhang,
 
  I'm not suggesting we parallelize within a partition
 
  The problem with the current high-level consumer is, if you use a regex
 to
  select multiple topics, and then have multiple consumers in the same
 group,
  usually the first consumer will 'own' all the topics, and no amount of
  sub-sequent rebalancing will allow other consumers in the group to own
 some
  of the topics.  Re-balancing does allow other consumers to own multiple
  partitions, but if a topic has only 1 partition, only the first consumer
 to
  initialize will get all the work.
 
  So, I'm wondering if the new api will be better about re-balancing the
 work
  at the partition level, and not the topic level, as such.
 
  Jason
 
 
  On Mon, Jul 7, 2014 at 11:17 AM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi Jason,
  
   In the new design the consumption is still at the per-partition
   granularity. The main rationale of doing this is ordering: Within a
   partition we want to preserve the ordering such that message B produced
   after message A will also be consumed and processed after message A.
 And
   producers can use keys to make sure messages with the same ordering
 group
   will be in the same partition. To do this we have to make one partition
   only being consumed by a single client at a time. On the other hand,
 when
   one wants to add the number of consumers beyond the number of
 partitions,
   he can always use the topic tool to dynamically add more partitions to
  the
   topic.
  
   Do you have a specific scenario in mind that would require
  single-partition
   topics?
  
   Guozhang
  
  
  
   On Mon, Jul 7, 2014 at 7:43 AM, Jason Rosenberg j...@squareup.com
  wrote:
  
I've been looking at the new consumer api outlined here:
   
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
   
One issue in the current high-level consumer, is that it does not do
 a
   good
job of distributing a set of topics between multiple consumers,
 unless
   each
topic has multiple partitions.  This has always seemed strange to me,
   since
at the end of the day, even for single partition topics, the basic
 unit
   of
consumption is still at the partition level (so you'd expect
  rebalancing
   to
try to evenly distribute partitions (regardless of the topic)).
   
It's not clearly spelled out in the new consumer api wiki, so I'll
 just
ask, will this issue be addressed in the new api?  I think I've asked
   this
before, but I wanted to go check again, and am not seeing this
  explicitly
addressed in the design.
   
Thanks
   
Jason
   
  
  
  
   --
   -- Guozhang
  
 



 --
 -- Guozhang



Re: 0.7 - 0.8 Protocol Upgrade in production environments

2014-01-21 Thread Jason Rosenberg
In my case, we just rolled out a separate 0.8 cluster, and migrated
producers to it over time (took several weeks to get everything
updated to the new cluster).  In the transition, we had consumers
running for both clusters.  Once no traffic was flowing on the old
cluster, we then shut down the 0.7 cluster.

I think this was simpler than trying to forward data from one to the other.

You can't have the 2 existing in a mixed state in the same cluster, at
all.  The protocols and storage formats are entirely different.

Jason

On Tue, Jan 21, 2014 at 5:38 PM, François Langelier
f.langel...@gmail.com wrote:
 Hi,

 I'm also in the process to upgrade from 0.7 to 0.8. WIth the informations I
 found, your best friend is here :
 https://cwiki.apache.org/confluence/display/KAFKA/Migrating+from+0.7+to+0.8
 It's a migration tool from 0.7 to 0.8.

 You will have to install 0.8 while you still have you 0.7 running. When
 those and your 0.8 consumers run, you will have to lunch the migration tool
 that will consume your 0.7 message to produce them to your 0.8 environment.
 When all your message will be transfer to your 0.8, you will them be able
 to transfer your 0.7 producers to your 0.8 environment.

 A picture is worth 1000 words :
 [image: Inline image 1]

 So for you b) , yes it's possible, but as far as I know, you will need to
 have a tool to verify the duplication of your message because some message
 will probably be consume in 0.7 and 0.8 while you are migrating because the
 offsets are incompatible between 0.7 and 0.8...

 c) yes if you use the migration tool





 François Langelier
 Étudiant en génie Logiciel - École de Technologie
 Supérieurehttp://www.etsmtl.ca/
 Capitaine Club Capra http://capra.etsmtl.ca/
 VP-Communication - CS Games http://csgames.org 2014
 Jeux de Génies http://www.jdgets.com/ 2011 à 2014
 Argentier Fraternité du Piranhas http://fraternitedupiranha.com/ 2012-2014
 Comité Organisateur Olympiades ÉTS 2012
 Compétition Québécoise d'Ingénierie 2012 - Compétition Senior


 On Tue, Jan 21, 2014 at 12:48 PM, Clark Breyman cl...@breyman.com wrote:

 I'm curious what the recommended best practice is for migrating a
 production environment with replication from 0.7 to 0.8 given the protocol
 upgrade. Some specific questions I have are:

 a) Is it possible to mix 0.7 and 0.8 servers for a given partition during
 the migration?

 b) If we can't mix server versions, how do we do a zero-downtime upgrade?

 c) Will 0.7 clients continue to communicate correctly on-the-wire with the
 0.8 server until they can be upgraded?


 Any war stories of production upgrades would be helpful. Thanks in advance.
 - Clark



Re: Patterns for message failure handling with Kafka

2014-01-21 Thread Jason Rosenberg
So, I think there are 2 different types of errors you mention.  The
first is data-dependent (e.g. it's corrupt or some such).  So, there's
no reason to block consumption of other messages that are likely to be
successful, while the data-dependent one won't fix itself no matter
times you retry.  So, for that, I think it makes sense to stash it
away to be retried later (or just logged as invalid and carry on).

For transient failures (e.g. a downstream dependent service is not
available), then I think it's fine to just keep retrying (with
exponential back off) until it succeeds, before processing the next
message (which will also likely fail for the same reason).

Jason



On Tue, Jan 21, 2014 at 5:46 PM, Jim jimi...@gmail.com wrote:
 I'm looking at message delivery patterns for Kafka consumers and wanted to
 get people's thoughts on the following problem:


 The objective is to ensure processing of individual messages with as much
 certainty as possible for at least once guarantees. I'm looking to have a
 kafka consumer pull n messages, assuming 100 for arguments sake, process
 them, commit the offset, then grab 100 more.

 The issue comes in where you have single message failure. For example
 message 30 cannot be deserialized, message 40 failed because of some 3rd
 party service that was down for an instant, etc... So we're looking at
 having a topic and a topic_retry pattern for consumers so that if there was
 a single message failure we'd put messages 30 and 40 in the retry topic
 with a failure count of 1 and if that failure count passes 3 it goes to
 cold storage for manual analysis. Once we have processed all 100 either by
 success or making sure they were re-enqueued we commit the offset, then
 grab more messages. If the percentage of retry topics goes over a threshold
 trip a circuit breaker for the consumer to stop pulling messages until the
 issue can be resolved to prevent re-try flooding.

 What are some patterns around this that people are using currently to
 handle message failures at scale with kafka?


 pardon if this is a frequent question but the
 http://search-hadoop.com/kafka server
 is down so I can't search historicals at the moment.

 thanks,
 Jim


Re: log.retention.bytes.per.topic does not trigger deletion

2014-01-19 Thread Jason Rosenberg
Please be sure to update the online config docs with this change!  The
per topic options are still listed there

Jason

On Thu, Jan 16, 2014 at 9:57 PM, Ben Summer bsum...@gnipcentral.com wrote:
 I see. I don't have version 0.8.1 yet. We just updated to 0.8.0 from beta
 after it became the stable version.
 Good to know there is a fix for this. I'll start trying it out in some
 non-production environments.

 Thanks,
 Ben


 On Thu, Jan 16, 2014 at 7:42 PM, Guozhang Wang wangg...@gmail.com wrote:

 In the latest version, per-topic configs have been moved to Zookeeper you
 and set them using admin tools instead of writing the config files. Could
 you try trunk HEAD and see if this issue has already been resolved:

 https://issues.apache.org/jira/browse/KAFKA-554

 Guozhang


 On Thu, Jan 16, 2014 at 6:27 PM, Ben Summer bsum...@gnipcentral.com
 wrote:

  v0.8.0, (non-beta) downloaded from the website. Let me know if there is a
  file I can check in the installation footprint that can give you an
 actual
  change id.
 
  Is there a fix to this particular feature that you know was made in a
 later
  build?
 
  Thanks for the quick response,
  Ben
 
 
  On Thu, Jan 16, 2014 at 6:22 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hello Ben,
  
   Which version are you using?
  
   Guozhang
  
  
   On Thu, Jan 16, 2014 at 3:15 PM, Ben Summer bsum...@gnipcentral.com
   wrote:
  
I tried using the following two retention properties
   
log.retention.bytes=3221225472
   
   
  
 
 log.retention.bytes.per.topic=first_topic:1099511627776,second_topic:1099511627776
   
which I interpret to mean by default, keep 3GB per topic partition,
   except
for first_topic and second_topic, which should retain 1TB each
 (across
   all
their partitions).
   
I've noticed that the data for first_topic and second_topic grow way
   beyond
1TB and no deletions occur. Is the syntax correct?
   
Thanks in advance,
Ben
   
  
  
  
   --
   -- Guozhang
  
 



 --
 -- Guozhang



Re: Consumers can't connect while broker is under load

2014-01-14 Thread Jason Rosenberg
12 zookeepers seems like a lot..and you should always, by default,
prefer an odd number of zookeepers.  Consumers negotiate with each
other for partition ownership, via zookeeper.

Jason

On Fri, Jan 10, 2014 at 9:20 PM, Guozhang Wang wangg...@gmail.com wrote:
 Can you post the consumer logs for the long-time-to-connect scenarios?

 Guozhang


 On Fri, Jan 10, 2014 at 1:20 PM, Tim Kellogg t...@2lemetry.com wrote:

 Hi,

 I have a cluster of 12 brokers receiving 10,000 msg/s from producers where
 each message is roughly 2.5KB. We also have 12 ZooKeepers and everything is
 on AWS. Under these conditions, top (the Linux utility) reports around
 10-15 out of 32 for system load, so we’re at less than half capacity.

 When under this load consumers take a very long time, often more than 30
 minutes, to connect to the brokers. When under no load they connect
 immediately. Why is this happening?

 Thanks,

 Tim Kellogg
 Sr. Software Engineer, Protocols
 2lemetry
 @kellogh




 --
 -- Guozhang


Re: understanding OffsetOutOfRangeException's....

2014-01-12 Thread Jason Rosenberg
Not sure, but I'll try (it's a bit difficult to create a test-case, because
it requires a good bit of integration testing, etc.).

Jason


On Sat, Jan 11, 2014 at 12:06 AM, Jun Rao jun...@gmail.com wrote:

 Do you think you can reproduce this easily?

 Thanks,

 Jun




Re: understanding OffsetOutOfRangeException's....

2014-01-10 Thread Jason Rosenberg
well, not currently, as we don't have multiple partitions for the
topics.but yes, I understand that would help too

but, we are using this multiple consumers within a process approach in
general with much success so far..just was curious about this ERROR I
was seeing :)


On Fri, Jan 10, 2014 at 11:06 AM, Jun Rao jun...@gmail.com wrote:

 Could you increase parallelism on the consumers?

 Thanks,

 Jun


 On Thu, Jan 9, 2014 at 1:22 PM, Jason Rosenberg j...@squareup.com wrote:

  The consumption rate is a little better after the refactoring.  The main
  issue though, was that we had a mismatch between large and small topics.
  A
  large topic can lag, and adversely affect consumption of other topics, so
  this is an attempt to isolate topic filtering, and better balance the
  consumers for the different topics.
 
  So, it's definitely working on that score.
 
  The topic that was lagging (and getting OffsetOutOfRangeExceptions) was
  doing that before and after the refactor (and after we started also
 seeing
  the ERROR logging).  But consumption of all other topics is working
 better
  now (almost no lag at all).
 
  I'm also setting the client.id for each consumer in the process, so
 that I
  can see the individual metrics per consumer.
 
  Jason
 
 
  On Thu, Jan 9, 2014 at 1:00 PM, Jun Rao jun...@gmail.com wrote:
 
   Does the consumption rate in the client (msg/sec) change significantly
   after the refactoring?
  
   Thanks,
  
   Jun
  
  
   On Wed, Jan 8, 2014 at 10:44 AM, Jason Rosenberg j...@squareup.com
  wrote:
  
Yes, it's happening continuously, at the moment (although I'm
 expecting
   the
consumer to catch up soon)
   
It seemed to start happening after I refactored the consumer app to
 use
multiple consumer connectors in the same process (each one has a
  separate
topic filter, so should be no overlap between them).  All using the
  same
consumer group.
   
Could it be a thread safety issue in the ZookeeperConsumerConnector
   (seems
unlikely).
   
Jason
   
   
On Wed, Jan 8, 2014 at 1:04 AM, Jun Rao jun...@gmail.com wrote:
   
 Normally, if the consumer can't keep up, you should just see the
 OffsetOutOfRangeException warning. The offset mismatch error should
   never
 happen. It could be that OffsetOutOfRangeException exposed a bug.
 Do
   you
 think you can reproduce this easily?

 Thanks,

 Jun


 On Tue, Jan 7, 2014 at 9:29 PM, Jason Rosenberg j...@squareup.com
wrote:

  Jun,
 
  I'm not sure I understand your question, wrt produced data?
 
  But yes, in general, I believe the consumer is not keeping up
 with
   the
  broker's deleting the data.  So it's trying to fetch the next
 batch
   of
  data, but it's last offset is no longer there, etc.  So that's
 the
reason
  for the WARN message, in the fetcher thread.
 
  I'm just not sure I understand then why we don't always see the
  ConsumerIterator error also, because won't there always be
 missing
   data
  detected there?  Why sometimes and not always?  What's the
   difference?
 
  Jason
 
 
  On Wed, Jan 8, 2014 at 12:07 AM, Jun Rao jun...@gmail.com
 wrote:
 
   The WARN and ERROR may not be completely correlated. Could it
 be
   that
 the
   consumer is slow and couldn't keep up with the produced data?
  
   Thanks,
  
   Jun
  
  
   On Tue, Jan 7, 2014 at 6:47 PM, Jason Rosenberg 
  j...@squareup.com
  wrote:
  
So, sometimes I just get the WARN from the
  ConsumerFetcherThread
(as
previously noted, above), e.g.:
   
2014-01-08 02:31:47,394  WARN
 [ConsumerFetcherThread-myconsumerapp-11]
consumer.ConsumerFetcherThread -
[ConsumerFetcherThread-myconsumerapp-11], Current offset
16163904970
for partition [mypartition,0] out of range; reset offset to
16175326044
   
More recently, I see these in the following log line (not
 sure
   why
I
didn't see it previously), coming from the ConsumerIterator:
   
2014-01-08 02:31:47,681 ERROR [myconsumerthread-0]
consumer.ConsumerIterator - consumed offset: 16163904970
  doesn't
 match
fetch offset: 16175326044 for mytopic:0: fetched offset =
 16175330598:
consumed offset = 16163904970;
 Consumer may lose data
   
Why would I not see this second ERROR everytime there's a
corresponding WARN on the FetcherThread for an offset reset?
   
Should I only be concerned about possible lost data if I see
  the
second ERROR log line?
   
Jason
   
On Tue, Dec 24, 2013 at 3:49 PM, Jason Rosenberg 
   j...@squareup.com

   wrote:
 But I assume this would not be normally you'd want to log
   (every
 incoming producer request?).  Maybe just

Re: understanding OffsetOutOfRangeException's....

2014-01-09 Thread Jason Rosenberg
The consumption rate is a little better after the refactoring.  The main
issue though, was that we had a mismatch between large and small topics.  A
large topic can lag, and adversely affect consumption of other topics, so
this is an attempt to isolate topic filtering, and better balance the
consumers for the different topics.

So, it's definitely working on that score.

The topic that was lagging (and getting OffsetOutOfRangeExceptions) was
doing that before and after the refactor (and after we started also seeing
the ERROR logging).  But consumption of all other topics is working better
now (almost no lag at all).

I'm also setting the client.id for each consumer in the process, so that I
can see the individual metrics per consumer.

Jason


On Thu, Jan 9, 2014 at 1:00 PM, Jun Rao jun...@gmail.com wrote:

 Does the consumption rate in the client (msg/sec) change significantly
 after the refactoring?

 Thanks,

 Jun


 On Wed, Jan 8, 2014 at 10:44 AM, Jason Rosenberg j...@squareup.com wrote:

  Yes, it's happening continuously, at the moment (although I'm expecting
 the
  consumer to catch up soon)
 
  It seemed to start happening after I refactored the consumer app to use
  multiple consumer connectors in the same process (each one has a separate
  topic filter, so should be no overlap between them).  All using the same
  consumer group.
 
  Could it be a thread safety issue in the ZookeeperConsumerConnector
 (seems
  unlikely).
 
  Jason
 
 
  On Wed, Jan 8, 2014 at 1:04 AM, Jun Rao jun...@gmail.com wrote:
 
   Normally, if the consumer can't keep up, you should just see the
   OffsetOutOfRangeException warning. The offset mismatch error should
 never
   happen. It could be that OffsetOutOfRangeException exposed a bug. Do
 you
   think you can reproduce this easily?
  
   Thanks,
  
   Jun
  
  
   On Tue, Jan 7, 2014 at 9:29 PM, Jason Rosenberg j...@squareup.com
  wrote:
  
Jun,
   
I'm not sure I understand your question, wrt produced data?
   
But yes, in general, I believe the consumer is not keeping up with
 the
broker's deleting the data.  So it's trying to fetch the next batch
 of
data, but it's last offset is no longer there, etc.  So that's the
  reason
for the WARN message, in the fetcher thread.
   
I'm just not sure I understand then why we don't always see the
ConsumerIterator error also, because won't there always be missing
 data
detected there?  Why sometimes and not always?  What's the
 difference?
   
Jason
   
   
On Wed, Jan 8, 2014 at 12:07 AM, Jun Rao jun...@gmail.com wrote:
   
 The WARN and ERROR may not be completely correlated. Could it be
 that
   the
 consumer is slow and couldn't keep up with the produced data?

 Thanks,

 Jun


 On Tue, Jan 7, 2014 at 6:47 PM, Jason Rosenberg j...@squareup.com
wrote:

  So, sometimes I just get the WARN from the ConsumerFetcherThread
  (as
  previously noted, above), e.g.:
 
  2014-01-08 02:31:47,394  WARN
   [ConsumerFetcherThread-myconsumerapp-11]
  consumer.ConsumerFetcherThread -
  [ConsumerFetcherThread-myconsumerapp-11], Current offset
  16163904970
  for partition [mypartition,0] out of range; reset offset to
  16175326044
 
  More recently, I see these in the following log line (not sure
 why
  I
  didn't see it previously), coming from the ConsumerIterator:
 
  2014-01-08 02:31:47,681 ERROR [myconsumerthread-0]
  consumer.ConsumerIterator - consumed offset: 16163904970 doesn't
   match
  fetch offset: 16175326044 for mytopic:0: fetched offset =
   16175330598:
  consumed offset = 16163904970;
   Consumer may lose data
 
  Why would I not see this second ERROR everytime there's a
  corresponding WARN on the FetcherThread for an offset reset?
 
  Should I only be concerned about possible lost data if I see the
  second ERROR log line?
 
  Jason
 
  On Tue, Dec 24, 2013 at 3:49 PM, Jason Rosenberg 
 j...@squareup.com
  
 wrote:
   But I assume this would not be normally you'd want to log
 (every
   incoming producer request?).  Maybe just for debugging?  Or is
 it
only
   for consumer fetch requests?
  
   On Tue, Dec 24, 2013 at 12:50 PM, Guozhang Wang 
   wangg...@gmail.com
  wrote:
   TRACE is lower than INFO so INFO level request logging would
  also
   be
   recorded.
  
   You can check for Completed XXX request in the log files to
   check
 the
   request info with the correlation id.
  
   Guozhang
  
  
   On Mon, Dec 23, 2013 at 10:46 PM, Jason Rosenberg 
   j...@squareup.com

  wrote:
  
   Hmmm, it looks like I'm enabling all logging at INFO, and the
request
   logging is only done at TRACE (why is that?).
  
   I suppose one wouldn't normally want to see request logs, so
 by
  default,
   they aren't enabled

Re: understanding OffsetOutOfRangeException's....

2014-01-08 Thread Jason Rosenberg
Yes, it's happening continuously, at the moment (although I'm expecting the
consumer to catch up soon)

It seemed to start happening after I refactored the consumer app to use
multiple consumer connectors in the same process (each one has a separate
topic filter, so should be no overlap between them).  All using the same
consumer group.

Could it be a thread safety issue in the ZookeeperConsumerConnector (seems
unlikely).

Jason


On Wed, Jan 8, 2014 at 1:04 AM, Jun Rao jun...@gmail.com wrote:

 Normally, if the consumer can't keep up, you should just see the
 OffsetOutOfRangeException warning. The offset mismatch error should never
 happen. It could be that OffsetOutOfRangeException exposed a bug. Do you
 think you can reproduce this easily?

 Thanks,

 Jun


 On Tue, Jan 7, 2014 at 9:29 PM, Jason Rosenberg j...@squareup.com wrote:

  Jun,
 
  I'm not sure I understand your question, wrt produced data?
 
  But yes, in general, I believe the consumer is not keeping up with the
  broker's deleting the data.  So it's trying to fetch the next batch of
  data, but it's last offset is no longer there, etc.  So that's the reason
  for the WARN message, in the fetcher thread.
 
  I'm just not sure I understand then why we don't always see the
  ConsumerIterator error also, because won't there always be missing data
  detected there?  Why sometimes and not always?  What's the difference?
 
  Jason
 
 
  On Wed, Jan 8, 2014 at 12:07 AM, Jun Rao jun...@gmail.com wrote:
 
   The WARN and ERROR may not be completely correlated. Could it be that
 the
   consumer is slow and couldn't keep up with the produced data?
  
   Thanks,
  
   Jun
  
  
   On Tue, Jan 7, 2014 at 6:47 PM, Jason Rosenberg j...@squareup.com
  wrote:
  
So, sometimes I just get the WARN from the ConsumerFetcherThread (as
previously noted, above), e.g.:
   
2014-01-08 02:31:47,394  WARN
 [ConsumerFetcherThread-myconsumerapp-11]
consumer.ConsumerFetcherThread -
[ConsumerFetcherThread-myconsumerapp-11], Current offset 16163904970
for partition [mypartition,0] out of range; reset offset to
16175326044
   
More recently, I see these in the following log line (not sure why I
didn't see it previously), coming from the ConsumerIterator:
   
2014-01-08 02:31:47,681 ERROR [myconsumerthread-0]
consumer.ConsumerIterator - consumed offset: 16163904970 doesn't
 match
fetch offset: 16175326044 for mytopic:0: fetched offset =
 16175330598:
consumed offset = 16163904970;
 Consumer may lose data
   
Why would I not see this second ERROR everytime there's a
corresponding WARN on the FetcherThread for an offset reset?
   
Should I only be concerned about possible lost data if I see the
second ERROR log line?
   
Jason
   
On Tue, Dec 24, 2013 at 3:49 PM, Jason Rosenberg j...@squareup.com
   wrote:
 But I assume this would not be normally you'd want to log (every
 incoming producer request?).  Maybe just for debugging?  Or is it
  only
 for consumer fetch requests?

 On Tue, Dec 24, 2013 at 12:50 PM, Guozhang Wang 
 wangg...@gmail.com
wrote:
 TRACE is lower than INFO so INFO level request logging would also
 be
 recorded.

 You can check for Completed XXX request in the log files to
 check
   the
 request info with the correlation id.

 Guozhang


 On Mon, Dec 23, 2013 at 10:46 PM, Jason Rosenberg 
 j...@squareup.com
  
wrote:

 Hmmm, it looks like I'm enabling all logging at INFO, and the
  request
 logging is only done at TRACE (why is that?).

 I suppose one wouldn't normally want to see request logs, so by
default,
 they aren't enabled?


 On Mon, Dec 23, 2013 at 11:46 PM, Jun Rao jun...@gmail.com
  wrote:

  Did you enable request log? It logs the ip of every request.
 
  Thanks,
 
  Jun
 
 
  On Mon, Dec 23, 2013 at 3:52 PM, Jason Rosenberg 
  j...@squareup.com
   
 wrote:
 
   Hi Guozhang,
  
   I'm not sure I understand your first answer.  I don't see
   anything
   regarding the correlation id, elsewhere in the broker
   logs.They
 only
   show up in those ERROR messages
  
   I do see correlation id's in clients, but not on the
  broker.
  
   Jason
  
  
   On Mon, Dec 23, 2013 at 6:46 PM, Guozhang Wang 
   wangg...@gmail.com

  wrote:
  
Jason,
   
You can search the correlation id in the public access log
 on
   the
  servers
to get the consumer information.
   
As for logging, I agree that we should use the same level
 on
   both
  sides.
Could you file a jira for this?
   
Guozhang
   
   
On Mon, Dec 23, 2013 at 3:09 PM, Jason Rosenberg 
j...@squareup.com
   wrote:
   
 In our broker logs, we occasionally see errors like

Re: understanding OffsetOutOfRangeException's....

2014-01-08 Thread Jason Rosenberg
Joe,

I'm creating separate connectors, and creating separate streams, with a
separate thread pool to process them, for each connector.

This appears to be working well (e.g. each connector seems to be correctly
processing data).

The only difference is the extra ERROR log message I'm seeing on the
ConsumerIterator

Jazson


On Wed, Jan 8, 2014 at 1:59 PM, Joe Stein joe.st...@stealth.ly wrote:

 thinking/typing out loud here not sure if this is the problem but could be
 so figure I throw it out there

 the ZookeeperConsumerConnector has a messageStreamCreated atomic boolean
 stopping more than one consumer connector being created for messages stream
 by filter at once...

 do you have separate instantiated objects of your ConsumerConnector (so
 calling ConsumerConnector.create for each one) or do you have one
 ConsumerConnector.create and then you call createMessageStreams multiple
 times on it (which would cause a RuntimeException so probably not but maybe
 your code is swallowing that)... if the latter that could explain why it is
 lagging behind unexpectedly since really one ConsumerConnector is
 running

 again, you might not be doing this but figure I throw it out there in case
 you were.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /


 On Wed, Jan 8, 2014 at 1:44 PM, Jason Rosenberg j...@squareup.com wrote:

  Yes, it's happening continuously, at the moment (although I'm expecting
 the
  consumer to catch up soon)
 
  It seemed to start happening after I refactored the consumer app to use
  multiple consumer connectors in the same process (each one has a separate
  topic filter, so should be no overlap between them).  All using the same
  consumer group.
 
  Could it be a thread safety issue in the ZookeeperConsumerConnector
 (seems
  unlikely).
 
  Jason
 
 
  On Wed, Jan 8, 2014 at 1:04 AM, Jun Rao jun...@gmail.com wrote:
 
   Normally, if the consumer can't keep up, you should just see the
   OffsetOutOfRangeException warning. The offset mismatch error should
 never
   happen. It could be that OffsetOutOfRangeException exposed a bug. Do
 you
   think you can reproduce this easily?
  
   Thanks,
  
   Jun
  
  
   On Tue, Jan 7, 2014 at 9:29 PM, Jason Rosenberg j...@squareup.com
  wrote:
  
Jun,
   
I'm not sure I understand your question, wrt produced data?
   
But yes, in general, I believe the consumer is not keeping up with
 the
broker's deleting the data.  So it's trying to fetch the next batch
 of
data, but it's last offset is no longer there, etc.  So that's the
  reason
for the WARN message, in the fetcher thread.
   
I'm just not sure I understand then why we don't always see the
ConsumerIterator error also, because won't there always be missing
 data
detected there?  Why sometimes and not always?  What's the
 difference?
   
Jason
   
   
On Wed, Jan 8, 2014 at 12:07 AM, Jun Rao jun...@gmail.com wrote:
   
 The WARN and ERROR may not be completely correlated. Could it be
 that
   the
 consumer is slow and couldn't keep up with the produced data?

 Thanks,

 Jun


 On Tue, Jan 7, 2014 at 6:47 PM, Jason Rosenberg j...@squareup.com
wrote:

  So, sometimes I just get the WARN from the ConsumerFetcherThread
  (as
  previously noted, above), e.g.:
 
  2014-01-08 02:31:47,394  WARN
   [ConsumerFetcherThread-myconsumerapp-11]
  consumer.ConsumerFetcherThread -
  [ConsumerFetcherThread-myconsumerapp-11], Current offset
  16163904970
  for partition [mypartition,0] out of range; reset offset to
  16175326044
 
  More recently, I see these in the following log line (not sure
 why
  I
  didn't see it previously), coming from the ConsumerIterator:
 
  2014-01-08 02:31:47,681 ERROR [myconsumerthread-0]
  consumer.ConsumerIterator - consumed offset: 16163904970 doesn't
   match
  fetch offset: 16175326044 for mytopic:0: fetched offset =
   16175330598:
  consumed offset = 16163904970;
   Consumer may lose data
 
  Why would I not see this second ERROR everytime there's a
  corresponding WARN on the FetcherThread for an offset reset?
 
  Should I only be concerned about possible lost data if I see the
  second ERROR log line?
 
  Jason
 
  On Tue, Dec 24, 2013 at 3:49 PM, Jason Rosenberg 
 j...@squareup.com
  
 wrote:
   But I assume this would not be normally you'd want to log
 (every
   incoming producer request?).  Maybe just for debugging?  Or is
 it
only
   for consumer fetch requests?
  
   On Tue, Dec 24, 2013 at 12:50 PM, Guozhang Wang 
   wangg...@gmail.com
  wrote:
   TRACE is lower than INFO so INFO level request logging would
  also

Re: understanding OffsetOutOfRangeException's....

2014-01-07 Thread Jason Rosenberg
So, sometimes I just get the WARN from the ConsumerFetcherThread (as
previously noted, above), e.g.:

2014-01-08 02:31:47,394  WARN [ConsumerFetcherThread-myconsumerapp-11]
consumer.ConsumerFetcherThread -
[ConsumerFetcherThread-myconsumerapp-11], Current offset 16163904970
for partition [mypartition,0] out of range; reset offset to
16175326044

More recently, I see these in the following log line (not sure why I
didn't see it previously), coming from the ConsumerIterator:

2014-01-08 02:31:47,681 ERROR [myconsumerthread-0]
consumer.ConsumerIterator - consumed offset: 16163904970 doesn't match
fetch offset: 16175326044 for mytopic:0: fetched offset = 16175330598:
consumed offset = 16163904970;
 Consumer may lose data

Why would I not see this second ERROR everytime there's a
corresponding WARN on the FetcherThread for an offset reset?

Should I only be concerned about possible lost data if I see the
second ERROR log line?

Jason

On Tue, Dec 24, 2013 at 3:49 PM, Jason Rosenberg j...@squareup.com wrote:
 But I assume this would not be normally you'd want to log (every
 incoming producer request?).  Maybe just for debugging?  Or is it only
 for consumer fetch requests?

 On Tue, Dec 24, 2013 at 12:50 PM, Guozhang Wang wangg...@gmail.com wrote:
 TRACE is lower than INFO so INFO level request logging would also be
 recorded.

 You can check for Completed XXX request in the log files to check the
 request info with the correlation id.

 Guozhang


 On Mon, Dec 23, 2013 at 10:46 PM, Jason Rosenberg j...@squareup.com wrote:

 Hmmm, it looks like I'm enabling all logging at INFO, and the request
 logging is only done at TRACE (why is that?).

 I suppose one wouldn't normally want to see request logs, so by default,
 they aren't enabled?


 On Mon, Dec 23, 2013 at 11:46 PM, Jun Rao jun...@gmail.com wrote:

  Did you enable request log? It logs the ip of every request.
 
  Thanks,
 
  Jun
 
 
  On Mon, Dec 23, 2013 at 3:52 PM, Jason Rosenberg j...@squareup.com
 wrote:
 
   Hi Guozhang,
  
   I'm not sure I understand your first answer.  I don't see anything
   regarding the correlation id, elsewhere in the broker logs.They
 only
   show up in those ERROR messages
  
   I do see correlation id's in clients, but not on the broker.
  
   Jason
  
  
   On Mon, Dec 23, 2013 at 6:46 PM, Guozhang Wang wangg...@gmail.com
  wrote:
  
Jason,
   
You can search the correlation id in the public access log on the
  servers
to get the consumer information.
   
As for logging, I agree that we should use the same level on both
  sides.
Could you file a jira for this?
   
Guozhang
   
   
On Mon, Dec 23, 2013 at 3:09 PM, Jason Rosenberg j...@squareup.com
   wrote:
   
 In our broker logs, we occasionally see errors like this:

 2013-12-23 05:02:08,456 ERROR [kafka-request-handler-2]
   server.KafkaApis
-
 [KafkaApi-45] Error when processing fetch request for partition
[mytopic,0]
 offset 204243601 from consumer with correlation id 130341
 kafka.common.OffsetOutOfRangeException: Request for offset
 204243601
   but
we
 only have log segments in the range 204343397 to 207423640.

 I assume this means there's a consumer that has fallen behind
  consuming
 messages, and the log retention policy has removed messages before
  they
 could be consumed by the consumer.

 However, I'm not 100% which consumer it is, and it looks like the
  only
info
 we have is the correlation id of the consumer, e.g.:

 from consumer with correlation id 130341

 Is there a way to know which consumer this refers to?  It seems
 there
   are
 far more correlation id's than there are consumers.  Would it be
   possible
 to provide a bit more descriptive error message here, so we can
immediately
 know which consumer is falling behind?

 We do see a corresponding entry in the consumer logs too:

 2013-12-23 05:02:08,797  WARN
 [ConsumerFetcherThread-myconsumergroup-1387353494862-7aa0c61d-0-45]
 consumer.ConsumerFetcherThread -

 [ConsumerFetcherThread-myconsumergroup-1387353494862-7aa0c61d-0-45],
 Current offset 204243601 for partition [mytopic,0] out of range;
  reset
 offset to 204343397

 But it would be nice to be able to also use the broker log to
 quickly
find
 consumers with issues.

 Also, I'm not sure, why is logging the event as an ERROR in the
  broker,
but
 a WARN in the consumer?

 Jason

   
   
   
--
-- Guozhang
   
  
 




 --
 -- Guozhang


Re: understanding OffsetOutOfRangeException's....

2014-01-07 Thread Jason Rosenberg
Jun,

I'm not sure I understand your question, wrt produced data?

But yes, in general, I believe the consumer is not keeping up with the
broker's deleting the data.  So it's trying to fetch the next batch of
data, but it's last offset is no longer there, etc.  So that's the reason
for the WARN message, in the fetcher thread.

I'm just not sure I understand then why we don't always see the
ConsumerIterator error also, because won't there always be missing data
detected there?  Why sometimes and not always?  What's the difference?

Jason


On Wed, Jan 8, 2014 at 12:07 AM, Jun Rao jun...@gmail.com wrote:

 The WARN and ERROR may not be completely correlated. Could it be that the
 consumer is slow and couldn't keep up with the produced data?

 Thanks,

 Jun


 On Tue, Jan 7, 2014 at 6:47 PM, Jason Rosenberg j...@squareup.com wrote:

  So, sometimes I just get the WARN from the ConsumerFetcherThread (as
  previously noted, above), e.g.:
 
  2014-01-08 02:31:47,394  WARN [ConsumerFetcherThread-myconsumerapp-11]
  consumer.ConsumerFetcherThread -
  [ConsumerFetcherThread-myconsumerapp-11], Current offset 16163904970
  for partition [mypartition,0] out of range; reset offset to
  16175326044
 
  More recently, I see these in the following log line (not sure why I
  didn't see it previously), coming from the ConsumerIterator:
 
  2014-01-08 02:31:47,681 ERROR [myconsumerthread-0]
  consumer.ConsumerIterator - consumed offset: 16163904970 doesn't match
  fetch offset: 16175326044 for mytopic:0: fetched offset = 16175330598:
  consumed offset = 16163904970;
   Consumer may lose data
 
  Why would I not see this second ERROR everytime there's a
  corresponding WARN on the FetcherThread for an offset reset?
 
  Should I only be concerned about possible lost data if I see the
  second ERROR log line?
 
  Jason
 
  On Tue, Dec 24, 2013 at 3:49 PM, Jason Rosenberg j...@squareup.com
 wrote:
   But I assume this would not be normally you'd want to log (every
   incoming producer request?).  Maybe just for debugging?  Or is it only
   for consumer fetch requests?
  
   On Tue, Dec 24, 2013 at 12:50 PM, Guozhang Wang wangg...@gmail.com
  wrote:
   TRACE is lower than INFO so INFO level request logging would also be
   recorded.
  
   You can check for Completed XXX request in the log files to check
 the
   request info with the correlation id.
  
   Guozhang
  
  
   On Mon, Dec 23, 2013 at 10:46 PM, Jason Rosenberg j...@squareup.com
  wrote:
  
   Hmmm, it looks like I'm enabling all logging at INFO, and the request
   logging is only done at TRACE (why is that?).
  
   I suppose one wouldn't normally want to see request logs, so by
  default,
   they aren't enabled?
  
  
   On Mon, Dec 23, 2013 at 11:46 PM, Jun Rao jun...@gmail.com wrote:
  
Did you enable request log? It logs the ip of every request.
   
Thanks,
   
Jun
   
   
On Mon, Dec 23, 2013 at 3:52 PM, Jason Rosenberg j...@squareup.com
 
   wrote:
   
 Hi Guozhang,

 I'm not sure I understand your first answer.  I don't see
 anything
 regarding the correlation id, elsewhere in the broker
 logs.They
   only
 show up in those ERROR messages

 I do see correlation id's in clients, but not on the broker.

 Jason


 On Mon, Dec 23, 2013 at 6:46 PM, Guozhang Wang 
 wangg...@gmail.com
  
wrote:

  Jason,
 
  You can search the correlation id in the public access log on
 the
servers
  to get the consumer information.
 
  As for logging, I agree that we should use the same level on
 both
sides.
  Could you file a jira for this?
 
  Guozhang
 
 
  On Mon, Dec 23, 2013 at 3:09 PM, Jason Rosenberg 
  j...@squareup.com
 wrote:
 
   In our broker logs, we occasionally see errors like this:
  
   2013-12-23 05:02:08,456 ERROR [kafka-request-handler-2]
 server.KafkaApis
  -
   [KafkaApi-45] Error when processing fetch request for
 partition
  [mytopic,0]
   offset 204243601 from consumer with correlation id 130341
   kafka.common.OffsetOutOfRangeException: Request for offset
   204243601
 but
  we
   only have log segments in the range 204343397 to 207423640.
  
   I assume this means there's a consumer that has fallen behind
consuming
   messages, and the log retention policy has removed messages
  before
they
   could be consumed by the consumer.
  
   However, I'm not 100% which consumer it is, and it looks like
  the
only
  info
   we have is the correlation id of the consumer, e.g.:
  
   from consumer with correlation id 130341
  
   Is there a way to know which consumer this refers to?  It
 seems
   there
 are
   far more correlation id's than there are consumers.  Would it
  be
 possible
   to provide a bit more descriptive error message here, so we
 can
  immediately
   know which consumer

Re: problem with high-level consumer stream filter regex....

2014-01-03 Thread Jason Rosenberg
Thanks Joe,

I can confirm that your patch works for me, as applied to 0.8.0.

Jason

On Fri, Dec 20, 2013 at 6:28 PM, Jason Rosenberg j...@squareup.com wrote:
 Thanks Joe,

 I generally build locally, and upload to our maven proxy (using a custom
 pom).

 I haven't yet had luck using maven central (although, I might upgrade to the
 2.10 version, in which case I understand it to be in better shape?).

 I containerize the broker (and all the producers and consumers), so I use
 the kafka jar directly.

 I think if you do the patch against 0.8, I can apply and use.  Ultimately,
 I'll upgrade to 0.8.1, once that's in a beta release state.

 Thanks again,

 Jason




 On Fri, Dec 20, 2013 at 10:29 AM, Joe Stein joe.st...@stealth.ly wrote:

 Hey Jason, I was able to reproduce the issue and have a fix in hand to
 test
 later today.  If it looks good I will post the patch. I am going to-do the
 patch against 0.8 branch first.  How do you deploy and use libraries? Is
 it
 download broker and use maven central?

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /


 On Wed, Dec 18, 2013 at 4:13 PM, Jason Rosenberg j...@squareup.com wrote:

  thanks Joe!
 
 
  On Wed, Dec 18, 2013 at 11:05 AM, Joe Stein joe.st...@stealth.ly
  wrote:
 
   Hey Jason, I have someone looking into it now (they just started).
  
   I can look at it on Friday or if I finish up what I am working on for
   tomorrow then sooner.
  
   /***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
   /
  
  
   On Wed, Dec 18, 2013 at 8:15 AM, Jason Rosenberg j...@squareup.com
  wrote:
  
Joe,
   
I think the java code I listed in the Jira ticket should reproduce
the
issue directly, does that not work?
   
Jason
   
   
On Tue, Dec 17, 2013 at 9:49 AM, Joe Stein joe.st...@stealth.ly
  wrote:
   
 Hi Jason, I just replied on the ticket.  If it is a bug the update
 to
 create new filter or fix as bug, same.

 Can you post some code to help reproduce the problem?  so apples
 to
apples
 and such, thanks!

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop
 http://www.twitter.com/allthingshadoop
 /


 On Tue, Dec 17, 2013 at 1:16 AM, Jason Rosenberg
 j...@squareup.com
wrote:

  Ping
 
  Any thoughts on this?
 
  Seems like a bug, but then again, we're not sure what the
  expected
 behavior
  for regexes should be here (e.g. is there a way to whitelist
  topics
with
 a
  filter that looks for a leading substring, but then blocks
  subsequent
  substrings)?  E.g. apply a blacklist to a whitelist :).
 
  Jason
 
 
  On Thu, Dec 12, 2013 at 1:01 PM, Jason Rosenberg
  j...@squareup.com
  
 wrote:
 
   All, I've filed:
  https://issues.apache.org/jira/browse/KAFKA-1180
  
   We are needing to create a stream selector that essentially
   combines
 the
   logic of the BlackList and WhiteList classes.  That is, we
   want
  to
  select a
   topic that contains a certain prefix, as long as it doesn't
   also
 contain
  a
   secondary string.
  
   This should be easy to do with ordinary java Regex's, but
   we're
running
   into some issues, trying to do this with the WhiteList class
  only.
  
   We have a pattern that uses negative lookahead, like this:
  
   test-(?!bad\\b)[\\w]+
  
   So this should select a topic like: test-good, but exclude a
   topic
 like
   test-bad, and also exclude a topic without the test
   prefix,
   like
   foo-bar.
  
   Instead, what we see is a NullPointerException in the
ConsumerIterator,
   and the consumer just hangs, after sending a topic of
  'test-topic'
  followed
   by 'test-bad':
  
   21700
  
 

   
  
 
  [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683]
   ERROR kafka.consumer.ConsumerFetcherThread  -
  
 

   
  
 
  [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683],
   Error due to
   kafka.common.KafkaException: error processing data for
   partition
   [test-bad,0] offset 0
   at
  
 

   
  
 
  kafka.server.AbstractFetcherThread$$anonfun

Re: which zookeeper version

2014-01-02 Thread Jason Rosenberg
Hi Pushkar,

We've been using zk 3.4.5 for several months now, without any
problems, in production.

Jason

On Thu, Jan 2, 2014 at 1:15 AM, pushkar priyadarshi
priyadarshi.push...@gmail.com wrote:
 Hi,

 I am starting a fresh deployment of kafka + zookeeper.Looking at zookeeper
 releases find 3.4.5 old and stable enough.Has anyone used this before in
 production?
 kafka ops wiki page says at Linkedin deployment still uses 3.3.4.Any
 specific reason for the same.

 Thanks And Regards,
 Pushkar


Re: Understanding the min fetch rate metric

2013-12-26 Thread Jason Rosenberg
And, I assume the 'minFetchRate' refers to the lowest fetch rate over
all the fetchers (1 per broker) that a consumer has. I see

On Tue, Dec 24, 2013 at 8:58 PM, Jun Rao jun...@gmail.com wrote:
 There is one fetcher per broker, which is responsible for fetching messages
 in all consumed topics whose leader is on that broker. We have seen a
 fetcher being killed by a bug in Kafka. Also, if the broker is slow (e.g.
 due to I/O contention), the fetch rate could also be slower than expected.

 Thanks,

 Jun


 On Tue, Dec 24, 2013 at 12:48 PM, Jason Rosenberg j...@squareup.com wrote:

 Thanks Jun,

 I'm still not 100% clear on this.  Is the min-fetch rate per topic, or
 is it the lowest fetch rate over all topics?  Or is it not topic
 specific at all.

 What would cause this to slow down?  Or is it more a measure of the
 consumer's rate at processing messages and ability fetching new
 messages (regardless of topic)?

 If we have some topics with no data at all, that match the consumer's
 topic filter, should we expect to always see a min fetch rate of zero
 (since at least one topic really has no messages)?

 Jason

 On Tue, Dec 24, 2013 at 1:10 PM, Jun Rao jun...@gmail.com wrote:
  I updated http://kafka.apache.org/documentation.html#monitoring
 
  Thanks,
 
  Jun
 
 
  On Mon, Dec 23, 2013 at 10:51 PM, Jason Rosenberg j...@squareup.com
 wrote:
 
  I'm realizing I'm not quite sure what the 'min fetch rate' metrics is
  indicating, for consumers.  Can someone offer an explanation?
 
  Is it related to the 'max lag' metric?
 
  Jason
 



Re: Understanding the min fetch rate metric

2013-12-24 Thread Jason Rosenberg
Thanks Jun,

I'm still not 100% clear on this.  Is the min-fetch rate per topic, or
is it the lowest fetch rate over all topics?  Or is it not topic
specific at all.

What would cause this to slow down?  Or is it more a measure of the
consumer's rate at processing messages and ability fetching new
messages (regardless of topic)?

If we have some topics with no data at all, that match the consumer's
topic filter, should we expect to always see a min fetch rate of zero
(since at least one topic really has no messages)?

Jason

On Tue, Dec 24, 2013 at 1:10 PM, Jun Rao jun...@gmail.com wrote:
 I updated http://kafka.apache.org/documentation.html#monitoring

 Thanks,

 Jun


 On Mon, Dec 23, 2013 at 10:51 PM, Jason Rosenberg j...@squareup.com wrote:

 I'm realizing I'm not quite sure what the 'min fetch rate' metrics is
 indicating, for consumers.  Can someone offer an explanation?

 Is it related to the 'max lag' metric?

 Jason



  1   2   3   >