Re: KafkaProducer "send" blocks on first attempt with Kafka server offline

2016-03-30 Thread Steven Wu
Oleg, I believe the 0.9 producer now gives you that control via "max.block.ms".
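A minimal sketch of what that looks like with the 0.9 producer; the broker addresses, topic name, and the one-second cap are placeholders, and depending on the client version the TimeoutException surfaces either from send() itself or through the returned Future:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BoundedBlockingSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");   // placeholder brokers
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "1000");   // cap how long send() may block on metadata or a full buffer

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // with all brokers offline, the send fails after roughly max.block.ms
            // instead of blocking indefinitely
            producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
        } catch (Exception e) {
            System.err.println("send failed: " + e);
        } finally {
            producer.close();
        }
    }
}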

On Wed, Mar 30, 2016 at 5:31 AM, Oleg Zhurakousky <
ozhurakou...@hortonworks.com> wrote:

> I'll buy both the 'back pressure' and 'block' arguments, but what do they
> have to do with the Future? Isn't that the main point of a Future - a
> reference to an invocation that may or may not occur some time in the
> future? And isn't the purpose of Future.get(..) to give the user a choice
> and a chance to wait for completion? That is where blocking and back
> pressure should be handled.
>
> The scary part is that not only can send() block indefinitely, but when used
> within user code it can create a deadlock unless the user does something to
> avoid it. What some of us do is invoke send() as a Callable returning yet
> another Future. Doesn't that seem awkward? Please let me know if there is
> another way to deal with the infinite block.
>
> Furthermore, the problem is similar to the one in the consumer API, where
> the Iterator is mapped to a network service and hasNext() is also a blocking
> call - but at least, unlike send(), there is a workaround property for
> hasNext(). Could at least the same workaround be introduced for send()?
>
> Cheers
> Oleg
>
> > On Mar 29, 2016, at 23:12, Dana Powers <dana.pow...@gmail.com> wrote:
> >
> > Somewhat of an aside, but you might note that the kafka producer is
> > intended to block during send() as backpressure on memory allocation.
> > This is admittedly different than blocking on metadata, but it is worth
> > considering that the premise that send() should *never* block because
> > it returns a Future seems fundamentally at odds with the current design.
> >
> > In any event, there is a configuration that you can tweak to set the max
> > time the producer will spend blocking in send(): max.block.ms
> >
> > -Dana
> >
> >
> >> On Tue, Mar 29, 2016 at 7:26 PM, Steven Wu <stevenz...@gmail.com>
> wrote:
> >>
> >> I also agree that returning a Future should never block. I brought
> >> this up when 0.8.2 was first released for the new Java producer.
> >>
> >> As Oleg said, KafkaProducer can also block if metadata has not been
> >> fetched. This probably happens more often than an offline broker,
> >> because metadata is loaded lazily on the first send attempt for the
> >> topic. In other words, the first send to a topic will always block
> >> until metadata is fetched or times out.
> >>
> >> We had to handle this in our wrapper code. Basically, if metadata is
> >> not available, we put the message into a queue and drain the queue
> >> from a different thread.
> >>
> >> Thanks,
> >> Steven
> >>
> >>
> >> On Tue, Mar 29, 2016 at 4:59 AM, Oleg Zhurakousky <
> >> ozhurakou...@hortonworks.com> wrote:
> >>
> >>> I agree and considering that send(..) method returns Future one would
> >>> argue it must never block, otherwise what’s the point of returning
> Future
> >>> if you remove user’s ability to control how long are they willing to
> wait
> >>> and what to do when certain types of exception arise. Nevertheless it
> >> does
> >>> and it happens in the very first line of code:
> >>> // first make sure the metadata for the topic is available
> >>> waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
> >>> So I am curious as well as to what is the motivation for it as we’ve
> seen
> >>> the same symptoms multiple times.
> >>> Cheers
> >>> Oleg
> >>> On Mar 29, 2016, at 4:31 AM, Paolo Patierno <ppatie...@live.com> wrote:
> >>>
> >>> Hello,
> >>> As the documentation says, the KafkaProducer.send() method is asynchronous
> >>> and just returns immediately. I found out that this isn't quite true when
> >>> the Kafka server it's trying to connect to isn't online. Of course it
> >>> happens only on the first send() method invocation. It means that if the
> >>> Kafka server isn't reachable when my application starts for the first
> >>> time, the send() method isn't so asynchronous - it blocks. I know that
> >>> it's trying to connect to the Kafka server, but why doesn't it save the
> >>> message into the buffer and return immediately? Is this intended behavior
> >>> or a bug?
> >>> Thanks,
> >>> Paolo
> >>>
> >>> Paolo Patierno
> >>> Senior Software Engineer
> >>>
> >>> Windows Embedded & IoT
> >>> Microsoft Azure Advisor
> >>> Twitter : @ppatierno
> >>> Linkedin : paolopatierno
> >>> Blog : DevExperience
> >>> Blog : Embedded101
> >>
>


Re: KafkaProducer "send" blocks on first attempt with Kafka server offline

2016-03-29 Thread Steven Wu
I also agree that returning a Future should never block. I brought this up
when 0.8.2 was first released for the new Java producer.

As Oleg said, KafkaProducer can also block if metadata has not been fetched.
This probably happens more often than an offline broker, because metadata is
loaded lazily on the first send attempt for the topic. In other words, the
first send to a topic will always block until metadata is fetched or times
out.

We had to handle this in our wrapper code. Basically, if metadata is not
available, we put the message into a queue and drain the queue from a
different thread.
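A rough sketch of that idea (not the actual wrapper code; the class name and queue sizing are illustrative): the caller only enqueues and returns immediately, while a background thread performs the potentially blocking send().

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NonBlockingSender {
    private final BlockingQueue<ProducerRecord<byte[], byte[]>> queue =
            new ArrayBlockingQueue<>(10000);
    private final KafkaProducer<byte[], byte[]> producer;

    public NonBlockingSender(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
        Thread drainer = new Thread(() -> {
            try {
                while (true) {
                    // any metadata wait happens here, on the drainer thread
                    producer.send(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "kafka-drainer");
        drainer.setDaemon(true);
        drainer.start();
    }

    public boolean trySend(ProducerRecord<byte[], byte[]> record) {
        // drop (or count) instead of blocking when the queue is full
        return queue.offer(record);
    }
}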

Thanks,
Steven


On Tue, Mar 29, 2016 at 4:59 AM, Oleg Zhurakousky <
ozhurakou...@hortonworks.com> wrote:

> I agree, and considering that the send(..) method returns a Future, one
> would argue it must never block; otherwise, what's the point of returning a
> Future if you remove the user's ability to control how long they are willing
> to wait and what to do when certain types of exception arise? Nevertheless
> it does block, and it happens in the very first line of code:
>
> // first make sure the metadata for the topic is available
> waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
>
> So I am curious as well as to what the motivation for it is, as we've seen
> the same symptoms multiple times.
> Cheers
> Oleg
> On Mar 29, 2016, at 4:31 AM, Paolo Patierno wrote:
>
> Hello,
> As the documentation says, the KafkaProducer.send() method is asynchronous
> and just returns immediately. I found out that this isn't quite true when
> the Kafka server it's trying to connect to isn't online. Of course it
> happens only on the first send() method invocation. It means that if the
> Kafka server isn't reachable when my application starts for the first time,
> the send() method isn't so asynchronous - it blocks. I know that it's trying
> to connect to the Kafka server, but why doesn't it save the message into the
> buffer and return immediately? Is this intended behavior or a bug?
> Thanks,
> Paolo
>
> Paolo Patierno
> Senior Software Engineer
>
> Windows Embedded & IoT
> Microsoft Azure Advisor
> Twitter : @ppatierno
> Linkedin : paolopatierno
> Blog : DevExperience
> Blog : Embedded101
>
>
>


Re: some producers stuck when one broker is bad

2015-09-12 Thread Steven Wu
I was doing a rolling bounce of all brokers. Immediately after the bad
broker was bounced, those stuck producers recovered.

On Fri, Sep 11, 2015 at 9:05 AM, Mayuresh Gharat <gharatmayures...@gmail.com
> wrote:

> So how did you detect that the broker was bad? If bouncing brokers solved
> the problem and you did not find anything unusual in the broker logs, it is
> likely that the process was up but was isolated from producer requests, and
> since the producer did not have a timeout, the producer buffer filled up.
>
> Thanks,
>
> Mayuresh
>
>
> On Thu, Sep 10, 2015 at 11:20 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
> > frankly I don't know exactly what went BAD for that broker. process is
> > still UP.
> >
> > On Wed, Sep 9, 2015 at 10:10 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com
> > > wrote:
> >
> > > 1) any suggestion on how to identify the bad broker(s)?
> > > ---> At Linkedin we have alerts that are setup using our internal
> scripts
> > > for detecting if a broker has gone bad. We also check the under
> > replicated
> > > partitions and that can tell us which broker has gone bad. By broker
> > going
> > > bad, it can mean different things. Like the broker is alive but not
> > > responding and is completely isolated or the broker has gone down, etc.
> > > Can you tell us what you meant by your BROKER went BAD?
> > >
> > > 2) why bouncing of the bad broker got the producers recovered
> > automatically
> > > > This is because as you bounced, the leaders for other partitions
> > > changed and producer sent out a TopicMetadataRequest which tells the
> > > producer who are the new leaders for the partitions and the producer
> > > started sending messages to those brokers.
> > >
> > > KAFKA-2120 will handle all of this for you automatically.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Tue, Sep 8, 2015 at 8:26 PM, Steven Wu <stevenz...@gmail.com>
> wrote:
> > >
> > > > We have observed that some producer instances stopped sending traffic
> > to
> > > > brokers, because the memory buffer is full. those producers got stuck
> > in
> > > > this state permanently. Because we couldn't find out which broker is
> > bad
> > > > here. So I did a rolling restart the all brokers. after the bad
> broker
> > > got
> > > > bounce, those stuck producers out of the woods automatically.
> > > >
> > > > I don't know the exact problem with that bad broker. it seems to me
> > that
> > > > some ZK states are inconsistent.
> > > >
> > > > I know timeout fix from KAFKA-2120 can probably avoid the permanent
> > > stuck.
> > > > Here are some additional questions.
> > > > 1) any suggestion on how to identify the bad broker(s)?
> > > > 2) why bouncing of the bad broker got the producers recovered
> > > automatically
> > > > (without restarting producers)
> > > >
> > > > producer: 0.8.2.1
> > > > broker: 0.8.2.1
> > > >
> > > > Thanks,
> > > > Steven
> > > >
> > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


Re: some producers stuck when one broker is bad

2015-09-11 Thread Steven Wu
Frankly, I don't know exactly what went BAD for that broker. The process is
still UP.

On Wed, Sep 9, 2015 at 10:10 AM, Mayuresh Gharat <gharatmayures...@gmail.com
> wrote:

> 1) any suggestion on how to identify the bad broker(s)?
> ---> At Linkedin we have alerts that are set up using our internal scripts
> for detecting whether a broker has gone bad. We also check the
> under-replicated partitions, and that can tell us which broker has gone bad.
> "Broker going bad" can mean different things, e.g. the broker is alive but
> not responding and is completely isolated, or the broker has gone down, etc.
> Can you tell us what you meant by your BROKER went BAD?
>
> 2) why bouncing of the bad broker got the producers recovered automatically
> ---> This is because, as you bounced, the leaders for other partitions
> changed and the producer sent out a TopicMetadataRequest, which tells the
> producer who the new leaders for the partitions are, and the producer
> started sending messages to those brokers.
>
> KAFKA-2120 will handle all of this for you automatically.
>
> Thanks,
>
> Mayuresh
>
> On Tue, Sep 8, 2015 at 8:26 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
> > We have observed that some producer instances stopped sending traffic to
> > brokers, because the memory buffer is full. those producers got stuck in
> > this state permanently. Because we couldn't find out which broker is bad
> > here. So I did a rolling restart the all brokers. after the bad broker
> got
> > bounce, those stuck producers out of the woods automatically.
> >
> > I don't know the exact problem with that bad broker. it seems to me that
> > some ZK states are inconsistent.
> >
> > I know timeout fix from KAFKA-2120 can probably avoid the permanent
> stuck.
> > Here are some additional questions.
> > 1) any suggestion on how to identify the bad broker(s)?
> > 2) why bouncing of the bad broker got the producers recovered
> automatically
> > (without restarting producers)
> >
> > producer: 0.8.2.1
> > broker: 0.8.2.1
> >
> > Thanks,
> > Steven
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


some producers stuck when one broker is bad

2015-09-08 Thread Steven Wu
We have observed that some producer instances stopped sending traffic to
brokers because the memory buffer was full, and those producers got stuck in
this state permanently. We couldn't figure out which broker was bad, so I
did a rolling restart of all brokers. After the bad broker got bounced, those
stuck producers came out of the woods automatically.

I don't know the exact problem with that bad broker. It seems to me that
some ZK states were inconsistent.

I know the timeout fix from KAFKA-2120 can probably avoid the permanent
stuck state. Here are some additional questions.
1) any suggestion on how to identify the bad broker(s)?
2) why did bouncing the bad broker get the producers to recover automatically
(without restarting producers)?

producer: 0.8.2.1
broker: 0.8.2.1

Thanks,
Steven


producer metadata behavior when topic not created

2015-06-09 Thread Steven Wu
Hi,

I am talking about the 0.8.2 java producer.

In our deployment, we disable auto topic creation, because we would like
to control the precise number of partitions created for each topic and the
placement of partitions (e.g. zone-aware).

I did some experimentation and checked the code. A metadata request to the
broker (for a non-existent topic) gets a successful response. Should the
broker return a failure or partial failure if the queried topic doesn't
exist? Can we add a metric on the broker side for queries against
non-existent topics?

The net behavior is that there are more metadata queries from the producer,
throttled by the backoff config (default is 100ms). Can we add a metric for
the metadata request and response rate? The rate should normally be very low
during steady state, as the default refresh interval is 5 minutes.

Basically, I am trying to detect this scenario (non-existent topic) and be
able to alert on some metrics. Any other suggestions?
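One client-side option in the meantime is to poll the producer's built-in metrics; a rough sketch follows (the "request" name filter is illustrative, and metric names vary by version):

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ProducerMetricsLogger {
    // Dump producer metrics whose name mentions "request"; an unexpectedly high
    // request rate at steady state can hint at the metadata retry loop for a
    // non-existent topic.
    public static void logRequestMetrics(KafkaProducer<?, ?> producer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if (name.name().contains("request")) {
                System.out.printf("%s.%s = %f%n", name.group(), name.name(), entry.getValue().value());
            }
        }
    }
}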

Thanks,
Steven


Re: HDD or SSD or EBS for kafka brokers in Amazon EC2

2015-06-03 Thread Steven Wu
The AWS case # is 1369725251. It's a fix in the underlying virtualization
platform. Here is the quote from the ticket:

The issue is a bug in a performance improvement (10% improved PPS when
using Xen PV netback/netfront networking) in the latest build of the
virtualization platform, which has only been released to D2 instances. The
issue is triggered by a race condition deadlock in kernel code that your
workload appears to trigger 5-10% of the time.

On Tue, Jun 2, 2015 at 4:26 PM, Henry Cai h...@pinterest.com.invalid
wrote:

 Steven,

 Do you have the AWS case # (or the Ubuntu bug/case #) when you hit that
 kernel panic issue?

 Our company will still be running on AMI image 12.04 for a while, I will
 see whether the fix was also ported onto Ubuntu 12.04

 On Tue, Jun 2, 2015 at 2:53 PM, Steven Wu stevenz...@gmail.com wrote:

  now I remember we had same kernel panic issue in the first week of D2
  rolling-out. then AWS fixed it and we haven't seen any issue since. try
  Ubuntu 14.04 and see if it resolves your remaining kernel/instability
 issue.
 
  On Tue, Jun 2, 2015 at 2:30 PM, Wes Chow w...@chartbeat.com wrote:
 
 
Daniel Nelson daniel.nel...@vungle.com
   June 2, 2015 at 4:39 PM
 
  On Jun 2, 2015, at 1:22 PM, Steven Wu stevenz...@gmail.com 
 stevenz...@gmail.com wrote:
 
  can you elaborate what kind of instability you have encountered?
 
  We have seen the nodes become completely non-responsive. Usually they
 get rebooted automatically after 10-20 minutes, but occasionally they get
 stuck for days in a state where they cannot be rebooted via the Amazon APIs.
 
 
  Same here. It was worse right after d2 launch. We had 6 out of 9 servers
  die within 10 hours after spinning them up. Amazon rolled out a fix, but
  we're still seeing similar issues, though not nearly as bad. The first
 fix
  was for something network related, and apparently sending lots of data
  through the instances caused a kernel panic on the host. We have no
  information yet about the current issue.
 
  Wes
 
Steven Wu stevenz...@gmail.com
   June 2, 2015 at 4:22 PM
  Wes/Daniel,
 
  can you elaborate what kind of instability you have encountered?
 
  we are on Ubuntu 14.04.2 and haven't encountered any issues so far. in
  the announcement, they did mention using Ubuntu 14.04 for better disk
  throughput. not sure whether 14.04 also addresses any instability issue
 you
  encountered or not.
 
  Thanks,
  Steven
 
  In order to ensure the best disk throughput performance from your D2
 instances
  on Linux, we recommend that you use the most recent version of the
 Amazon
  Linux AMI, or another Linux AMI with a kernel version of 3.8 or later.
 The
  D2 instances provide the best disk performance when you use a Linux
  kernel that supports Persistent Grants – an extension to the Xen block
 ring
  protocol that significantly improves disk throughput and scalability.
 The
  following Linux AMIs support this feature:
 
 - Amazon Linux AMI 2015.03 (HVM)
 - Ubuntu Server 14.04 LTS (HVM)
 - Red Hat Enterprise Linux 7.1 (HVM)
 - SUSE Linux Enterprise Server 12 (HVM)
 
 
 
 
Daniel Nelson daniel.nel...@vungle.com
   June 2, 2015 at 2:42 PM
 
  Do you have any workarounds for the d2 issues? We’ve been using them for
  our Kafkas too, and ran into the instability. We’re on Ubuntu 12.04 and
  plan to try on 14.04 with the latest HWE to see if that helps any.
 
  Thanks!
Wes Chow w...@chartbeat.com
   June 2, 2015 at 1:39 PM
 
  We have run d2 instances with Kafka. They're currently unstable --
 Amazon
  confirmed a host issue with d2 instances that gets tickled by a Kafka
  workload yesterday. Otherwise, it seems the d2 instance type is ideal
 as it
  gets an enormous amount of disk throughput and you'll likely be network
  bottlenecked.
 
  Wes
 
 
Steven Wu stevenz...@gmail.com
   June 2, 2015 at 1:07 PM
  EBS (network attached storage) has got a lot better over the last a few
  years. we don't quite trust it for kafka workload.
 
  At Netflix, we were going with the new d2 instance type (HDD). our
  perf/load testing shows it satisfy our workload. SSD is better in
 latency
  curve but pretty comparable in terms of throughput. we can use the extra
  space from HDD for longer retention period.
 
  On Tue, Jun 2, 2015 at 9:37 AM, Henry Cai h...@pinterest.com.invalid
  h...@pinterest.com.invalid
 
 
 



Re: HDD or SSD or EBS for kafka brokers in Amazon EC2

2015-06-02 Thread Steven Wu
Wes/Daniel,

Can you elaborate on what kind of instability you have encountered?

We are on Ubuntu 14.04.2 and haven't encountered any issues so far. In the
announcement, they did mention using Ubuntu 14.04 for better disk
throughput. I'm not sure whether 14.04 also addresses the instability issue
you encountered.

Thanks,
Steven

In order to ensure the best disk throughput performance from your D2 instances
on Linux, we recommend that you use the most recent version of the Amazon
Linux AMI, or another Linux AMI with a kernel version of 3.8 or later. The
D2 instances provide the best disk performance when you use a Linux kernel
that supports Persistent Grants – an extension to the Xen block ring
protocol that significantly improves disk throughput and scalability. The
following Linux AMIs support this feature:

   - Amazon Linux AMI 2015.03 (HVM)
   - Ubuntu Server 14.04 LTS (HVM)
   - Red Hat Enterprise Linux 7.1 (HVM)
   - SUSE Linux Enterprise Server 12 (HVM)



On Tue, Jun 2, 2015 at 12:31 PM, Wes Chow w...@chartbeat.com wrote:


 Our workaround is to switch to i2's. Amazon didn't mention anything,
 though we're getting on a call with them soon so I'll be sure to ask. Fwiw,
 we're also on 12.04.

 Wes


   Daniel Nelson daniel.nel...@vungle.com
  June 2, 2015 at 2:42 PM

 Do you have any workarounds for the d2 issues? We’ve been using them for
 our Kafkas too, and ran into the instability. We’re on Ubuntu 12.04 and
 plan to try on 14.04 with the latest HWE to see if that helps any.

 Thanks!
   Wes Chow w...@chartbeat.com
  June 2, 2015 at 1:39 PM

 We have run d2 instances with Kafka. They're currently unstable -- Amazon
 confirmed a host issue with d2 instances that gets tickled by a Kafka
 workload yesterday. Otherwise, it seems the d2 instance type is ideal as it
 gets an enormous amount of disk throughput and you'll likely be network
 bottlenecked.

 Wes


   Henry Cai h...@pinterest.com.INVALID
  June 2, 2015 at 12:37 PM
 We have been hosting kafka brokers in Amazon EC2 and we are using EBS
 disk. But periodically we were hit by long I/O wait time on EBS in some
 Availability Zones.

 We are thinking to change the instance types to a local HDD or local SSD.
 HDD is cheaper and bigger and seems quite fit for the Kafka use case which
 is mostly sequential read/write, but some early experiments show the HDD
 cannot catch up with the message producing speed since there are many
 topic/partitions on the broker which actually makes the disk I/O more
 randomly accessed.

 How are people's experience of choosing disk types on Amazon?




Re: HDD or SSD or EBS for kafka brokers in Amazon EC2

2015-06-02 Thread Steven Wu
Now I remember: we had the same kernel panic issue in the first week of the
D2 rollout. Then AWS fixed it, and we haven't seen any issue since. Try
Ubuntu 14.04 and see if it resolves your remaining kernel/instability issue.

On Tue, Jun 2, 2015 at 2:30 PM, Wes Chow w...@chartbeat.com wrote:


   Daniel Nelson daniel.nel...@vungle.com
  June 2, 2015 at 4:39 PM

 On Jun 2, 2015, at 1:22 PM, Steven Wu stevenz...@gmail.com 
 stevenz...@gmail.com wrote:

 can you elaborate what kind of instability you have encountered?

 We have seen the nodes become completely non-responsive. Usually they get 
 rebooted automatically after 10-20 minutes, but occasionally they get stuck 
 for days in a state where they cannot be rebooted via the Amazon APIs.


 Same here. It was worse right after d2 launch. We had 6 out of 9 servers
 die within 10 hours after spinning them up. Amazon rolled out a fix, but
 we're still seeing similar issues, though not nearly as bad. The first fix
 was for something network related, and apparently sending lots of data
 through the instances caused a kernel panic on the host. We have no
 information yet about the current issue.

 Wes

   Steven Wu stevenz...@gmail.com
  June 2, 2015 at 4:22 PM
 Wes/Daniel,

 can you elaborate what kind of instability you have encountered?

 we are on Ubuntu 14.04.2 and haven't encountered any issues so far. in the
 announcement, they did mention using Ubuntu 14.04 for better disk
 throughput. not sure whether 14.04 also addresses any instability issue you
 encountered or not.

 Thanks,
 Steven

 In order to ensure the best disk throughput performance from your D2 instances
 on Linux, we recommend that you use the most recent version of the Amazon
 Linux AMI, or another Linux AMI with a kernel version of 3.8 or later. The
 D2 instances provide the best disk performance when you use a Linux
 kernel that supports Persistent Grants – an extension to the Xen block ring
 protocol that significantly improves disk throughput and scalability. The
 following Linux AMIs support this feature:

- Amazon Linux AMI 2015.03 (HVM)
- Ubuntu Server 14.04 LTS (HVM)
- Red Hat Enterprise Linux 7.1 (HVM)
- SUSE Linux Enterprise Server 12 (HVM)




   Daniel Nelson daniel.nel...@vungle.com
  June 2, 2015 at 2:42 PM

 Do you have any workarounds for the d2 issues? We’ve been using them for
 our Kafkas too, and ran into the instability. We’re on Ubuntu 12.04 and
 plan to try on 14.04 with the latest HWE to see if that helps any.

 Thanks!
   Wes Chow w...@chartbeat.com
  June 2, 2015 at 1:39 PM

 We have run d2 instances with Kafka. They're currently unstable -- Amazon
 confirmed a host issue with d2 instances that gets tickled by a Kafka
 workload yesterday. Otherwise, it seems the d2 instance type is ideal as it
 gets an enormous amount of disk throughput and you'll likely be network
 bottlenecked.

 Wes


   Steven Wu stevenz...@gmail.com
  June 2, 2015 at 1:07 PM
 EBS (network attached storage) has got a lot better over the last a few
 years. we don't quite trust it for kafka workload.

 At Netflix, we were going with the new d2 instance type (HDD). our
 perf/load testing shows it satisfy our workload. SSD is better in latency
 curve but pretty comparable in terms of throughput. we can use the extra
 space from HDD for longer retention period.

 On Tue, Jun 2, 2015 at 9:37 AM, Henry Cai h...@pinterest.com.invalid
 h...@pinterest.com.invalid




Re: HDD or SSD or EBS for kafka brokers in Amazon EC2

2015-06-02 Thread Steven Wu
EBS (network attached storage) has gotten a lot better over the last few
years, but we don't quite trust it for the kafka workload.

At Netflix, we went with the new d2 instance type (HDD). Our perf/load
testing shows it satisfies our workload. SSD has a better latency curve but
is pretty comparable in terms of throughput, and we can use the extra space
from HDD for a longer retention period.

On Tue, Jun 2, 2015 at 9:37 AM, Henry Cai h...@pinterest.com.invalid
wrote:

 We have been hosting kafka brokers in Amazon EC2, and we are using EBS
 disks. But periodically we get hit by long I/O wait times on EBS in some
 Availability Zones.

 We are thinking of changing the instance type to one with local HDD or
 local SSD. HDD is cheaper and bigger and seems to fit the Kafka use case,
 which is mostly sequential read/write, but some early experiments show the
 HDD cannot keep up with the message producing rate, since the many
 topics/partitions on the broker make the disk I/O more random.

 What are people's experiences choosing disk types on Amazon?



Re: Async Producer Callback

2015-03-19 Thread Steven Wu
In your Callback implementation object, you can save a reference to the
actual message.
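A sketch of that approach (the class name and queue handling are illustrative, not a prescribed implementation):

import java.util.Queue;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class RetryingCallback implements Callback {
    private final ProducerRecord<byte[], byte[]> record;
    private final Queue<ProducerRecord<byte[], byte[]>> retryQueue;

    public RetryingCallback(ProducerRecord<byte[], byte[]> record,
                            Queue<ProducerRecord<byte[], byte[]>> retryQueue) {
        this.record = record;
        this.retryQueue = retryQueue;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // the original message is still reachable here, so queue it for retry
            retryQueue.add(record);
        }
    }
}

// usage: producer.send(record, new RetryingCallback(record, retryQueue));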

On Wed, Mar 18, 2015 at 10:45 PM, sunil kalva kalva.ka...@gmail.com wrote:

 Hi
 How do I access the actual message that failed to send to the cluster,
 using the Callback interface and the onCompletion method?

 Basically, if the send fails, I want to add the message to a temp queue
 and retry it later.

 --Sunil



Re: non-blocking sends when cluster is down

2015-02-26 Thread Steven Wu
The metadata fetch only happens/blocks the first time you call send. After
the metadata is retrieved, it is cached in memory, and send will not block
again. So yes, there is a possibility it can block. Of course, if the
cluster is down and metadata was never fetched, then every send can block.

Metadata is also refreshed periodically after the first fetch:
metadata.max.age.ms=30
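One common way to keep that first-send block off the request path is to warm the metadata cache at startup; a rough sketch (the helper class, topic name, and timeout handling are illustrative):

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class MetadataWarmer {
    // Fetch topic metadata on a startup thread so the first real send() does not
    // block a request thread. partitionsFor() itself blocks until the metadata
    // arrives or the producer's own metadata timeout expires.
    public static void warm(KafkaProducer<?, ?> producer, String topic, long timeoutMs) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<List<PartitionInfo>> fetch = () -> producer.partitionsFor(topic);
            Future<List<PartitionInfo>> future = pool.submit(fetch);
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // cluster unreachable (or topic missing) at startup; the first real
            // send will simply trigger the metadata fetch again later
        } finally {
            pool.shutdownNow();
        }
    }
}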


On Thu, Feb 26, 2015 at 4:47 AM, Gary Ogden gog...@gmail.com wrote:

 I was actually referring to the metadata fetch. Sorry I should have been
 more descriptive. I know we can decrease the metadata.fetch.timeout.ms
 setting to be a lot lower, but it's still blocking if it can't get the
 metadata. And I believe that the metadata fetch happens every time we call
 send()?

 On 25 February 2015 at 19:03, Guozhang Wang wangg...@gmail.com wrote:

  Hi Gray,
 
  The Java producer will block on send() when the buffer is full and
  block.on.buffer.full = true (
  http://kafka.apache.org/documentation.html#newproducerconfigs). If you
 set
  the config to false the send() call will throw a BufferExhaustedException
  which, in your case, can be caught and ignore and allow the message to
 drop
  on the floor.
 
  Guozhang
 
 
 
  On Wed, Feb 25, 2015 at 5:08 AM, Gary Ogden gog...@gmail.com wrote:
 
   Say the entire kafka cluster is down and there's no brokers to connect
  to.
   Is it possible to use the java producer send method and not block until
   there's a timeout?  Is it as simple as registering a callback method?
  
   We need the ability for our application to not have any kind of delay
  when
   sending messages and the cluster is down.  It's ok if the messages are
  lost
   when the cluster is down.
  
   Thanks!
  
 
 
 
  --
  -- Guozhang
 



Re: New Producer - Is the configurable partitioner gone?

2015-02-22 Thread Steven Wu
 The low connection partitioner might work for this
by attempting to reuse recently used nodes whenever possible. That is
useful in environments with lots and lots of producers where you don't care
about semantic partitioning.

In one of our perf tests, we found that the sticky partitioner described
above improved batching and reduced CPU utilization on the broker side by
60%. We plan to make it our default partitioner.
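Since the new producer does not expose a pluggable partitioner, the sticky choice has to be made by the caller via the explicit partition on ProducerRecord. A rough sketch of the idea (not the Netflix implementation; the stick interval and class name are illustrative):

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class StickyPartitionChooser {
    private final long stickMs;
    private volatile int partition = -1;
    private volatile long lastSwitch = 0;

    public StickyPartitionChooser(long stickMs) {
        this.stickMs = stickMs;
    }

    // Keep returning the same partition for stickMs, then hop to a random one.
    // Assumes contiguous partition ids 0..n-1; partitionsFor() may block on the
    // first call while metadata is fetched.
    public int choose(KafkaProducer<?, ?> producer, String topic) {
        long now = System.currentTimeMillis();
        if (partition < 0 || now - lastSwitch > stickMs) {
            List<PartitionInfo> partitions = producer.partitionsFor(topic);
            partition = ThreadLocalRandom.current().nextInt(partitions.size());
            lastSwitch = now;
        }
        return partition;
    }
}

// usage sketch:
// producer.send(new ProducerRecord<>(topic, chooser.choose(producer, topic), key, value));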



On Sun, Feb 22, 2015 at 10:28 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Daniel,

 Yeah I think that would be doable. If you want to pursue it you would need
 to do a quick KIP just to get everyone on the same page since this would be
 a public interface we would have to support over a long time:

 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

 When we have the details worked out, then it should be a fairly
 straight-forward patch to make that pluggable.

 A few comments:
 - I think we should just make the DefaultPartitioner the default value for
 that configuration, rather than having it be a fall back.
 - You need to pass in the binary key and value in addition to the java
 objects. Otherwise any partitioning based on the binary value will require
 reserializing these.
 - If we add this option we should really ship at least one other useful
 partitioning strategy. The low connection partitioner might work for this
 by attempting to reuse recently used nodes whenever possible. That is
 useful in environments with lots and lots of producers where you don't care
 about semantic partitioning. It would be good to think through if there are
 any other useful partitioning strategies to make sure they would also be
 doable with the interface we would end up with.
 - Currently Cluster is not a public class so we'll have to think about
 whether we want to make that public.

 -Jay


 On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener 
 daniel.wege...@holisticon.de wrote:

 
  Jay Kreps jay.kreps@... writes:
 
  
   Hey Daniel,
  
   partitionsFor() will block the very first time it sees a new topic that
  it
   doesn't have metadata for yet. If you want to ensure you don't block
 even
   that one time, call it prior to your regular usage so it initializes
  then.
  
   The rationale for adding a partition in ProducerRecord was that there
 are
   actually cases where you want to associate a key with the record but
 not
   use it for partitioning. The rationale for not retaining the pluggable
   partitioner was that you could always just use the partition (many
 people
   dislike the plugin apis and asked for it). Personally, like you, I
   preferred the plugin apis.
  
   We aren't cut off from exposing the existing partitioner usage as a
  public
   api that you can override if there is anyone who wants that. I think
 one
   nice thing about it would be the ability to ship with an alternative
   partitioning strategy that you could enable purely in config. For
 example
   the old producer had a partitioning strategy that attempted to minimize
  the
   number of TCP connections for cases where there was no key. 98% of
 people
   loathed that, but for 2% of people it was useful and this would be a
 way
  to
   still include that behavior for people migrating to the new producer.
  
   -Jay
  
   On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener 
   daniel.wegener@... wrote:
  
Gwen Shapira gshapira at ... writes:
   

 Hi Daniel,

 I think you can still use the same logic you had in the custom
partitioner
 in the old producer. You just move it to the client that creates
 the
 records.
 The reason you don't cache the result of partitionsFor is that the
producer
 should handle the caching for you, so its not necessarily a long or
 blocking call.

 I see it as a pretty small change to the API. But I'm not sure what
  drove
 the change either.

 Gwen

 On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener 
 Daniel.Wegener at ... wrote:

  Hello Kafka-users!
 
  I am facing a migration from a kind of ( a bit self plumbed)
 kafka
0.8.1
  producer to the new kafka-clients API. I just recognized, that
 the
  new
  KafkaProducer initializes its own Partitioner that cannot be
  changed
(final
  field, no ctor-param, no
 
 Class.forName(config.getPartitionerClassNameFoo()).newInstance()).
  Is
this
  an intentional change?
  If i understand the new API correctly, one should either define a
  key
for
  a message and let the default Partitioner care that it will be
distributed
  over all available partitions or to set an explicit partition
  number
per
  message that will be written to.
 
  The old API api allowed to create ProducerRecors with a key
 and/or
  a
key
  used only for partitioning (but one that is not sent down the
 wire)
  and
  then to provide a custom Partitioner that later could distribute
  this
  partitioning key 

Re: New Producer - Is the configurable partitioner gone?

2015-02-22 Thread Steven Wu
Yes, this is with the new java client. Since it is using non-blocking NIO,
the sender thread was probably able to scan the buffer very frequently;
hence the random partitioner doesn't get much chance to accumulate records
per batch or request.
Setup

- 3 broker instances (m1.xlarge)
- 6 producer instances (m1.xlarge)
- topic partitions: 36
- message size: 1 KB
- no compression
- traffic volume: total 30 MB / 30K msgs, per broker 10 MB / 10K msgs

Summary

partitioner                   batched records per request   broker cpu util
random without lingering      1.25                          75%
sticky without lingering      2.0                           50%
sticky with 100ms lingering   15                            33%

There are two ways to improve batching (see the config sketch below):

   1. Use the sticky partitioner that we implemented. The kafka default is
   the random partitioner, where a random partition is selected for each msg.
   With the sticky partitioner, we stick all msgs (to one topic) on the same
   partition for a while (e.g. 1 second) before moving on to the next
   partition.

   2. Set the linger.ms property on the kafka producer. It allows a message
   to linger around for some period in the hope of a batching opportunity.

We can deploy one or both methods. But the main point is that improved
batching helps the broker a lot.

"linger.ms" carries the risk of filling up the buffer. It works very well
with the sticky partitioner because a full batch accumulates very quickly.
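For reference, a sketch of the batching-related settings mentioned above (the concrete values are placeholders, not recommendations):

import java.util.Properties;

public class BatchingConfig {
    public static Properties batchingProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("linger.ms", "100");           // wait up to 100 ms to fill a batch
        props.put("batch.size", "16384");        // per-partition batch size, in bytes
        props.put("buffer.memory", "33554432");  // total buffer; lingering eats into this
        return props;
    }
}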



On Sun, Feb 22, 2015 at 5:21 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Interesting, and this was with the new Java client? This sounds like as
 much an opportunity for improvement in the code as anything. Would you be
 willing to share the details?

 -jay

 On Sunday, February 22, 2015, Steven Wu stevenz...@gmail.com wrote:

   The low connection partitioner might work for this
  by attempting to reuse recently used nodes whenever possible. That is
  useful in environments with lots and lots of producers where you don't
 care
  about semantic partitioning.
 
  In one of the perf test, we found that above sticky partitioner
 improved
  batching and reduced cpu util at broker side by 60%. We plan to make it
 our
  default partitioner.
 
 
 
  On Sun, Feb 22, 2015 at 10:28 AM, Jay Kreps jay.kr...@gmail.com
  javascript:; wrote:
 
   Hey Daniel,
  
   Yeah I think that would be doable. If you want to pursue it you would
  need
   to do a quick KIP just to get everyone on the same page since this
 would
  be
   a public interface we would have to support over a long time:
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
  
   When we have the details worked out, then it should be a fairly
   straight-forward patch to make that pluggable.
  
   A few comments:
   - I think we should just make the DefaultPartitioner the default value
  for
   that configuration, rather than having it be a fall back.
   - You need to pass in the binary key and value in addition to the java
   objects. Otherwise any partitioning based on the binary value will
  require
   reserializing these.
   - If we add this option we should really ship at least one other useful
   partitioning strategy. The low connection partitioner might work for
 this
   by attempting to reuse recently used nodes whenever possible. That is
   useful in environments with lots and lots of producers where you don't
  care
   about semantic partitioning. It would be good to think through if there
  are
   any other useful partitioning strategies to make sure they would also
 be
   doable with the interface we would end up with.
   - Currently Cluster is not a public class so we'll have to think about
   whether we want to make that public.
  
   -Jay
  
  
   On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener 
   daniel.wege...@holisticon.de javascript:; wrote:
  
   
Jay Kreps jay.kreps@... writes:
   

 Hey Daniel,

 partitionsFor() will block the very first time it sees a new topic
  that
it
 doesn't have metadata for yet. If you want to ensure you don't
 block
   even
 that one time, call it prior to your regular usage so it
 initializes
then.

 The rationale for adding a partition in ProducerRecord was that
 there
   are
 actually cases where you want to associate a key with the record
 but
   not
 use it for partitioning. The rationale for not retaining the
  pluggable
 partitioner was that you could always just use the partition (many
   people
 dislike the plugin apis and asked for it). Personally, like you, I
 preferred the plugin apis.

 We aren't cut off from exposing the existing partitioner usage as a
public
 api that you can override if there is anyone who wants that. I
 think
   one
 nice thing about it would be the ability to ship with an
 alternative
 partitioning strategy that you could enable purely in config. For
   example
 the old producer had a partitioning strategy that attempted to
  minimize
the
 number of TCP connections for cases where there was no key. 98% of
   people

Re: big cpu jump on producer in face of broker outage

2015-02-19 Thread Steven Wu
will try 0.8.2.1 on producer and report back result.

On Thu, Feb 19, 2015 at 11:52 AM, Jun Rao j...@confluent.io wrote:

 This is probably due to KAFKA-1642, which is fixed in 0.8.2.0. Could you
 try that version or 0.8.2.1 which is being voted now.

 Thanks,

 Jun

 On Thu, Feb 19, 2015 at 10:42 AM, Steven Wu stevenz...@gmail.com wrote:

  forgot to mention in case it matters
  producer: 0.8.2-beta
  broker: 0.8.1.1
 
  On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu stevenz...@gmail.com
 wrote:
 
   I think this is an issue caused by KAFKA-1788.
  
   I was trying to test producer resiliency to broker outage. In this
   experiment, I shutdown all brokers and see how producer behavior.
  
   Here are the observations
   1) kafka producer can recover from kafka outage. i.e. send resumed
 after
   brokers came back
   2) producer instance saw big cpu jump during outage. 28% - 52% in one
   test.
  
   Note that I didn't observe cpu issue when new producer instance started
   with brokers outage. In this case, there are no messages accumulated in
  the
   buffer, because KafkaProducer constructor failed with DNS lookup for
   route53 name. when brokers came up, my wrapper re-created KafkaProducer
   object and recover from outage with sending messages.
  
   Here is the cpu graph for a running producer instance where broker
 outage
   happened in the middle of test run. it shows cpu problem.
  
  
 
 https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing
  
   Here is the cpu graph for a new producer instance where broker outage
   happened before instance startup. cpu is good here.
  
  
 
 https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing
  
   Note that producer is a 4-core m1.xlarge instance. x-axis is time,
 y-axis
   is cpu util.
  
   Thanks,
   Steven
  
 



big cpu jump on producer in face of broker outage

2015-02-19 Thread Steven Wu
I think this is an issue caused by KAFKA-1788.

I was trying to test producer resiliency to a broker outage. In this
experiment, I shut down all brokers and watched the producer's behavior.

Here are the observations:
1) the kafka producer can recover from a kafka outage, i.e. sends resumed
after the brokers came back
2) the producer instance saw a big cpu jump during the outage: from 28% to
52% in one test.

Note that I didn't observe the cpu issue when a new producer instance was
started during the broker outage. In that case, there are no messages
accumulated in the buffer, because the KafkaProducer constructor failed on
the DNS lookup for the route53 name. When the brokers came up, my wrapper
re-created the KafkaProducer object and recovered from the outage by sending
messages.

Here is the cpu graph for a running producer instance where the broker
outage happened in the middle of the test run; it shows the cpu problem.
https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing

Here is the cpu graph for a new producer instance where the broker outage
happened before instance startup; cpu is good here.
https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing

Note that the producer is a 4-core m1.xlarge instance. The x-axis is time,
the y-axis is cpu util.

Thanks,
Steven


Re: big cpu jump on producer in face of broker outage

2015-02-19 Thread Steven Wu
forgot to mention in case it matters
producer: 0.8.2-beta
broker: 0.8.1.1

On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu stevenz...@gmail.com wrote:

 I think this is an issue caused by KAFKA-1788.

 I was trying to test producer resiliency to broker outage. In this
 experiment, I shutdown all brokers and see how producer behavior.

 Here are the observations
 1) kafka producer can recover from kafka outage. i.e. send resumed after
 brokers came back
 2) producer instance saw big cpu jump during outage. 28% - 52% in one
 test.

 Note that I didn't observe cpu issue when new producer instance started
 with brokers outage. In this case, there are no messages accumulated in the
 buffer, because KafkaProducer constructor failed with DNS lookup for
 route53 name. when brokers came up, my wrapper re-created KafkaProducer
 object and recover from outage with sending messages.

 Here is the cpu graph for a running producer instance where broker outage
 happened in the middle of test run. it shows cpu problem.

 https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing

 Here is the cpu graph for a new producer instance where broker outage
 happened before instance startup. cpu is good here.

 https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing

 Note that producer is a 4-core m1.xlarge instance. x-axis is time, y-axis
 is cpu util.

 Thanks,
 Steven



Re: could new java producer miss callbacks after successful send?

2015-02-10 Thread Steven Wu
I couldn't reproduce/confirm the issue with my test: I sent 6 million msgs
from 6 instances and got 6 million callbacks.

This could be some metrics issue.

On Mon, Feb 9, 2015 at 8:23 PM, Steven Wu stevenz...@gmail.com wrote:

 I don't have strong evidence that this is a bug yet. let me write some
 test program and see if I can confirm/reproduce the issue.

 On Mon, Feb 9, 2015 at 7:59 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hmm, that does sound like a bug, we haven't seen that. How easy is it to
 reproduce this?

 -Jay

 On Mon, Feb 9, 2015 at 5:19 PM, Steven Wu stevenz...@gmail.com wrote:

  We observed some small discrepancy in messages sent per second reported
 at
  different points. 1) and 4) matches very close. 2) and 3) matches very
  close but are about *5-6% lower* compared to 1) and 4).
  1) send attempt from producer
  2) send success from producer
  3) record-send-rate reported by kafka producer
  4) MessagesInPerSecond reported by kafka broker
 
  note that send success for 2) is incremented when onCompletion is called
  without error/exception). there is also a send error count when
  onCompletion is called with error. it is always zero.
 
  that's why I am wondering whether there are some callback misses?
 
  some info about the setup
  * producer: 0.8.2-beta
  * broker: 0.8.1.1
  * acks=1
 
  Thanks,
  Steven
 





could new java producer miss callbacks after successful send?

2015-02-09 Thread Steven Wu
We observed a small discrepancy in the messages-sent-per-second rates
reported at different points. 1) and 4) match very closely. 2) and 3) match
very closely with each other but are about *5-6% lower* compared to 1) and 4).
1) send attempts from the producer
2) send successes from the producer
3) record-send-rate reported by the kafka producer
4) MessagesInPerSecond reported by the kafka broker

Note that the send success count for 2) is incremented when onCompletion is
called without an error/exception. There is also a send error count for when
onCompletion is called with an error; it is always zero.

That's why I am wondering whether some callbacks are being missed.
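For concreteness, the counting in 1) and 2) works roughly like this (a sketch; the counter names and helper class are illustrative):

import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CountingSender {
    static final AtomicLong sendAttempts = new AtomicLong();
    static final AtomicLong sendSuccesses = new AtomicLong();
    static final AtomicLong sendErrors = new AtomicLong();

    static void send(KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record) {
        sendAttempts.incrementAndGet();                    // point 1) above
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    sendSuccesses.incrementAndGet();       // point 2) above
                } else {
                    sendErrors.incrementAndGet();
                }
            }
        });
    }
}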

some info about the setup
* producer: 0.8.2-beta
* broker: 0.8.1.1
* acks=1

Thanks,
Steven


Re: could new java producer miss callbacks after successful send?

2015-02-09 Thread Steven Wu
I don't have strong evidence that this is a bug yet. Let me write a test
program and see if I can confirm/reproduce the issue.

On Mon, Feb 9, 2015 at 7:59 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hmm, that does sound like a bug, we haven't seen that. How easy is it to
 reproduce this?

 -Jay

 On Mon, Feb 9, 2015 at 5:19 PM, Steven Wu stevenz...@gmail.com wrote:

  We observed some small discrepancy in messages sent per second reported
 at
  different points. 1) and 4) matches very close. 2) and 3) matches very
  close but are about *5-6% lower* compared to 1) and 4).
  1) send attempt from producer
  2) send success from producer
  3) record-send-rate reported by kafka producer
  4) MessagesInPerSecond reported by kafka broker
 
  note that send success for 2) is incremented when onCompletion is called
  without error/exception). there is also a send error count when
  onCompletion is called with error. it is always zero.
 
  that's why I am wondering whether there are some callback misses?
 
  some info about the setup
  * producer: 0.8.2-beta
  * broker: 0.8.1.1
  * acks=1
 
  Thanks,
  Steven
 



Re: error handling with high-level consumer

2015-02-05 Thread Steven Wu
Jun,

We are already past the retention period, so we can't go back and run
DumpLogSegment.
Plus, there are other factors that make this exercise difficult:
1) this topic has very high traffic volume
2) we don't know the offset of the corrupted msg

Anyhow, it doesn't happen often, but can you advise on the proper
action/handling in this case? Are there any other exceptions from
iterator.next() that we should handle?
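A simplified sketch of the kind of consume loop in question (the exception-handling policy shown is illustrative, not a recommendation):

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

public class DefensiveConsumer {
    // Whether a corrupt-message error can simply be skipped depends on the
    // consumer version; the "Iterator is in failed state" error is sticky and
    // needs the stream/connector to be recreated rather than retried.
    static void consume(KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (true) {
            try {
                if (!it.hasNext()) {
                    break;
                }
                MessageAndMetadata<byte[], byte[]> msg = it.next();
                process(msg.message());
            } catch (IllegalStateException e) {
                // "Iterator is in failed state": not recoverable here; bail out
                // and let the caller rebuild the consumer
                throw e;
            } catch (RuntimeException e) {
                // e.g. a CRC-corrupt message; log it and attempt to continue
            }
        }
    }

    static void process(byte[] payload) {
        // application logic
    }
}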

Thanks,
Steven



On Wed, Feb 4, 2015 at 9:33 PM, Jun Rao j...@confluent.io wrote:

 1) Does the corruption happen to console consumer as well? If so, could you
 run DumpLogSegment tool to see if the data is corrupted on disk?

 Thanks,

 Jun


 On Wed, Feb 4, 2015 at 9:55 AM, Steven Wu stevenz...@gmail.com wrote:

  Hi,
 
  We have observed these two exceptions with consumer *iterator.next()*
  recently. want to ask how should we handle them properly.
 
  *1) CRC corruption*
  Message is corrupt (stored crc = 433657556, computed crc = 3265543163)
 
  I assume in this case we should just catch it and move on to the next
 msg?
  any other iterator/consumer exception we should catch and handle?
 
 
  *2) Unrecoverable consumer erorr with Iterator is in failed state*
 
  yesterday, one of our kafka consumers got stuck with very large maxalg
 and
  was throwing the following exception.
 
  2015-02-04 08:35:19,841 ERROR KafkaConsumer-0 KafkaConsumer - Exception
 on
  consuming kafka with topic: topic_name
  java.lang.IllegalStateException: Iterator is in failed state
  at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
  at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:38)
  at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
  at
 com.netflix.suro.input.kafka.KafkaConsumer$1.run(KafkaConsumer.java:103)
  at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
  at java.util.concurrent.FutureTask.run(FutureTask.java:262)
  at
 
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at
 
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
 
  we had a surge of traffic of topic_name, so I guess the traffic storm
  caused the problem. I tried to restart a few consumer instances but after
  rebalancing, another instance got assigned the problematic partitions and
  got stuck again with the above errors.
 
  We decided to drop messages, stop all consumer instances, reset all
 offset
  by deleting zk entries and restarted them, the problem went away.
 
  Producer version is kafka_2.8.2-0.8.1.1 with snappy-java-1.0.5
  Consumer version is kafka_2.9.2-0.8.2-beta with snappy-java-1.1.1.6
 
  We googled this issue but this was already fixed long time ago on 0.7.x.
  any idea? is mismatched snappy version the culpit? is it a bug in
  0.8.2-beta?
 
 
  Thanks,
  Steven
 



high cpu and network traffic when cluster has no topic

2015-02-03 Thread Steven Wu
Hi,

We have observed a high cpu and high network traffic problem when
1) the cluster (0.8.1.1) has no topic
2) a KafkaProducer (0.8.2-beta) object is created without sending any traffic

We have observed this problem twice. In both cases, the problem went away
immediately after one/any topic was created.

Is this a known issue? I just want to check with the community first before
I spend much time reproducing it.

I couldn't reproduce the issue with a similar setup using unit test code in
the IDE: start two brokers with no topic locally on my laptop, then create a
KafkaProducer object without sending any msgs. But I only tested with
0.8.2-beta for both broker and producer.

Thanks,
Steven


Re: high cpu and network traffic when cluster has no topic

2015-02-03 Thread Steven Wu
sure. will try my unit test again with 0.8.2.0 release tomorrow and report
back my findings.

On Tue, Feb 3, 2015 at 8:42 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Steven,

 That sounds like a bug. I think we fixed a few producer high cpu issues
 since the beta, I wonder if you could repeat the same test with the 0.8.2.
 final release?

 -Jay

 On Tue, Feb 3, 2015 at 8:37 PM, Steven Wu stevenz...@gmail.com wrote:

  actually, my local test can reproduce the issue although not immediately.
  seems to happen after a few mins. I enabled TRACE level logging. here
 seems
  to be the tight loop. you can see that there are two metadata requests in
  one milli-seconds.
 
  kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301
 -
  Ignoring empty metadata response with correlation id 360185.
  kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369
 -
  Trying to send metadata request to node -2
  kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:374
 -
  Sending metadata request ClientRequest(expectResponse=true, payload=null,
 
 
 request=RequestSend(header={api_key=3,api_version=0,correlation_id=360186,client_id=foo},
  body={topics=[]})) to node -2
  kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301
 -
  Ignoring empty metadata response with correlation id 360186.
  kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369
 -
  Trying to send metadata request to node -2
 
 
  On Tue, Feb 3, 2015 at 8:10 PM, Steven Wu stevenz...@gmail.com wrote:
 
   Hi,
  
   We have observed high cpu and high network traffic problem when
   1) cluster (0.8.1.1) has no topic
   2) KafkaProducer (0.8.2-beta) object is created without sending any
  traffic
  
   We have observed such problem twice. In both cases, problem went away
   immediately after one/any topic is created.
  
   Is this a known issue? Just want to check with the community first
 before
   I spend much time to reproduce it.
  
   I couldn't reproduce the issue with similar setup with unit test code
 in
   IDE. start two brokers with no topic locally on my laptop. create a
   KafkaProducer object without sending any msgs. but I only tested with
   0.8.2-beta for both broker and producer.
  
   Thanks,
   Steven
  
 



Re: high cpu and network traffic when cluster has no topic

2015-02-03 Thread Steven Wu
Actually, my local test can reproduce the issue, although not immediately;
it seems to happen after a few mins. I enabled TRACE level logging. Here is
what seems to be the tight loop: you can see that there are two metadata
requests within one millisecond.

kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301 -
Ignoring empty metadata response with correlation id 360185.
kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369 -
Trying to send metadata request to node -2
kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:374 -
Sending metadata request ClientRequest(expectResponse=true, payload=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=360186,client_id=foo},
body={topics=[]})) to node -2
kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301 -
Ignoring empty metadata response with correlation id 360186.
kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369 -
Trying to send metadata request to node -2


On Tue, Feb 3, 2015 at 8:10 PM, Steven Wu stevenz...@gmail.com wrote:

 Hi,

 We have observed high cpu and high network traffic problem when
 1) cluster (0.8.1.1) has no topic
 2) KafkaProducer (0.8.2-beta) object is created without sending any traffic

 We have observed such problem twice. In both cases, problem went away
 immediately after one/any topic is created.

 Is this a known issue? Just want to check with the community first before
 I spend much time to reproduce it.

 I couldn't reproduce the issue with similar setup with unit test code in
 IDE. start two brokers with no topic locally on my laptop. create a
 KafkaProducer object without sending any msgs. but I only tested with
 0.8.2-beta for both broker and producer.

 Thanks,
 Steven



Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-01 Thread Steven Wu
At Netflix, we have been using a route53 DNS name as the bootstrap servers
in the AWS env. Basically, when a kafka broker starts, we add it to the
route53 DNS name for the cluster. This is like the VIP that Jay suggested.

But we are also moving toward using the Eureka service registry for
bootstrapping. We are worried that the DNS name might happen to resolve to a
bad broker, which could impact the bootstrap process/resiliency. We want to
get a list of brokers from Eureka to pass in as bootstrap.servers.
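A sketch of what that ends up looking like (the registry lookup and hostnames are placeholders; the producer only needs one of these endpoints to be reachable):

import java.util.List;
import java.util.Properties;

public class BootstrapConfig {
    // Build bootstrap.servers from whatever registry returns the current
    // broker list (route53, Eureka, ...).
    public static Properties fromBrokerList(List<String> hostPorts) {
        Properties props = new Properties();
        props.put("bootstrap.servers", String.join(",", hostPorts));
        return props;
    }
}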



On Sun, Feb 1, 2015 at 5:30 AM, Jay Kreps jay.kr...@gmail.com wrote:

 You may already know this but the producer doesn't require a complete list
 of brokers in its config, it just requires the connection info for one
 active broker which it uses to discover the rest of the brokers. We allow
 you to specify multiple urls here for failover in cases where you aren't
 using a vip. So if you can put three brokers into the VIP for metadata
 bootstrapping you can still scale up and down the rest of the cluster.

 -Jay

 On Sun, Feb 1, 2015 at 12:17 AM, Alex The Rocker alex.m3...@gmail.com
 wrote:

  Jun:
 
  You raise a very good question: let me explain why we use
  Broker.getConnectionString(), so may be we'll get a supported way to
  answer our need.
 
  We use Broker.getConnectionString() because we deploy Kafka services
  in Amazon EC2 with the following architecture:
  * Three VMs dedicated to Zookeeper processes
  * At least two VMs with Kafka broker, but depending on load it can be
  scaled to more broker VMs. Brokers self-register their address in
  Zookeeper by serializing Broker objects in Zk.
 
  The VMs with Zookeeper have Elastic IPs = stable public IPs,
 
  These public IPs are fed to the  various Application services which
  rely on Kafka to stream their logs  monitoring data to our central
  Hadoop system.
 
  Using zkclient and the above mentionned public zookeeper IPs, we get
  the list of brokers registrered to a given Kafka service:  this is
  where we unserializer Broker objects and then use
  getConnectionString() to discover the brokers' addresses. Then,
  brokers addresses are used to initialize the Kafka producer(s).
 
  The whole trick is that we cannot use Elastic IP (=stable IPs) for
  Kafka VMs, because of their 'elastic nature : we want to be able to
  scale up / down the number of VMs with Kafka brokers.
 
  Now, we understand that using non public Kafka API is bad : we've been
  broken when moving to 0.8.1.1, then again when moving to 0.8.2.0...
 
  So it's time to raise the right question: what would be the supported
  way to configure our producers given our dynamic-IP-for-brokers
  context?
 
  Thanks,
  Alex.
 
  2015-02-01 8:55 GMT+01:00 VERMEERBERGEN Alexandre
  alexandre.vermeerber...@3ds.com:
  
   -Original Message-
   From: Jun Rao [mailto:j...@confluent.io]
   Sent: Sunday, February 01, 2015 3:03
   To: users@kafka.apache.org; kafka-clie...@googlegroups.com
   Cc: d...@kafka.apache.org
   Subject: Re: [VOTE] 0.8.2.0 Candidate 3
  
   Hi, Alex,
  
   Thanks for testing RC3.
  
   Broker.connectionString() is actually not part of the public api for
 the
  producer. Is there a particular reason that you need to use this api?
  
   Thanks,
  
   Jun
  
   On Sat, Jan 31, 2015 at 1:53 PM, Alex The Rocker alex.m3...@gmail.com
 
   wrote:
  
   Hello,
  
   I have read Broker.scala source code, and I found the answer:
- With Kafka 0.8.1.1 we used Broker.getConnectionString() in our Java
   code.
- With Kafka 0.8.2.0, this method has been replaced by a 0-arity
   method without the get prefix, so we have to change our Java code to
   call
   Broker.connectionString()
  
    So although binary compatibility is broken, we have a workaround.
    I hope this will help other people relying on this API...
   
    and I'm going to continue tests with 0.8.2 rc3...
  
   Alex
  
   2015-01-31 21:23 GMT+01:00 Alex The Rocker alex.m3...@gmail.com:
  
Hello,
   
I ran my own tests made with kafka_2.10-0.8.1.1.tgz binaries with
our
application:
   
 1st test:
 =========
   replace all kafka .jar files in our application on the consuming side
   (without recompiling anything)
   => tests passed, OK
    
 2nd test:
 =========
   replace all kafka .jar files in our application on the producing side
   (without recompiling anything)
   => KO, we get this error:
   
2015-01-31 20:54:00,094 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
Exception in thread Timer-2
2015-01-31 20:54:00,111 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
java.lang.NoSuchMethodError:
kafka.cluster.Broker.getConnectionString()Ljava/lang/String;
   
 Which means that binary compatibility with the 0.8.1.1 version has been
    broken.
 We use getConnectionString() to get the brokers' addresses registered in
 Zookeeper; see
    this
 answer from Neha:
   
   
   
  
 http://mail-archives.apache.org/mod_mbox/kafka-users/201404.mbox/%3CCA
   

Re: Using Only one partition to store object change log

2015-01-30 Thread Steven Wu
Do you need total ordering among all events, or just ordering by
some partition key (e.g. events regarding one particular database key or
user id)? If it's the latter, you can create multiple partitions and just
route your events to different Kafka partitions using that key.
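
A minimal sketch of the keyed approach (topic and broker names are made up for
illustration): with the default partitioner, records with the same key hash to
the same partition, so ordering is preserved per key while other keys spread
across the topic's partitions for throughput.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedChangeLogSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // hypothetical broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);

        // All changes for the same object key land in the same partition and
        // therefore keep their relative order.
        String objectKey = "order-42";
        producer.send(new ProducerRecord<>("object-changes", objectKey, "created"));
        producer.send(new ProducerRecord<>("object-changes", objectKey, "updated"));
        producer.close();
    }
}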

On Fri, Jan 30, 2015 at 12:57 AM, noodles rungumpth...@gmail.com wrote:

 Hi Group:

 I'm currently working on pushing object changes into external services. Now we
 are trying to append the change records into Kafka.

 **My problem**

 Only one partition can be created for one kind of data, so that the
 sequence of changes can be guaranteed. If I do it like that, I guess I will
 lose the performance and load balancing benefits.

 Do I need to worry about this problem?

 --
 *noodles!*



Re: [DISCUSSION] Boot dependency in the new producer

2015-01-26 Thread Steven Wu
Jay, I don't think this line will bootstrap the full metadata (for all topics).
It will just construct the Cluster object with the bootstrap hosts. You need to
call metadata.add(topic) to register interest in a topic's partition metadata.

Guozhang, I personally think this is OK. It just does a few DNS lookups or TCP
connections before the first send.

On Mon, Jan 26, 2015 at 2:07 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Oh, yes, I guess I thought you meant that construction of the client would
 block on the metadata request.

 I don't personally think that is a problem because if it fails it will
 retry in the background, right?

 But actually I think this is probably violating another desirable criterion
 we had talked about, which was keeping the producer from bootstrapping the
 full metadata for all partitions. If it is doing that at construction
 time, presumably the resulting metadata request is for all partitions, no?
 That isn't a huge problem, but I think it isn't what was intended.

 -Jay

 On Mon, Jan 26, 2015 at 1:34 PM, Guozhang Wang wangg...@gmail.com wrote:

  It will set the needUpdate flag to true and hence the background Sender
  will try to talk to the bootstrap servers.
 
  Guozhang
 
  On Mon, Jan 26, 2015 at 1:12 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
   Hey Guozhang,
  
   That line shouldn't cause any connections to Kafka to be established,
  does
   it? All that is doing is creating the Cluster pojo using the supplied
   addresses. The use of InetSocketAddress may cause some dns stuff to
  happen,
   though...
  
   -Jay
  
   On Mon, Jan 26, 2015 at 10:50 AM, Guozhang Wang wangg...@gmail.com
   wrote:
  
Hi all,
   
 I am not sure if we have discussed this before, but recently I
 realized that we have introduced a boot dependency on the Kafka servers
 specified by the bootstrap.servers config in the new producer. More
 specifically, although in the old producer we also have a similar config
 for specifying the broker list, the old producer will not try to connect to
 those brokers until the first message send call is triggered; whereas in
 the new producer, it will try to talk to them at construction time via:
   
update(Cluster.bootstrap(addresses), time.milliseconds());
   
   
 I personally am neutral on this change, as in most cases the corresponding
 Kafka server should be up and running before the producer clients are
 deployed, but there are still some corner cases where that is not true, for
 example some standalone deployment tests of an app embedded with some
 clients, etc. So I would like to bring this up to people's attention if we
 have not discussed it before: do we think it is OK to introduce
 this boot dependency in the new producer?
   
-- Guozhang
   
  
 
 
 
  --
  -- Guozhang
 



Re: Kafka 0.8.2 new producer blocking on metadata

2015-01-05 Thread Steven Wu
"preinitialize.metadata=true/false" can help to a certain extent. If the
Kafka cluster is down, then metadata won't be available for a long time
(not just for the first msg). So to be safe, we have to set
metadata.fetch.timeout.ms=1 to fail fast, as Paul mentioned. I can also
echo Jay's comment that on-demand fetching of metadata might be more
efficient, since a cluster may have many topics that a particular producer
doesn't care about.

So I plan to do something similar to what Paul described (see the sketch below):
- metadata.fetch.timeout.ms=1
- enqueue the msg to a pending queue when topic metadata is not available
- have a background thread check when metadata becomes available and drain
the pending queue
- optionally, prime topic metadata asynchronously during init (if
configured)

Just wondering whether the above should be the default behavior of best-effort
non-blocking delivery in the Kafka clients. Then we don't have to reinvent the
wheel.

Thanks,
Steven
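
A minimal sketch of the wrapper described above (class names, queue size, and
back-off are hypothetical; the exact behavior of send() on a metadata timeout,
synchronous exception vs. failed future, may differ by client version, so the
sketch guards both paths):

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BestEffortProducerSketch {
    private final Producer<byte[], byte[]> producer;
    // Bounded queue: during a long outage we drop messages instead of exhausting memory.
    private final BlockingQueue<ProducerRecord<byte[], byte[]>> pending =
            new LinkedBlockingQueue<>(10000);

    public BestEffortProducerSketch(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("metadata.fetch.timeout.ms", "1"); // fail fast instead of blocking in send()
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producer = new KafkaProducer<>(props);
        // Background drainer retries parked records until they eventually go through.
        Executors.newSingleThreadExecutor().submit(this::drainLoop);
    }

    /** Never blocks the caller; parks the record if the send fails right away. */
    public void sendBestEffort(final ProducerRecord<byte[], byte[]> record) {
        try {
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    pending.offer(record); // e.g. topic metadata not available yet
                }
            });
        } catch (Exception immediateFailure) {
            pending.offer(record); // some client versions throw instead of failing the future
        }
    }

    private void drainLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ProducerRecord<byte[], byte[]> record = pending.take();
                sendBestEffort(record);           // re-attempt; parks again on failure
                TimeUnit.MILLISECONDS.sleep(100); // crude back-off between retries
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}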



On Mon, Dec 29, 2014 at 11:48 AM, Jay Kreps jay.kr...@gmail.com wrote:

 I don't think a separate queue will be a very simple solution to implement.

 Could you describe your use case a little bit more. It does seem to me that
 as long as the metadata fetch happens only once and the blocking has a
 tight time bound this should be okay in any use case I can imagine. And, of
 course, by default the client blocks anyway whenever you exhaust the memory
 buffer space. But it sounds like you feel it isn't. Maybe you could
 describe the scenario a bit?

 I think one thing we could do is what was discussed in another thread,
 namely add an option like
   preinitialize.metadata=true/false
 which would default to false. When true this would cause the producer to
 just initialize metadata for all topics when it is created. Note that this
 then brings back the opposite problem--doing remote communication during
 initialization which tends to bite a lot of people. But since this would be
 an option that would default to false perhaps it would be less likely to
 come as a surprise.

 -Jay

 On Mon, Dec 29, 2014 at 8:38 AM, Steven Wu stevenz...@gmail.com wrote:

  +1. it should be truly async in all cases.
 
  I understand some challenges that Jay listed in the other thread. But we
  need a solution nonetheless. e.g. can we maintain a separate
  list/queue/buffer for pending messages without metadata.
 
  On Tue, Dec 23, 2014 at 12:57 PM, John Boardman boardmanjo...@gmail.com
 
  wrote:
 
   I was just fighting this same situation. I never expected the new
  producer
   send() method to block as it returns a Future and accepts a Callback.
   However, when I tried my unit test, just replacing the old producer
 with
   the new, I immediately started getting timeouts waiting for metadata. I
   struggled with this until I went into the source code and found the
  wait()
   that waits for the metadata.
  
   At that point I realized that this new async producer would have to
 be
   executed on its own thread, unlike the old producer, which complicates
 my
   code unnecessarily. I totally agree with Paul that the contract of
 send()
   is being completely violated with internal code that can block.
  
   I did try fetching the metadata first, but that only worked for a few
  calls
   before the producer decided it was time to update the metadata again.
  
   Again, I agree with Paul that this API should be fixed so that it is
  truly
   asynchronous in all cases. Otherwise, it cannot be used on the main
  thread
   of an application as it will block and fail.
  
 



Re: Kafka 0.8.2 new producer blocking on metadata

2014-12-29 Thread Steven Wu
+1. it should be truly async in all cases.

I understand some challenges that Jay listed in the other thread. But we
need a solution nonetheless. e.g. can we maintain a separate
list/queue/buffer for pending messages without metadata.

On Tue, Dec 23, 2014 at 12:57 PM, John Boardman boardmanjo...@gmail.com
wrote:

 I was just fighting this same situation. I never expected the new producer
 send() method to block as it returns a Future and accepts a Callback.
 However, when I tried my unit test, just replacing the old producer with
 the new, I immediately started getting timeouts waiting for metadata. I
 struggled with this until I went into the source code and found the wait()
 that waits for the metadata.

 At that point I realized that this new async producer would have to be
 executed on its own thread, unlike the old producer, which complicates my
 code unnecessarily. I totally agree with Paul that the contract of send()
 is being completely violated with internal code that can block.

 I did try fetching the metadata first, but that only worked for a few calls
 before the producer decided it was time to update the metadata again.

 Again, I agree with Paul that this API should be fixed so that it is truly
 asynchronous in all cases. Otherwise, it cannot be used on the main thread
 of an application as it will block and fail.



Re: Is Kafka documentation regarding null key misleading?

2014-12-11 Thread Steven Wu
Guozhang,

Can you point me to the code that implements the periodic/sticky random
partitioner? I would actually like to try it out in our env, even though I assume
it is NOT ported to the 0.8.2 Java producer.

Thanks,
Steven


On Mon, Dec 8, 2014 at 1:43 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Yury,

 Originally the producer behavior under null-key is random random, but
 later changed to this periodic random to reduce the number of sockets on
 the server side: imagine if you have n brokers and m producers where m >>
 n, with random random distribution each server will need to maintain a
 socket with each of the m producers.

 We realized that this change IS misleading and we have changed back to
 random random in the new producer released in 0.8.2.


 Guozhang

 On Fri, Dec 5, 2014 at 10:43 AM, Andrew Jorgensen 
 ajorgen...@twitter.com.invalid wrote:

  If you look under Producer configs you see the following key ‘
  topic.metadata.refresh.interval.ms’ with a default of 600 * 1000 (10
  minutes). It is not entirely clear, but this controls how often a producer
  with a null-key partitioner will switch the partition that it is writing to.
  In my production app I set this down to 1 minute and haven't seen any ill
  effects, but it is good to note that making it shorter *could* cause some
  issues and extra overhead. I agree this could probably be a little more
  clear in the documentation.
  -
  Andrew Jorgensen
  @ajorgensen
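
  A sketch of the old (Scala) producer setting being described, using the
  1-minute value mentioned above (illustrative, not a recommendation):

  # old producer config: refresh topic metadata (and therefore re-pick the
  # sticky "random" partition used for null-key messages) every minute
  # instead of the default 10 minutes
  topic.metadata.refresh.interval.ms=60000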
 
  On December 5, 2014 at 1:34:00 PM, Yury Ruchin (yuri.ruc...@gmail.com)
  wrote:
 
  Hello,
 
  I've come across a (seemingly) strange situation when my Kafka producer
  gave so uneven distribution across partitions. I found that I used null
 key
  to produce messages, guided by the following clause in the documentation:
  If the key is null, then a random broker partition is picked. However,
  after looking at the code, I found that the broker partition is not truly
  random for every message - instead, the randomly picked partition number
  sticks and only refreshes after the topic.metadata.refresh.ms expires,
  which is 10 minutes by default. So, with null key the producer keeps
  writing to the same partition for 10 minutes.
 
  Is my understanding of partitioning with null key correct? If yes,
  shouldn't the documentation be fixed then to explicitly describe the
 sticky
  pseudo-random partition assignment?
 
  Thanks,
  Yury
 



 --
 -- Guozhang



Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-09 Thread Steven Wu
 In practice the cases that actually mix serialization types in a single
stream are pretty rare I think just because the consumer then has the
problem of guessing how to deserialize, so most of these will end up with
at least some marker or schema id or whatever that tells you how to read
the data. Arguably this mixed serialization with a marker is itself a
serializer type and should have a serializer of its own...

Agreed that it is unlikely to have mixed serialization formats for one
topic/type. But we sometimes/often create one Producer object per
cluster, and there can be many topics on that cluster; different topics may
have different serialization formats. So I agree with Guozhang's point
regarding the data type flexibility of using simple byte[] (instead of
generic <K, V>).
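
A minimal sketch of that byte[] usage (topic names and the per-topic
serialization helpers are hypothetical placeholders): one producer per cluster,
with each topic applying its own serialization in application code before send().

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MultiFormatProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // hypothetical broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        Producer<byte[], byte[]> producer = new KafkaProducer<>(props);

        // Each topic uses its own format; the producer itself only sees byte[].
        producer.send(new ProducerRecord<>("avro-events", toAvroBytes("event-1")));
        producer.send(new ProducerRecord<>("json-metrics", toJsonBytes("metric-1")));
        producer.close();
    }

    // Placeholders standing in for real Avro/JSON serialization code.
    private static byte[] toAvroBytes(String payload) { return payload.getBytes(); }
    private static byte[] toJsonBytes(String payload) { return payload.getBytes(); }
}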

On Fri, Dec 5, 2014 at 5:00 PM, Jay Kreps j...@confluent.io wrote:

 Hey Sriram,

 Thanks! I think this is a very helpful summary.

 Let me try to address your point about passing in the serde at send time.

 I think the first objection is really to the paired key/value serializer
 interfaces. This leads to kind of a weird combinatorial thing where you
 would have an avro/avro serializer a string/avro serializer, a pb/pb
 serializer, and a string/pb serializer, and so on. But your proposal would
 work as well with separate serializers for key and value.

 I think the downside is just the one you call out--that this is a corner
 case and you end up with two versions of all the apis to support it. This
 also makes the serializer api more annoying to implement. I think the
 alternative solution to this case and any other we can give people is just
 configuring ByteArraySerializer which gives you basically the api that you
 have now with byte arrays. If this is incredibly common then this would be
 a silly solution, but I guess the belief is that these cases are rare and a
 really well implemented avro or json serializer should be 100% of what most
 people need.

 In practice the cases that actually mix serialization types in a single
 stream are pretty rare I think just because the consumer then has the
 problem of guessing how to deserialize, so most of these will end up with
 at least some marker or schema id or whatever that tells you how to read
 the data. Arguably this mixed serialization with a marker is itself a
 serializer type and should have a serializer of its own...

 -Jay

 On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian 
 srsubraman...@linkedin.com.invalid wrote:

  This thread has diverged multiple times now and it would be worth
  summarizing them.
 
  There seems to be the following points of discussion -
 
  1. Can we keep the serialization semantics outside the Producer interface
  and have simple bytes in / bytes out for the interface (This is what we
  have today).
 
  The points for this is to keep the interface simple and usage easy to
  understand. The points against this is that it gets hard to share common
  usage patterns around serialization/message validations for the future.
 
  2. Can we create a wrapper producer that does the serialization and have
  different variants of it for different data formats?
 
  The points for this is again to keep the main API clean. The points
  against this is that it duplicates the API, increases the surface area
 and
  creates redundancy for a minor addition.
 
  3. Do we need to support different data types per record? The current
  interface (bytes in/bytes out) lets you instantiate one producer and use
  it to send multiple data formats. There seems to be some valid use cases
  for this.
 
  I have still not seen a strong argument against not having this
  functionality. Can someone provide their views on why we don't need this
  support that is possible with the current API?
 
  One possible approach for the per record serialization would be to define
 
  public interface SerDe<K,V> {
    public byte[] serializeKey();
 
    public K deserializeKey();
 
    public byte[] serializeValue();
 
    public V deserializeValue();
  }
 
  This would be used by both the Producer and the Consumer.
 
  The send APIs can then be
 
  public Future<RecordMetadata> send(ProducerRecord<K,V> record);
  public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback
  callback);
 
 
  public Future<RecordMetadata> send(ProducerRecord<K,V> record, SerDe<K,V>
  serde);
 
  public Future<RecordMetadata> send(ProducerRecord<K,V> record, SerDe<K,V>
  serde, Callback callback);
 
 
  A default SerDe can be set in the config. The producer would use the
  default from the config if the non-serde send APIs are used. The downside
  to this approach is that we would need to have four variants of Send API
  for the Producer.
 
 
 
 
 
 
  On 12/5/14 3:16 PM, Jun Rao j...@confluent.io wrote:
 
  Jiangjie,
  
  The issue with adding the serializer in ProducerRecord is that you need
 to
  implement all combinations of serializers for key and value. So, instead
  of
  just implementing int and string serializers, 

Re: How to produce and consume events in 2 DCs?

2014-10-22 Thread Steven Wu
Erik, I didn't know that MirrorMaker can't write to a different topic, but
it might be a useful feature request for MirrorMaker.

On Wed, Oct 22, 2014 at 12:21 AM, Erik van Oosten 
e.vanoos...@grons.nl.invalid wrote:

 Hi Steven,

 That doesn't work. In your proposal, MirrorMaker in one DC would copy
 messages from topic A to the other DC's topic A. However, in the other DC
 there is a MirrorMaker which does the same, creating a loop. Messages will
 be duplicated, triplicated, etc. in a never-ending loop.

 Mirroring to another topic would work (MirrorMaker doesn't support that),
 and so would mirroring to another cluster. Neha's proposal would also work,
 but I assume it's a lot more work for the Kafka internals and therefore IMHO
 wouldn't meet the KISS principle.

 Kind regards,
 Erik.


 Steven Wu schreef op 22-10-14 om 01:48:

  I think it doesn't have to be two more clusters. can be just two more
 topics. MirrorMaker can copy from source topics in both regions into one
 aggregate topic.

 On Tue, Oct 21, 2014 at 1:54 AM, Erik van oosten 
 e.vanoos...@grons.nl.invalid wrote:

  Thanks Neha,

 Unfortunately, the maintenance overhead of 2 more clusters is not
 acceptable to us.

 Would you accept a pull request on mirror maker that would rename topics
 on the fly?

 For example by accepting the parameter rename:
 —rename src1/dest1,src2/dest2
 or, extended with RE support:
 —rename old_(.*)/new_\1

 Kind regards,
  Erik.


 Op 20 okt. 2014, om 16:43 heeft Neha Narkhede neha.narkh...@gmail.com
 het volgende geschreven:

  Another way to set up this kind of mirroring is by deploying 2 clusters

 in

 each DC - a local Kafka cluster and an aggregate Kafka cluster. The

 mirror

 maker copies data from both the DC's local clusters into the aggregate
 clusters. So if you want access to a topic with data from both DC's, you
 subscribe to the aggregate cluster.

 Thanks,
 Neha

 On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten 
 e.vanoos...@grons.nl.invalid wrote:

  Hi,

 We have 2 data centers that produce events. Each DC has to process

 events

 from both DCs.

 I had the following in mind:

         DC 1         |         DC 2
        events        |        events
       +  +  +        |       +  +  +
       |  |  |        |       |  |  |
       v  v  v        |       v  v  v
  +----------------+  |  +----------------+
  | Receiver topic |  |  | Receiver topic |
  +----------------+  |  +----------------+
     |         | mirroring  |          |
     |         +------------|-----+    |
     |    +-----------------+     |    |
     v    v           |           v    v
  +----------------+  |  +----------------+
  | Consumer topic |  |  | Consumer topic |
  +----------------+  |  +----------------+
       +  +  +        |       +  +  +
       |  |  |        |       |  |  |
       v  v  v        |       v  v  v
      consumers       |      consumers


 As each DC has a single Kafka cluster, on each DC the receiver topic
 and
 consumer topic needs to be on the same cluster.
 Unfortunately, mirror maker does not seem to support mirroring to a

 topic

 with another name.

 Is there another tool we could use?
 Or, is there another approach for producing and consuming from 2 DCs?

 Kind regards,
 Erik.

 —
 Erik van Oosten
 http://www.day-to-day-stuff.blogspot.nl/





 --
 Erik van Oosten
 http://www.day-to-day-stuff.blogspot.com/




Re: How to produce and consume events in 2 DCs?

2014-10-21 Thread Steven Wu
I think it doesn't have to be two more clusters; it can be just two more
topics. MirrorMaker can copy from the source topics in both regions into one
aggregate topic.

On Tue, Oct 21, 2014 at 1:54 AM, Erik van oosten 
e.vanoos...@grons.nl.invalid wrote:

 Thanks Neha,

 Unfortunately, the maintenance overhead of 2 more clusters is not
 acceptable to us.

 Would you accept a pull request on mirror maker that would rename topics
 on the fly?

 For example by accepting the parameter rename:
—rename src1/dest1,src2/dest2
 or, extended with RE support:
—rename old_(.*)/new_\1

 Kind regards,
 Erik.


 Op 20 okt. 2014, om 16:43 heeft Neha Narkhede neha.narkh...@gmail.com
 het volgende geschreven:

  Another way to set up this kind of mirroring is by deploying 2 clusters
 in
  each DC - a local Kafka cluster and an aggregate Kafka cluster. The
 mirror
  maker copies data from both the DC's local clusters into the aggregate
  clusters. So if you want access to a topic with data from both DC's, you
  subscribe to the aggregate cluster.
 
  Thanks,
  Neha
 
  On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten 
  e.vanoos...@grons.nl.invalid wrote:
 
  Hi,
 
  We have 2 data centers that produce events. Each DC has to process
 events
  from both DCs.
 
  I had the following in mind:
 
         DC 1         |         DC 2
        events        |        events
       +  +  +        |       +  +  +
       |  |  |        |       |  |  |
       v  v  v        |       v  v  v
  +----------------+  |  +----------------+
  | Receiver topic |  |  | Receiver topic |
  +----------------+  |  +----------------+
     |         | mirroring  |          |
     |         +------------|-----+    |
     |    +-----------------+     |    |
     v    v           |           v    v
  +----------------+  |  +----------------+
  | Consumer topic |  |  | Consumer topic |
  +----------------+  |  +----------------+
       +  +  +        |       +  +  +
       |  |  |        |       |  |  |
       v  v  v        |       v  v  v
      consumers       |      consumers
 
 
  As each DC has a single Kafka cluster, on each DC the receiver topic and
  consumer topic needs to be on the same cluster.
  Unfortunately, mirror maker does not seem to support mirroring to a
 topic
  with another name.
 
  Is there another tool we could use?
  Or, is there another approach for producing and consuming from 2 DCs?
 
  Kind regards,
 Erik.
 
  —
  Erik van Oosten
  http://www.day-to-day-stuff.blogspot.nl/
 
 




how to identify rogue consumer

2014-10-08 Thread Steven Wu
I have seen a very high Fetch-Consumer-RequestsPerSec (like 15K) per broker
in a relatively idle cluster. My hypothesis is that some misbehaving consumer
has a tight polling loop, without any back-off logic, issuing empty fetches.

Unfortunately, this metric doesn't have a per-topic breakdown like
BytesInPerSec or MessagesInPerSec. So I can't really tell which
topic/consumer is pounding the cluster.

Also, the storm has already ended, so I can't use tcpdump to capture live
traffic.

So any suggestion?


Re: how to identify rogue consumer

2014-10-08 Thread Steven Wu
Jun, do you mean trace-level logging via the requestAppender?
log4j.logger.kafka.network.Processor=TRACE, requestAppender

If it happens again, I can try to enable it.
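
For reference, a sketch of the log4j.properties entries involved (based on the
loggers present in the broker's stock log4j.properties; treat the exact set as
an assumption for your version): turning these up to TRACE logs each request
with its client id, which should expose the offending consumer.

log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=false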

On Wed, Oct 8, 2014 at 9:54 PM, Jun Rao jun...@gmail.com wrote:

 If you enable request logging, you can find this out.

 Thanks,

 Jun

 On Wed, Oct 8, 2014 at 8:57 PM, Steven Wu stevenz...@gmail.com wrote:

  I have seen very high Fetch-Consumer-RequestsPerSec (like 15K) per
 broker
  in a relatively idle cluster. My hypothesis some misbehaving consumer
 has a
  tight polling loop without any back-off logic with empty fetch.
 
  Unfortunately, this metric doesn't have per-topic breakdown like
  BytesInPerSec or MessagesInPerSec. So I can't really tell which
  topic/consumer is pounding the cluster.
 
  Also the storm already ended. So I can't use tcpdump to capture live
  traffic.
 
  So any suggestion?
 



Re: BytesOutPerSec is more than BytesInPerSec.

2014-09-25 Thread Steven Wu
I couldn't see your graph. But your replication factor is 2, so replication
traffic can be the explanation: basically, BytesOut will be 2x BytesIn.
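
A rough worked example, assuming BytesOutPerSec counts both follower and
consumer fetches: with replication factor 2 and a single consumer, each 1 MB/s
produced to a leader is fetched once by the follower replica and once by the
consumer, so the broker reports roughly 2 MB/s out against 1 MB/s in.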

On Thu, Sep 25, 2014 at 6:19 PM, ravi singh rrs120...@gmail.com wrote:

 I have set up my Kafka broker with a single producer and consumer. When I
 am plotting the graph for all-topic bytes in/out per sec, I could see that
 the value of BytesOutPerSec is more than BytesInPerSec.
 Is this correct? I confirmed that my consumer is consuming the messages
 only once. What could be the reason for this behavior?

 [image: Inline image 1]


 --
 *Regards,*
 *Ravi*



Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs

2014-09-10 Thread Steven Wu
partition [foo,2]'s state from New to Online since LeaderAndIsr path
already exists with value {leader:1,leader_epoch:0,isr:[1,0]} and
controller epoch 5
at
kafka.controller.PartitionStateMachine.initializeLeaderAndIsrForPartition(PartitionStateMachine.scala:302)
at
kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:183)
at
kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:125)
at
kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:124)
at
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130)
at
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
at
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
at
kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:124)
at
kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:473)
at
kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:460)
at
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:422)
at
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:404)
at
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:404)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at
kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:403)
at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

[2014-09-03 21:18:41,168] ERROR Controller 5 epoch 5 initiated state change
of replica 1 for partition [foo,2] from NonExistentReplica to OnlineReplica
failed (state.change.logger)
java.lang.AssertionError: assertion failed: Replica
[Topic=foo,Partition=2,Replica=1] should be in the
NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states
before moving to OnlineReplica state. Instead it is in NonExistentReplica
state
at scala.Predef$.assert(Predef.scala:160)
at
kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:284)
at
kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:198)
at
kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:96)
at
kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:96)
at
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130)
at
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
at
kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:96)
at
kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:474)
at
kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:460)
at
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:422)
at
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:404)
at
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:404)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at
kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:403)
at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)




On Tue, Sep 9, 2014 at 7:18 PM, Steven Wu stevenz...@gmail.com wrote:

 previous email is from state-change.log. also found this WARN in
 controller.log

 [2014-09-09 01:10:53,669] WARN [Controller 5]: Cannot remove replica 0
 from ISR of partition [cdn_selection_runtime_data,8] since it is not in the
 ISR. Leader = 1 ; ISR = List(1) (kafka.controller.KafkaController)



 On Tue, Sep 9, 2014 at 4:14 PM, Steven Wu stevenz...@gmail.com wrote:

 ah. maybe you mean the controller log on leader/controller broker 5. yes.
 I do noticed some errors regarding these two partitions.


 [2014-09-09 01:10:53,651] ERROR Controller 5 epoch 5 encountered error
 while changing partition [foo,2]'s state from New to Online since
 LeaderAndIsr path alrea
 dy exists with value {leader:0,leader_epoch:1,isr:[0,1]} and
 controller epoch 5 (state.change.logger)
 [2014-09-09 01:10:53,652] ERROR Controller 5 epoch 5 initiated state
 change for partition [foo,2] from NewPartition

Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs

2014-09-09 Thread Steven Wu
Noticed one important thing: topic foo's partitions 1 and 2 have empty .log
files on the replicas. I suspect replication doesn't create the partition dir on
broker 0 in this case, which then causes the WARN logs.


On Mon, Sep 8, 2014 at 11:11 PM, Steven Wu stevenz...@gmail.com wrote:

 sorry. forgot to mention that I am running 0.8.1.1


 On Mon, Sep 8, 2014 at 9:26 PM, Steven Wu stevenz...@gmail.com wrote:

 did a push in cloud. after new instance for broker 0 comes up, I see a
 lot of WARNs in log file.

 2014-09-09 04:21:09,271 WARN  kafka.utils.Logging$class:83
 [request-expiration-task] [warn] [KafkaApi-0] Fetch request with
 correlation id 51893 from client 1409779957450-6014fc32-0-0 on partition
 [foo,1] failed due to Partition [foo,1] doesn't exist on 0
 2014-09-09 04:21:09,383 WARN  kafka.utils.Logging$class:83
 [kafka-request-handler-5] [warn] [KafkaApi-0] Fetch request with
 correlation id 51894 from client 1409779957450-6014fc32-0-0 on partition
 [foo,2] failed due to Partition [foo,2] doesn't exist on 0

 zookeeper shows it is the leader after this new instance come back.

 {controller_epoch:5,leader:0,version:1,leader_epoch:0,isr:[0,5]}
 {controller_epoch:5,leader:0,version:1,leader_epoch:1,isr:[0,1]}

 but we I check the data dirs. those two partitions aren't there.

 any idea?

 Thanks,
 Steven





Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs

2014-09-09 Thread Steven Wu
Nope. The state-change log files only had some warnings regarding other
partitions, nothing related to these two partitions.

2014-09-09 02:54:30,579 WARN  kafka.utils.Logging$class:83
[kafka-request-handler-1] [warn] Broker 0 received invalid LeaderAndIsr
request with correlation id 497 from controller 5 epoch 5 with an older
leader epoch 8 for partition [bar,1], current leader epoch is 8


On Tue, Sep 9, 2014 at 3:37 PM, Jun Rao jun...@gmail.com wrote:

 Hmm, that seems to indicate that the leader info is not propagated properly
 from the controller to broker 0. In the state-change log of broker 0, do
 you see anything related to these 2 partitions when broker 0 comes up?

 Thanks,

 Jun

 On Tue, Sep 9, 2014 at 9:41 AM, Steven Wu stevenz...@gmail.com wrote:

  noticed one important thing. topic foo's partition 1 and 2 have empty
 .log
  file on replicas. I suspect replication doesn't create the partition dir
 on
  broker 0 in this case, which then cause the WARN logs.
 
 
  On Mon, Sep 8, 2014 at 11:11 PM, Steven Wu stevenz...@gmail.com wrote:
 
   sorry. forgot to mention that I am running 0.8.1.1
  
  
   On Mon, Sep 8, 2014 at 9:26 PM, Steven Wu stevenz...@gmail.com
 wrote:
  
   did a push in cloud. after new instance for broker 0 comes up, I see a
   lot of WARNs in log file.
  
   2014-09-09 04:21:09,271 WARN  kafka.utils.Logging$class:83
   [request-expiration-task] [warn] [KafkaApi-0] Fetch request with
   correlation id 51893 from client 1409779957450-6014fc32-0-0 on
 partition
   [foo,1] failed due to Partition [foo,1] doesn't exist on 0
   2014-09-09 04:21:09,383 WARN  kafka.utils.Logging$class:83
   [kafka-request-handler-5] [warn] [KafkaApi-0] Fetch request with
   correlation id 51894 from client 1409779957450-6014fc32-0-0 on
 partition
   [foo,2] failed due to Partition [foo,2] doesn't exist on 0
  
   zookeeper shows it is the leader after this new instance come back.
  
  
 
 {controller_epoch:5,leader:0,version:1,leader_epoch:0,isr:[0,5]}
  
 
 {controller_epoch:5,leader:0,version:1,leader_epoch:1,isr:[0,1]}
  
   but we I check the data dirs. those two partitions aren't there.
  
   any idea?
  
   Thanks,
   Steven
  
  
  
 



Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs

2014-09-09 Thread Steven Wu
assuming you are talking about controller log on broker 0, there is nothing
there.

$ less kafka-controller
2014-09-09 01:15:02,600 INFO  kafka.utils.Logging$class:68 [main] [info]
[ControllerEpochListener on 0]: Initialized controller epoch to 5 and zk
version 4
2014-09-09 01:15:02,655 INFO  kafka.utils.Logging$class:68 [main] [info]
[Controller 0]: Controller starting up
2014-09-09 01:15:02,692 INFO  kafka.utils.Logging$class:68 [main] [info]
[Controller 0]: Controller startup complete


On Tue, Sep 9, 2014 at 4:06 PM, Jun Rao jun...@gmail.com wrote:

 In the controller log, do you see broker 0 being detected as the new broker
 when broker 0 comes up?

 Thanks,

 Jun

 On Tue, Sep 9, 2014 at 3:51 PM, Steven Wu stevenz...@gmail.com wrote:

  nope. sate-change log files only had some warnings regarding other
  partitions. nothing related to these two partitions.
 
  2014-09-09 02:54:30,579 WARN  kafka.utils.Logging$class:83
  [kafka-request-handler-1] [warn] Broker 0 received invalid LeaderAndIsr
  request with correlation id 497 from controller 5 epoch 5 with an older
  leader epoch 8 for partition [bar,1], current leader epoch is 8
 
 
  On Tue, Sep 9, 2014 at 3:37 PM, Jun Rao jun...@gmail.com wrote:
 
   Hmm, that seems to indicate that the leader info is not propagated
  properly
   from the controller to broker 0. In the state-change log of broker 0,
 do
   you see anything related to these 2 partitions when broker 0 comes up?
  
   Thanks,
  
   Jun
  
   On Tue, Sep 9, 2014 at 9:41 AM, Steven Wu stevenz...@gmail.com
 wrote:
  
noticed one important thing. topic foo's partition 1 and 2 have empty
   .log
file on replicas. I suspect replication doesn't create the partition
  dir
   on
broker 0 in this case, which then cause the WARN logs.
   
   
On Mon, Sep 8, 2014 at 11:11 PM, Steven Wu stevenz...@gmail.com
  wrote:
   
 sorry. forgot to mention that I am running 0.8.1.1


 On Mon, Sep 8, 2014 at 9:26 PM, Steven Wu stevenz...@gmail.com
   wrote:

 did a push in cloud. after new instance for broker 0 comes up, I
  see a
 lot of WARNs in log file.

 2014-09-09 04:21:09,271 WARN  kafka.utils.Logging$class:83
 [request-expiration-task] [warn] [KafkaApi-0] Fetch request with
 correlation id 51893 from client 1409779957450-6014fc32-0-0 on
   partition
 [foo,1] failed due to Partition [foo,1] doesn't exist on 0
 2014-09-09 04:21:09,383 WARN  kafka.utils.Logging$class:83
 [kafka-request-handler-5] [warn] [KafkaApi-0] Fetch request with
 correlation id 51894 from client 1409779957450-6014fc32-0-0 on
   partition
 [foo,2] failed due to Partition [foo,2] doesn't exist on 0

 zookeeper shows it is the leader after this new instance come
 back.


   
  
 
 {controller_epoch:5,leader:0,version:1,leader_epoch:0,isr:[0,5]}

   
  
 
 {controller_epoch:5,leader:0,version:1,leader_epoch:1,isr:[0,1]}

 but we I check the data dirs. those two partitions aren't there.

 any idea?

 Thanks,
 Steven



   
  
 



Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs

2014-09-09 Thread Steven Wu
Ah, maybe you mean the controller log on the leader/controller broker 5. Yes, I
did notice some errors regarding these two partitions.


[2014-09-09 01:10:53,651] ERROR Controller 5 epoch 5 encountered error
while changing partition [foo,2]'s state from New to Online since
LeaderAndIsr path alrea
dy exists with value {leader:0,leader_epoch:1,isr:[0,1]} and
controller epoch 5 (state.change.logger)
[2014-09-09 01:10:53,652] ERROR Controller 5 epoch 5 initiated state change
for partition [foo,2] from NewPartition to OnlinePartition failed
(state.change.logg
er)
kafka.common.StateChangeFailedException: encountered error while changing
partition [foo,2]'s state from New to Online since LeaderAndIsr path
already exists wi
th value {leader:0,leader_epoch:1,isr:[0,1]} and controller epoch 5
at
kafka.controller.PartitionStateMachine.initializeLeaderAndIsrForPartition(PartitionStateMachine.scala:302)
at
kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:183)
at
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
at
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
at
kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96)
at
kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:433)
at
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:344)
at
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
at
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:329)
at
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
at
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at
kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:327)
at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)


On Tue, Sep 9, 2014 at 4:08 PM, Steven Wu stevenz...@gmail.com wrote:

 assuming you are talking about controller log on broker 0, there is
 nothing there.

 $ less kafka-controller
 2014-09-09 01:15:02,600 INFO  kafka.utils.Logging$class:68 [main] [info]
 [ControllerEpochListener on 0]: Initialized controller epoch to 5 and zk
 version 4
 2014-09-09 01:15:02,655 INFO  kafka.utils.Logging$class:68 [main] [info]
 [Controller 0]: Controller starting up
 2014-09-09 01:15:02,692 INFO  kafka.utils.Logging$class:68 [main] [info]
 [Controller 0]: Controller startup complete


 On Tue, Sep 9, 2014 at 4:06 PM, Jun Rao jun...@gmail.com wrote:

 In the controller log, do you see broker 0 being detected as the new
 broker
 when broker 0 comes up?

 Thanks,

 Jun

 On Tue, Sep 9, 2014 at 3:51 PM, Steven Wu stevenz...@gmail.com wrote:

  nope. sate-change log files only had some warnings regarding other
  partitions. nothing related to these two partitions.
 
  2014-09-09 02:54:30,579 WARN  kafka.utils.Logging$class:83
  [kafka-request-handler-1] [warn] Broker 0 received invalid LeaderAndIsr
  request with correlation id 497 from controller 5 epoch 5 with an older
  leader epoch 8 for partition [bar,1], current leader epoch is 8
 
 
  On Tue, Sep 9, 2014 at 3:37 PM, Jun Rao jun...@gmail.com wrote:
 
   Hmm, that seems to indicate that the leader info is not propagated
  properly
   from the controller to broker 0. In the state-change log

Re: zookeeper shows partition in-sync but local disk doesn't have the data dirs

2014-09-09 Thread Steven Wu
previous email is from state-change.log. also found this WARN in
controller.log

[2014-09-09 01:10:53,669] WARN [Controller 5]: Cannot remove replica 0 from
ISR of partition [cdn_selection_runtime_data,8] since it is not in the ISR.
Leader = 1 ; ISR = List(1) (kafka.controller.KafkaController)


On Tue, Sep 9, 2014 at 4:14 PM, Steven Wu stevenz...@gmail.com wrote:

 ah. maybe you mean the controller log on leader/controller broker 5. yes.
 I do noticed some errors regarding these two partitions.


 [2014-09-09 01:10:53,651] ERROR Controller 5 epoch 5 encountered error
 while changing partition [foo,2]'s state from New to Online since
 LeaderAndIsr path alrea
 dy exists with value {leader:0,leader_epoch:1,isr:[0,1]} and
 controller epoch 5 (state.change.logger)
 [2014-09-09 01:10:53,652] ERROR Controller 5 epoch 5 initiated state
 change for partition [foo,2] from NewPartition to OnlinePartition failed
 (state.change.logg
 er)
 kafka.common.StateChangeFailedException: encountered error while changing
 partition [foo,2]'s state from New to Online since LeaderAndIsr path
 already exists wi
 th value {leader:0,leader_epoch:1,isr:[0,1]} and controller epoch 5
 at
 kafka.controller.PartitionStateMachine.initializeLeaderAndIsrForPartition(PartitionStateMachine.scala:302)
 at
 kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:183)
 at
 kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
 at
 kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
 at scala.collection.Iterator$class.foreach(Iterator.scala:772)
 at
 scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
 at
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
 at
 kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96)
 at
 kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:433)
 at
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:344)
 at
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
 at
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:329)
 at
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
 at
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
 at kafka.utils.Utils$.inLock(Utils.scala:538)
 at
 kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:327)
 at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
 at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)


 On Tue, Sep 9, 2014 at 4:08 PM, Steven Wu stevenz...@gmail.com wrote:

 assuming you are talking about controller log on broker 0, there is
 nothing there.

 $ less kafka-controller
 2014-09-09 01:15:02,600 INFO  kafka.utils.Logging$class:68 [main] [info]
 [ControllerEpochListener on 0]: Initialized controller epoch to 5 and zk
 version 4
 2014-09-09 01:15:02,655 INFO  kafka.utils.Logging$class:68 [main] [info]
 [Controller 0]: Controller starting up
 2014-09-09 01:15:02,692 INFO  kafka.utils.Logging$class:68 [main] [info]
 [Controller 0]: Controller startup complete


 On Tue, Sep 9, 2014 at 4:06 PM, Jun Rao jun...@gmail.com wrote:

 In the controller log, do you see broker 0 being detected as the new
 broker
 when broker 0 comes up?

 Thanks,

 Jun

 On Tue, Sep 9, 2014 at 3:51 PM, Steven Wu stevenz...@gmail.com wrote:

  nope. sate-change log files only had some warnings regarding other
  partitions. nothing related to these two partitions.
 
  2014-09-09 02:54:30,579 WARN  kafka.utils.Logging$class:83

new broker instance can't sync up one partition from peers

2014-09-08 Thread Steven Wu
I did a push. The new instance comes up and tries to fetch log/data from the other
peers/replicas. Out of the 60 partitions assigned to broker 0, it synced up
59, but for whatever reason it didn't try to fetch this partition/topic:
[out-of-sync replica] BrokerId: 0, Topic: foo, PartitionId: 6, Leader: 5,
Replicas: 5,0, Isr: 5

I checked the partition data size on broker 5. it is actually very small.
$ du -hs /mnt/data/kafka/foo-6/
200K/mnt/data/kafka/foo-6/

I checked the log files on brokers 0 and 5 and couldn't find anything relevant to
this issue. I found a different issue that I will ask about in a separate thread.

Any suggestion?

Thanks,
Steven


zookeeper shows partition in-sync but local disk doesn't have the data dirs

2014-09-08 Thread Steven Wu
Did a push in the cloud. After the new instance for broker 0 comes up, I see a lot
of WARNs in the log file.

2014-09-09 04:21:09,271 WARN  kafka.utils.Logging$class:83
[request-expiration-task] [warn] [KafkaApi-0] Fetch request with
correlation id 51893 from client 1409779957450-6014fc32-0-0 on partition
[foo,1] failed due to Partition [foo,1] doesn't exist on 0
2014-09-09 04:21:09,383 WARN  kafka.utils.Logging$class:83
[kafka-request-handler-5] [warn] [KafkaApi-0] Fetch request with
correlation id 51894 from client 1409779957450-6014fc32-0-0 on partition
[foo,2] failed due to Partition [foo,2] doesn't exist on 0

Zookeeper shows broker 0 is the leader after this new instance comes back:

{"controller_epoch":5,"leader":0,"version":1,"leader_epoch":0,"isr":[0,5]}
{"controller_epoch":5,"leader":0,"version":1,"leader_epoch":1,"isr":[0,1]}

But when I check the data dirs, those two partitions aren't there.

any idea?

Thanks,
Steven


Re: undesirable log retention behavior

2014-07-31 Thread Steven Wu
log.retention.bytes can help somewhat, but it is cumbersome to use because
it is a per-topic config that caps each partition.

There was an earlier thread regarding a global bytes limit; that would work
well for my purpose of avoiding a full disk:
https://issues.apache.org/jira/browse/KAFKA-1489
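
A sketch of the broker-level settings discussed here (server.properties,
illustrative values): log.retention.bytes caps each partition's log, so total
disk usage still depends on how many partitions a broker hosts, which is why a
single global limit like the one proposed in KAFKA-1489 would be simpler to
reason about.

log.retention.hours=24
# per-partition size cap as a backstop against a full disk (~1 GB here)
log.retention.bytes=1073741824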


On Thu, Jul 31, 2014 at 7:39 PM, Joe Stein joe.st...@stealth.ly wrote:

 What version of Kafka are your using? Have you tried log.retention.bytes?
 Which ever comes first (ttl or bytes total) should do what you are looking
 for if I understand you right.
 http://kafka.apache.org/documentation.html#brokerconfigs

 /***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
 /
 On Jul 31, 2014 6:52 PM, Steven Wu steve...@netflix.com.invalid wrote:

  It seems that log retention is purely based on the last-touch/modified
  timestamp. This is undesirable for code pushes in AWS/cloud.
 
  E.g., let's say the retention window is 24 hours, disk size is 1 TB, and disk
  util is 60% (600 GB). When a new instance comes up, it will fetch log files
  (600 GB) from peers. Those log files all have newer timestamps, so they won't
  be purged until 24 hours later. Note that during the first 24 hours, new msgs
  (another 600 GB) continue to come in. This can cause a disk-full problem
  without any intervention. With this behavior, we have to keep disk util
  under 50%.
 
  Could the last-modified timestamp be inserted into the file name when rolling
  over log files? Then Kafka could check the file name for the timestamp. Does
  this make sense?
 
  Thanks,
  Steven
 



log retention and rollover

2014-06-02 Thread Steven Wu
This might be a bit unusual. We have a topic for which we only need to keep the
last 5 minutes of msgs, so that replay from the beginning is fast.

Although retention has a config with a time unit of minutes
(log.retention.minutes), segment rolling ONLY has a time unit of hours
(log.roll.hours). If I understand cleanup correctly, it can only delete files
that have been rolled over. If true, the minimum effective retention period is
actually one hour.

Is there any particular reason for the different time units for retention and
roll? Can we add log.roll.minutes?

retention.ms (default: 7 days; server default property: log.retention.minutes)
    This configuration controls the maximum time we will retain a log before we
    will discard old log segments to free up space if we are using the "delete"
    retention policy. This represents an SLA on how soon consumers must read
    their data.

segment.ms (default: 7 days; server default property: log.roll.hours)
    This configuration controls the period of time after which Kafka will force
    the log to roll even if the segment file isn't full to ensure that retention
    can delete or compact old data.

Thanks,
Steven


Re: log retention and rollover

2014-06-02 Thread Steven Wu
Created KAFKA-1480 (https://issues.apache.org/jira/browse/KAFKA-1480).
Thanks!

It's also generally better to have consistent/matching time units for these
two configs.


On Mon, Jun 2, 2014 at 4:22 PM, Guozhang Wang wangg...@gmail.com wrote:

 Steven,

 We initially set the rolling criterion based on hours to avoid too frequent
 log rolling and in turn too-small segment files. For your case it may be
 reasonable to set the rolling criterion in minutes. Could you file a JIRA?

 Guozhang


 On Mon, Jun 2, 2014 at 4:00 PM, Steven Wu steve...@netflix.com.invalid
 wrote:

  This might be a bit unusual. We have a topic that we only need to keep
 last
  5 minutes of msgs so that replay from beginning is fast.
 
  Although retention.ms has time unit of minute, segment.ms ONLY has time
  unit of hour. If I understand cleanup correctly, it can only delete files
  that are rolled over. If true, the minimal retention period can be
 actually
  one hour.
 
  is there any particular reason for different time units for retention and
  roll? Can we add log.roll.minutes?
 
  retention.ms (default: 7 days; server default property: log.retention.minutes)
      This configuration controls the maximum time we will retain a log before we
      will discard old log segments to free up space if we are using the "delete"
      retention policy. This represents an SLA on how soon consumers must read
      their data.
 
  segment.ms (default: 7 days; server default property: log.roll.hours)
      This configuration controls the period of time after which Kafka will force
      the log to roll even if the segment file isn't full to ensure that retention
      can delete or compact old data.
 
  Thanks,
  Steven
 



 --
 -- Guozhang



Re: kafka broker failed to recovery from ZK failure

2014-05-26 Thread Steven Wu
Well, I just ran the kafka-run-class.sh script with the DeleteTopicCommand
class directly.

I noticed the warning on the DeleteTopicCommand documentation page,
but I thought it was only about incomplete cleanup. I didn't know it could
affect cluster health. Maybe it's worthwhile to point out the actual
danger.


On Thu, May 22, 2014 at 8:49 PM, Jun Rao jun...@gmail.com wrote:

 Delete topic is not supported in 0.8.1.1. How did you do it?

 Thanks,

 Jun


 On Thu, May 22, 2014 at 9:59 AM, Steven Wu steve...@netflix.com wrote:

  yes. I deleted a topic. but not at the time. it's a few hours before.
 
 
  On Fri, May 16, 2014 at 3:49 PM, Jun Rao jun...@gmail.com wrote:
 
   The problem is indicated by the following log in broker 1's controller
   log. Were you deleting any topic at that time?

   [2014-05-12 21:24:37,930] ERROR [BrokerChangeListener on Controller 1]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
   java.util.NoSuchElementException: key not found: [ocp.fillhealth.us-east-1,0]
   at scala.collection.MapLike$class.default(MapLike.scala:225)
   at scala.collection.mutable.HashMap.default(HashMap.scala:45)
   at scala.collection.MapLike$class.apply(MapLike.scala:135)
   at scala.collection.mutable.HashMap.apply(HashMap.scala:45)
   at kafka.controller.ControllerBrokerRequestBatch.updateMetadataRequestMapFor$1(ControllerChannelManager.scala:242)
   at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268)
   at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268)
   at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:99)
   at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:99)
   at scala.collection.Iterator$class.foreach(Iterator.scala:772)
   at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
   at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
   at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:99)
   at kafka.controller.ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(ControllerChannelManager.scala:268)
   at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:940)
   at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:386)
   at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:342)
   at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
   at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:329)
   at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
   at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
   at kafka.utils.Utils$.inLock(Utils.scala:538)
   at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:327)
   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
  
   Thanks,
  
   Jun
  
  
  
   On Tue, May 13, 2014 at 9:50 AM, Steven Wu steve...@netflix.com
 wrote:
  
    sorry for the wrong log file. please see the attached zip file for all 4
    log files.
   
   
On Mon, May 12, 2014 at 9:05 PM, Jun Rao jun...@gmail.com wrote:
   
The controller log in broker 1 is too late. Could you send its log
around 2014-05-12 21:24:37?
   
Thanks,
   
Jun
   
   
On Mon, May 12, 2014 at 5:02 PM, Steven Wu steve...@netflix.com
   wrote:
   
     This is a three-node cluster. broker 0 lost connection to ZK. broker 1
     did seem to take over the controller role, but broker 0 got stuck in the
     bad state and wasn't able to recover.

     it seems to start with these error msgs. I have attached the complete
     controller and server logs for brokers 0 and 1.

     I am using kafka_2.9.2-0.8.1.1.jar.

Re: kafka broker failed to recovery from ZK failure

2014-05-22 Thread Steven Wu
yes. I deleted a topic, but not at that time; it was a few hours before.


On Fri, May 16, 2014 at 3:49 PM, Jun Rao jun...@gmail.com wrote:

 The problem is indicated by the following log in broker 1's controller log.
 Were you deleting any topic at that time?

 [2014-05-12 21:24:37,930] ERROR [BrokerChangeListener on Controller 1]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
 java.util.NoSuchElementException: key not found: [ocp.fillhealth.us-east-1,0]
 at scala.collection.MapLike$class.default(MapLike.scala:225)
 at scala.collection.mutable.HashMap.default(HashMap.scala:45)
 at scala.collection.MapLike$class.apply(MapLike.scala:135)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:45)
 at kafka.controller.ControllerBrokerRequestBatch.updateMetadataRequestMapFor$1(ControllerChannelManager.scala:242)
 at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268)
 at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268)
 at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:99)
 at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:99)
 at scala.collection.Iterator$class.foreach(Iterator.scala:772)
 at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
 at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:99)
 at kafka.controller.ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(ControllerChannelManager.scala:268)
 at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:940)
 at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:386)
 at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:342)
 at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
 at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:329)
 at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
 at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
 at kafka.utils.Utils$.inLock(Utils.scala:538)
 at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:327)
 at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
 at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

 Thanks,

 Jun



 On Tue, May 13, 2014 at 9:50 AM, Steven Wu steve...@netflix.com wrote:

  sorry for the wrong log file. please see the attached zip file for all 4
  log files.
 
 
  On Mon, May 12, 2014 at 9:05 PM, Jun Rao jun...@gmail.com wrote:
 
  The controller log in broker 1 is too late. Could you send its log
  around 2014-05-12 21:24:37?
 
  Thanks,
 
  Jun
 
 
  On Mon, May 12, 2014 at 5:02 PM, Steven Wu steve...@netflix.com
 wrote:
 
   This is a three-node cluster. broker 0 lost connection to ZK. broker 1
   did seem to take over the controller role, but broker 0 got stuck in the
   bad state and wasn't able to recover.

   it seems to start with these error msgs. I have attached the complete
   controller and server logs for brokers 0 and 1.
  
   I am using kafka_2.9.2-0.8.1.1.jar.
  
   Thanks,
   Steven
  
    [2014-05-12 21:24:28,737] INFO Client session timed out, have not heard from server in 4000ms for sessionid 0xb145a9585a806013, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
    [2014-05-12 21:24:28,838] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)
    [2014-05-12 21:24:29,360] INFO Opening socket connection to server ip-10-84-58-49.ec2.internal/10.84.58.49:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
    [2014-05-12 21:24:30,562] INFO Client session timed out, have not heard from server in 1724ms for sessionid 0xb145a9585a806013, closing socket connection and attempting reconnect

Re: question about mirror maker

2014-05-12 Thread Steven Wu
if we place the mirror maker in the same datacenter as the target cluster,
its consumer will talk to zookeeper in the remote/source datacenter. would
that make it more susceptible to network problems?

As for the problem of committing offsets without actually producing/writing
msgs to the target cluster, it can be solved by disabling auto-commit and
only committing offsets for msgs that are actually persisted in the target
cluster (a rough sketch of this idea is below).

what do you think of this opposite approach?
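
A minimal sketch of that "commit only what was actually produced" idea,
written against the modern Java consumer/producer APIs rather than the
0.8-era clients discussed in this thread; the broker addresses, group id,
and topic name are placeholders:

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ManualCommitMirror {
    public static void main(String[] args) {
        Properties cprops = new Properties();
        cprops.put("bootstrap.servers", "source-broker:9092");   // placeholder source cluster
        cprops.put("group.id", "mirror-group");                   // placeholder group
        cprops.put("enable.auto.commit", "false");                // the key point: no auto-commit
        cprops.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        cprops.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Properties pprops = new Properties();
        pprops.put("bootstrap.servers", "target-broker:9092");    // placeholder target cluster
        pprops.put("acks", "all");
        pprops.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        pprops.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cprops);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pprops)) {
            consumer.subscribe(Collections.singletonList("A"));   // placeholder topic
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                List<Future<RecordMetadata>> sends = new ArrayList<>();
                for (ConsumerRecord<byte[], byte[]> r : records) {
                    sends.add(producer.send(new ProducerRecord<>(r.topic(), r.key(), r.value())));
                }
                try {
                    for (Future<RecordMetadata> f : sends) {
                        f.get();                   // wait until the target cluster has the message
                    }
                    consumer.commitSync();         // commit only after every send succeeded
                } catch (Exception e) {
                    // a send failed: do not commit, so this batch is re-read after restart/rebalance
                    break;
                }
            }
        }
    }
}

The trade-off is throughput: waiting for every send before committing keeps
offsets and produced data in step, at the cost of batching efficiency.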


On Sun, May 11, 2014 at 8:48 PM, Todd Palino tpal...@linkedin.com wrote:

 Yes, on both counts. Putting the mirror maker in the same datacenter as the
 target cluster is exactly what we do as well. We also monitor both the
 consumer lag (by comparing the offsets stored in Zookeeper and the tail
 offset on the brokers) and the number of dropped and failed messages on the
 mirror maker producer side. The other thing to do is to check very carefully
 when you are changing anything about the producer configuration, to make
 sure that you have not made a mistake.
 -Todd
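
As a rough illustration of the lag check described above (the group's
committed offsets versus the tail, i.e. log-end, offsets on the brokers),
here is a sketch using the modern Java consumer API; in the 0.8 era the
committed offsets lived in Zookeeper, so the mechanics were different, and
the group id and topic name below are placeholders:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class MirrorLagCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "source-broker:9092");    // placeholder source cluster
        props.put("group.id", "mirror-group");                    // the mirror maker's consumer group (placeholder)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("A")) { // placeholder topic
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            // Tail (log-end) offsets on the brokers vs. offsets the group has committed.
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(new HashSet<>(partitions));
            for (TopicPartition tp : partitions) {
                OffsetAndMetadata c = committed.get(tp);
                long lag = end.get(tp) - (c == null ? 0L : c.offset());
                System.out.printf("%s lag=%d%n", tp, lag);        // alert if this keeps growing
            }
        }
    }
}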

 On 5/11/14, 9:12 AM, Weide Zhang weo...@gmail.com wrote:

 Hi Todd,
 
  Thanks for your answer. With regard to failover for the mirror maker, does
  that mean that if I have 4 mirror makers running on different machines with
  the same consumer group, they will auto load-balance if one of the mirror
  makers fails? Also, it seems that to prevent the mirror maker from
  committing wrongly (consumer works but producer doesn't) due to a
  cross-datacenter network issue, the mirror maker needs to be placed along
  with the target cluster so that this scenario is minimized?
 
 
 On Sat, May 10, 2014 at 11:39 PM, Todd Palino tpal...@linkedin.com
 wrote:
 
  Well, if you have a cluster in each datacenter, all with the same topics,
  you can't just mirror the messages between them, as you will create a loop.
  The way we do it is to have a "local" cluster and an "aggregate" cluster.
  The local cluster has the data for only that datacenter. Then we run mirror
  makers that copy the messages from each of the local clusters into the
  aggregate cluster. Everything produces into the local clusters, and nothing
  produces into the aggregate clusters. In general, consumers consume from
  the aggregate cluster (unless they specifically want only local data).

  The mirror maker is as fault tolerant as any other consumer. That is, if a
  mirror maker goes down, the others configured with the same consumer group
  (we generally run at least 4 for any mirror maker, sometimes up to 10) will
  rebalance and start back up from the last committed offset. What you need
  to watch out for is if the mirror maker is unable to produce messages, for
  example, if the network goes down. If it can still consume messages, but
  cannot produce them, you will lose messages as the consumer will continue
  to commit offsets with no knowledge that the producer is failing.
 
  -Todd
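
To make that failure mode visible (the consumer keeps committing while the
producer silently fails), one option is to count failed sends in the producer
callback and alert when the counter grows. A minimal sketch with the modern
Java producer, where the broker address, topic, and alerting hook are
placeholders:

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CountingMirrorProducer {
    private static final AtomicLong failedSends = new AtomicLong();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "aggregate-broker:9092");  // placeholder target (aggregate) cluster
        props.put("acks", "all");                                  // only count a send as good once fully acked
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            byte[] payload = "example".getBytes(StandardCharsets.UTF_8);
            producer.send(new ProducerRecord<>("A", payload), (metadata, exception) -> {
                if (exception != null) {
                    // Feed this counter into whatever monitoring is in place; a steadily
                    // rising value means the mirror is consuming but not producing.
                    failedSends.incrementAndGet();
                }
            });
            producer.flush();
            System.out.println("failed sends so far: " + failedSends.get());
        }
    }
}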
 
  On 5/8/14, 11:20 AM, Weide Zhang weo...@gmail.com wrote:
 
  Hi,
  
  I have a question about the mirror maker. Say I have 3 datacenters, each
  producing topic 'A', with a separate kafka cluster running in each. If the
  data in the 3 datacenters needs to be kept in sync, shall I create 3 mirror
  makers in each datacenter to get the data from the other two?

  Also, it is mentioned that mirror making is not fault tolerant? So what
  will be the behavior of the mirror consumer if it went down due to the
  network and came back up? Does it catch up from the last offset it
  mirrored? If so, is that enabled by default or do I have to configure it?
  
  Thanks a lot,
  
  Weide